Files
Freqtrade/Zeus_8_3_3_2.py
Jérôme Delacotte 7c239227d8 first commit
2025-03-06 11:01:43 +01:00

442 lines
21 KiB
Python

# Zeus Strategy: First Generation of GodStra Strategy with maximum
# AVG/MID profit in USDT
# Author: @Mablue (Masoud Azizi)
# github: https://github.com/mablue/
# IMPORTANT: INSTALL TA BEFOUR RUN(pip install ta)
# freqtrade hyperopt --hyperopt-loss SharpeHyperOptLoss --spaces buy sell roi --strategy Zeus
# --- Do not remove these libs ---
from datetime import timedelta, datetime
from typing import Optional
from freqtrade import data
from freqtrade.persistence import Trade
from freqtrade.strategy.parameters import CategoricalParameter, DecimalParameter, IntParameter, BooleanParameter
from numpy.lib import math
from freqtrade.strategy.interface import IStrategy
import pandas
from pandas import DataFrame
import time
import logging
import calendar
from freqtrade.loggers import setup_logging
from freqtrade.strategy.strategy_helper import merge_informative_pair
# --------------------------------
# Add your lib to import here
import ta
from functools import reduce
import numpy as np
import talib.abstract as talib
from freqtrade.strategy.strategy_helper import merge_informative_pair
import freqtrade.vendor.qtpylib.indicators as qtpylib
from random import shuffle
logger = logging.getLogger(__name__)
class Zeus_8_3_3_2(IStrategy):
# ROI table:
minimal_roi = {
"0": 10
# 0.564,
# "567": 0.273,
# "2814": 0.12,
# "7675": 0
}
# Stoploss:
stoploss = -1 #0.256
# Buy hypers
timeframe = '5m'
stop_buying = {}
# DCA config
position_adjustment_enable = True
plot_config = {
"main_plot": {
"min200": {
"color": "#86c932"
},
"min50": {
"color": "white"
},
# "max200": {
# "color": "yellow"
# },
"sma3_1d": {
"color": "pink"
},
"sma5_1d": {
"color": "blue"
},
"sma10_1d": {
"color": "orange"
},
"close_1d": {
"color": "#73e233",
},
"low": {
"color": "cyan",
},
"bb_lowerband": {
"color": "#da59a6"},
"bb_upperband": {
"color": "#da59a6",
}
},
"subplots": {
# "Ind": {
# "trend_ichimoku_base": {
# "color": "#dd1384"
# },
# "trend_kst_diff": {
# "color": "#850678"
# }
# },
# "BB": {
# "bb_width": {
# "color": "white"
# },
# "bb_lower_5": {
# "color": "yellow"
# }
# },
"Rsi": {
"rsi_1d": {
"color": "pink"
},
# "rsi_1h": {
# "color": "green"
# },
"rsi5": {
"color": "yellow"
},
"rsi3_1d": {
"color": "red"
}
},
# "Percent": {
# "pct_change_1_1d": {
# "color": "green"
# },
# "pct_change_3_1d": {
# "color": "orange"
# },
# "pct_change_5_1d": {
# "color": "red"
# }
# }
}
}
trades = list()
buy_min_horizon = IntParameter(50, 800, default=72, space='buy')
buy_rsi = IntParameter(1, 30, default=12, space='buy')
buy_min_max_n = DecimalParameter(0, 0.2, decimals=2, default=0.05, space='buy')
# adx_1d_limit = IntParameter(15, 45, default=18, space='buy')
sell_b_RSI = IntParameter(70, 98, default=60, space='sell')
sell_profit_percent = DecimalParameter(0.1, 1.5, decimals=1, default=0.8, space='sell')
sell_percent = DecimalParameter(0.01, 0.30, decimals=2, default=0.05, space='sell')
protection_percent_buy_lost = IntParameter(1, 30, default=3, space='protection')
protection_nb_buy_lost = IntParameter(1, 3, default=3, space='protection')
protection_stop_buying_rsi_1d = IntParameter(50, 100, default=76, space='protection')
protection_start_buying_rsi_1d = IntParameter(1, 50, default=30, space='protection')
def confirm_trade_entry(self, pair: str, order_type: str, amount: float, rate: float, time_in_force: str,
current_time: datetime, entry_tag: Optional[str], **kwargs) -> bool:
allow_to_buy = True
# info_previous_last_candle = informative.iloc[-2].squeeze()
dataframe, _ = self.dp.get_analyzed_dataframe(pair, self.timeframe)
last_candle = dataframe.iloc[-1].squeeze()
previous_last_candle = dataframe.iloc[-2].squeeze()
previous_previous_last_candle = dataframe.iloc[-3].squeeze()
if self.stop_buying.get(pair, None) is None:
print("enable buying tag", pair)
self.stop_buying[pair] = False
if ((last_candle['rsi5'] >= self.protection_stop_buying_rsi_1d.value) | (last_candle['close'] >= last_candle['bb_upperband'])) \
& (self.stop_buying[pair] is False):
logger.info("1 - Disable buying %s date %s", pair, last_candle['date'])
self.stop_buying[pair] = True
if self.stop_buying[pair] is True:
if ((last_candle['rsi5'] <= self.protection_start_buying_rsi_1d.value) & (last_candle['percent5'] >= 0.005)):
logger.info("2 - Enable buying %s date %s", pair, last_candle['date'])
self.stop_buying[pair] = False
logger.info("Buy ==> %s ", pair + " " + str(current_time) + "---------------------")
if self.stop_buying[pair]:
allow_to_buy = False
logger.info("3 - cancel buying %s date %s", pair, str(last_candle['date']) + " " + str(last_candle['rsi5']))
else:
logger.info("3 - accept buying %s date %s", pair, str(last_candle['date']) + " " + str(last_candle['rsi5']))
return allow_to_buy
def custom_sell(self, pair: str, trade: 'Trade', current_time: 'datetime', current_rate: float,
current_profit: float, **kwargs):
dataframe, _ = self.dp.get_analyzed_dataframe(pair, self.timeframe)
last_candle = dataframe.iloc[-1].squeeze()
previous_last_candle = dataframe.iloc[-2].squeeze()
previous_previous_last_candle = dataframe.iloc[-3].squeeze()
if (current_profit > self.buy_min_max_n.value * self.sell_profit_percent.value) \
& (previous_last_candle['rsi5'] > self.sell_b_RSI.value) \
& (previous_last_candle['rsi5'] >= last_candle['rsi5']) \
& (previous_last_candle['rsi5'] >= previous_previous_last_candle['rsi5']):
logger.info("Sell ==> %s ", pair + " " + str(current_time) + str(current_profit) + " " + str(current_rate))
return 'profit_1' # + str(self.sell_percent.value)
def informative_pairs(self):
# get access to all pairs available in whitelist.
pairs = self.dp.current_whitelist()
informative_pairs = [(pair, '1d') for pair in pairs]
# informative_pairs += [(pair, '4h') for pair in pairs]
informative_pairs += [(pair, '1h') for pair in pairs]
return informative_pairs
def populate_indicators(self, dataframe: DataFrame, metadata: dict) -> DataFrame:
# Add all ta features
dataframe['pct_change'] = dataframe['close'].pct_change(5)
dataframe['min'] = talib.MIN(dataframe['close'], timeperiod=self.buy_min_horizon.value)
# dataframe['min10'] = talib.MIN(dataframe['close'], timeperiod=10)
# dataframe['min20'] = talib.MIN(dataframe['close'], timeperiod=20)
# dataframe['min50'] = talib.MIN(dataframe['close'], timeperiod=50)
dataframe['min200'] = talib.MIN(dataframe['close'], timeperiod=200)
# dataframe['min200_1'] = dataframe['min200'] * 1.005
# dataframe['moy200_12'] = dataframe['min200'].rolling(12).mean()
dataframe['max50'] = talib.MAX(dataframe['close'], timeperiod=50)
dataframe['max200'] = talib.MAX(dataframe['close'], timeperiod=200)
dataframe['min_max200'] = (dataframe['max200'] - dataframe['min200']) / dataframe['min200']
dataframe['min_n'] = talib.MIN(dataframe['close'], timeperiod=self.buy_min_horizon.value)
dataframe['max_n'] = talib.MAX(dataframe['close'], timeperiod=self.buy_min_horizon.value)
dataframe['min_max_n'] = (dataframe['max_n'] - dataframe['min_n']) / dataframe['min_n']
dataframe['rsi'] = talib.RSI(dataframe)
dataframe['rsi5'] = talib.RSI(dataframe, timeperiod=5)
dataframe['sma5'] = talib.SMA(dataframe, timeperiod=5)
# dataframe['sma10'] = talib.SMA(dataframe, timeperiod=10)
# dataframe['sma20'] = talib.SMA(dataframe, timeperiod=20)
# dataframe['sma50'] = talib.SMA(dataframe, timeperiod=50)
# dataframe['sma100'] = talib.SMA(dataframe, timeperiod=100)
dataframe["percent"] = (dataframe["close"] - dataframe["open"]) / dataframe["open"]
dataframe["percent5"] = dataframe["percent"].rolling(5).sum()
# dataframe["percent3"] = dataframe["percent"].rolling(3).sum()
# dataframe["percent10"] = dataframe["percent"].rolling(10).sum()
# dataframe["percent20"] = dataframe["percent"].rolling(20).sum()
# dataframe["percent50"] = dataframe["percent"].rolling(50).sum()
# dataframe['percent_lost_n'] = dataframe["percent"].rolling(self.protection_lost_candles.value).sum()
# dataframe["volume10"] = dataframe["volume"].rolling(10).mean()
# Bollinger Bands
bollinger = qtpylib.bollinger_bands(qtpylib.typical_price(dataframe), window=20, stds=2)
dataframe['bb_lowerband'] = bollinger['lower']
dataframe['bb_middleband'] = bollinger['mid']
dataframe['bb_upperband'] = bollinger['upper']
dataframe["bb_percent"] = (
(dataframe["close"] - dataframe["bb_lowerband"]) /
(dataframe["bb_upperband"] - dataframe["bb_lowerband"])
)
dataframe["bb_width"] = (
(dataframe["bb_upperband"] - dataframe["bb_lowerband"]) / dataframe["bb_middleband"]
)
# dataframe['bb_lower_var_5'] = (dataframe['bb_lowerband'] - dataframe['min50']).rolling(5).var()
# dataframe['bb_lower_5'] = 100 * ((dataframe['bb_lowerband'].rolling(5).mean() / dataframe['bb_lowerband']) - 1)
# dataframe['bb_lower_width_5'] = (dataframe['bb_lowerband'] * (1 + dataframe['bb_width'] / self.buy_bb_width_n.value))
dataframe['distance_min'] = (dataframe['close'] - dataframe['min']) / dataframe['close']
dataframe['min1.1'] = 1.01 * dataframe['min']
dataframe['normal'] = 100 * (dataframe['close'] / dataframe['close'].rolling(200).mean())
dataframe['min_max_close'] = (
(dataframe['max200'] - dataframe['close']) / (dataframe['close'] - dataframe['min200']))
# dataframe['stop_buying'] = (dataframe['rsi5'] >= self.protection_stop_buying_rsi_1d.value) \
# & (dataframe['close'] >= dataframe['bb_upperband'])
################### INFORMATIVE 1D
informative = self.dp.get_pair_dataframe(pair=metadata['pair'], timeframe="1d")
informative["rsi"] = talib.RSI(informative)
informative["rsi3"] = talib.RSI(informative, 3)
# informative["mrsi3"] = informative["rsi"].rolling(3).mean()
# informative["max3"] = talib.MAX(informative['close'], timeperiod=3)
# informative["min3"] = talib.MIN(informative['close'], timeperiod=3)
# informative['pct_change_1'] = informative['close'].pct_change(1)
# informative['pct_change_3'] = informative['close'].pct_change(3)
# informative['pct_change_5'] = informative['close'].pct_change(5)
informative['sma3'] = talib.SMA(informative, timeperiod=3)
informative['sma5'] = talib.SMA(informative, timeperiod=5)
informative['sma10'] = talib.SMA(informative, timeperiod=10)
# informative['adx'] = talib.ADX(informative)
bollinger = qtpylib.bollinger_bands(qtpylib.typical_price(informative), window=20, stds=2)
informative['bb_lowerband'] = bollinger['lower']
informative['bb_middleband'] = bollinger['mid']
informative['bb_upperband'] = bollinger['upper']
informative["bb_percent"] = (
(informative["close"] - informative["bb_lowerband"]) /
(informative["bb_upperband"] - informative["bb_lowerband"])
)
# informative["bb_width"] = (
# (informative["bb_upperband"] - informative["bb_lowerband"]) / informative["bb_middleband"]
# )
dataframe = merge_informative_pair(dataframe, informative, self.timeframe, "1d", ffill=True)
# ######################## INFORMATIVE 4h
# informative = self.dp.get_pair_dataframe(pair=metadata['pair'], timeframe="4h")
# informative["rsi"] = talib.RSI(informative)
# informative['pct_change_1'] = informative['close'].pct_change(1)
# informative['pct_change_3'] = informative['close'].pct_change(3)
# informative['pct_change_5'] = informative['close'].pct_change(5)
# informative['sma3'] = talib.SMA(informative, timeperiod=3)
# informative['sma5'] = talib.SMA(informative, timeperiod=5)
# informative['sma10'] = talib.SMA(informative, timeperiod=10)
# dataframe = merge_informative_pair(dataframe, informative, self.timeframe, "4h", ffill=True)
######################## INFORMATIVE 1h
# informative = self.dp.get_pair_dataframe(pair=metadata['pair'], timeframe="1h")
# informative["rsi"] = talib.RSI(informative)
# informative["rsi3"] = talib.RSI(informative, 3)
# informative["mrsi3"] = informative["rsi"].rolling(3).mean()
# informative['pct_change_1'] = informative['close'].pct_change(1)
# informative['pct_change_3'] = informative['close'].pct_change(3)
# informative['pct_change_5'] = informative['close'].pct_change(5)
# informative['sma3'] = talib.SMA(informative, timeperiod=3)
# informative['sma5'] = talib.SMA(informative, timeperiod=5)
# informative['sma10'] = talib.SMA(informative, timeperiod=10)
# informative['adx'] = talib.ADX(informative)
# bollinger = qtpylib.bollinger_bands(qtpylib.typical_price(informative), window=20, stds=2)
# informative['bb_lowerband'] = bollinger['lower']
# informative['bb_middleband'] = bollinger['mid']
# informative['bb_upperband'] = bollinger['upper']
#
# dataframe = merge_informative_pair(dataframe, informative, self.timeframe, "1h", ffill=True)
return dataframe
def populate_buy_trend(self, dataframe: DataFrame, metadata: dict) -> DataFrame:
dataframe.loc[
(
(dataframe['rsi5'] < self.protection_stop_buying_rsi_1d.value)
& (dataframe['rsi5'] > self.protection_start_buying_rsi_1d.value)
& (dataframe['rsi5'].shift(1) < self.buy_rsi.value)
& (dataframe['rsi5'].shift(1) < dataframe['rsi5'])
& (dataframe['rsi5'].shift(1) < dataframe['rsi5'].shift(2))
# & (dataframe['close'] < dataframe['min_n'] * self.buy_min_max_coef.value)
& (dataframe['min_n'].shift(6) == dataframe['min_n'])
# & (dataframe['pct_change_1_1d'] > 0)
& (dataframe['min_max_n'] >= self.buy_min_max_n.value)
# & (dataframe['close'] <= dataframe['close_1d'])
# & (dataframe['close_1d'] <= dataframe['sma3_1d'])
# & (dataframe['close_1d'] <= dataframe['sma5_1d'])
# & (dataframe['close_1d'] <= dataframe['sma10_1d'])
# & (dataframe['adx_1d'] > self.adx_1d_limit.value)
# & (dataframe['rsi3_1d'].shift(288) <= dataframe['rsi3_1d'])
), ['buy', 'buy_tag']] = (1, 'buy_adx_inf')
return dataframe
# def bot_loop_start(self, **kwargs) -> None:
# pairs = self.dp.current_whitelist()
# print("Calcul des pairs informatives")
# for pairname in pairs:
# self.stop_buying[pairname] = True
# print("Fin Calcul des pairs informatives")
def populate_sell_trend(self, dataframe: DataFrame, metadata: dict) -> DataFrame:
# dataframe.loc[
# (
# (dataframe['rsi5'].shift(1) > self.sell_b_RSI.value)
# & (dataframe['rsi5'].shift(1) >= dataframe['rsi5'])
# & (dataframe['rsi5'].shift(1) >= dataframe['rsi5'].shift(2))
# & (dataframe['close'] > dataframe['close_1d'])
# & (dataframe['close_1d'] > dataframe['sma3_1d'])
# & (dataframe['close_1d'] > dataframe['sma5_1d'])
# & (dataframe['close_1d'] > dataframe['sma10_1d'])
# # & (dataframe['rsi3_1d'].shift(288) > dataframe['rsi3_1d'])
# # (dataframe['close_1d'] > dataframe['sma3_1d'])
# # & (dataframe['rsi3_1d'] > 72)
# #& (dataframe['last_buy_price'] < (dataframe['close']))
# #& (dataframe['should_sell'] == True)
# ), ['sell', 'sell_tag']] = (1, 'sell_close_1d')
# print("dans sell" + metadata['pair'])
return dataframe
def adjust_trade_position(self, trade: Trade, current_time: datetime,
current_rate: float, current_profit: float, min_stake: float,
max_stake: float, **kwargs):
dataframe, _ = self.dp.get_analyzed_dataframe(trade.pair, self.timeframe)
if (len(dataframe) < 1):
return None
if (self.stop_buying.get(trade.pair, None) == None):
# logger.info("----------- %s ", trade.pair + " Init stop buying " + str(current_profit) + " " + str(current_time) + "---------------------")
self.stop_buying[trade.pair] = False
if (self.stop_buying[trade.pair] == True):
# logger.info("----------- %s ", trade.pair + " Canceled " + str(current_profit) + " " + str(current_time) + "---------------------")
return None
last_candle = dataframe.iloc[-1].squeeze()
previous_last_candle = dataframe.iloc[-2].squeeze()
previous_previous_last_candle = dataframe.iloc[-3].squeeze()
last_candle_6 = dataframe.iloc[-7].squeeze()
# last_candle_5 = dataframe.iloc[-3].squeeze()
# last_candle_decalage = dataframe.iloc[- self.buy_min_max_decalage.value].squeeze()
# print(last_candle['buy'])
condition = (
(previous_last_candle['rsi5'] < self.buy_rsi.value)
& (previous_last_candle['rsi5'] < last_candle['rsi5'])
& (previous_last_candle['rsi5'] < previous_previous_last_candle['rsi5'])
& (last_candle_6['min_n'] == last_candle['min_n'])
& (last_candle['min_max_n'] >= self.buy_min_max_n.value)
# & (last_candle['close'] < last_candle['min_n'] * self.buy_min_max_coef.value)
# & (last_candle['close'] <= last_candle['close_1d'])
# & (last_candle['close_1d'] <= last_candle['sma3_1d'])
# & (last_candle['close_1d'] <= last_candle['sma5_1d'])
# & (last_candle['close_1d'] <= last_candle['sma10_1d'])
# & (dataframe['close'] < dataframe['min_n'] * self.buy_min_max_coef.value)
)
if not (condition):
return None
# min_d = min(last_candle['sma3_4h'], last_candle['close_1d'])
filled_buys = trade.select_filled_orders('buy')
count_of_buys = len(filled_buys)
# days = (current_time - trade.open_date_utc).days
# minutes = (current_time - trade.open_date_utc).seconds / 60
# condition = condition & ((last_candle['min50'] == last_candle_5['min50']) & (last_candle['close'] <= last_candle['close_1h']))
p = self.protection_percent_buy_lost.value
percents = [p, p * 2, p * 3, p * 4, p * 5, p * 6, p * 7, p * 8, p * 9]
if (0 < count_of_buys <= self.protection_nb_buy_lost.value) \
& (current_profit < - (percents[count_of_buys - 1] / 100)) & (condition):
try:
p = self.config['stake_amount']
factors = [p, p, p, p, 2 * p, 4 * p, 5 * p, 6 * p]
stake_amount = factors[count_of_buys - 1]# filled_buys[0].cost
# This then calculates current safety order size
# logger.info("----------- %s ", trade.pair + " " + str(current_profit) + " " + str(count_of_buys) + " " + str(stake_amount) +
# " " + str(current_time) + "---------------------")
# print("-----------" + trade.pair + " " + str(current_profit) + " " + str(count_of_buys) + " " + str(stake_amount) +
# " " + str(current_time) + "---------------------")
return stake_amount
except Exception as exception:
print(exception)
return None
return None