# pr#agma pylint: disable=missing-docstring, invalid-name, pointless-string-statement # isort: skip_file # --- Do not remove these libs --- from datetime import datetime from freqtrade.persistence import Trade import numpy as np # noqa import pandas as pd # noqa from pandas import DataFrame from datetime import timezone, timedelta from datetime import timedelta, datetime from freqtrade.strategy.interface import IStrategy from freqtrade.persistence import Trade from typing import Optional, Union, Tuple from freqtrade.strategy import (BooleanParameter, CategoricalParameter, DecimalParameter, stoploss_from_open, IntParameter, IStrategy, merge_informative_pair, informative, stoploss_from_absolute) # -------------------------------- # Add your lib to import here import ta import talib.abstract as talib import freqtrade.vendor.qtpylib.indicators as qtpylib from functools import reduce from random import shuffle timeperiods = [3, 5, 12, 24, 48, 60] god_genes_with_timeperiod = list() for timeperiod in timeperiods: # god_genes_with_timeperiod.append(f'max{timeperiod}') # god_genes_with_timeperiod.append(f'min{timeperiod}') # god_genes_with_timeperiod.append(f"percent{timeperiod}") # god_genes_with_timeperiod.append(f"sma{timeperiod}") god_genes_with_timeperiod.append(f"sma{timeperiod}_deriv1") god_genes_with_timeperiod.append(f"sma{timeperiod}_deriv2") god_genes_with_timeperiod.append(f"sma{timeperiod}_score") # god_genes_with_timeperiod.append(f"sma{timeperiod}_trend_up") # god_genes_with_timeperiod.append(f"sma{timeperiod}_trend_down") # god_genes_with_timeperiod.append(f"sma{timeperiod}_trend_change_up") # god_genes_with_timeperiod.append(f"sma{timeperiod}_trend_change_down") operators = [ "D", # Disabled gene ">", # Indicator, bigger than cross indicator "<", # Indicator, smaller than cross indicator "=", # Indicator, equal with cross indicator "C", # Indicator, crossed the cross indicator "CA", # Indicator, crossed above the cross indicator "CB", # Indicator, crossed below the cross indicator # ">R", # Normalized indicator, bigger than real number # "=R", # Normalized indicator, equal with real number # "R", # Normalized indicator devided to cross indicator, bigger than real number # "/=R", # Normalized indicator devided to cross indicator, equal with real number # "/ 10) # TODO : it ill callculated in populate indicators. pd.set_option('display.max_rows', None) pd.set_option('display.max_columns', None) pd.set_option("display.width", 200) # print(f"{indicator} {crossed_indicator} {real_num}") dataframe[indicator] = gene_calculator(dataframe, indicator) dataframe[crossed_indicator] = gene_calculator(dataframe, crossed_indicator) indicator_trend_sma = f"{indicator}-SMA-{TREND_CHECK_CANDLES}" if operator in ["UT", "DT", "OT", "CUT", "CDT", "COT"]: dataframe[indicator_trend_sma] = gene_calculator(dataframe, indicator_trend_sma) if operator == ">": condition = (dataframe[indicator] > dataframe[crossed_indicator]) elif operator == "=": condition = (np.isclose(dataframe[indicator], dataframe[crossed_indicator])) elif operator == "<": condition = (dataframe[indicator] < dataframe[crossed_indicator]) elif operator == "C": condition = ( (qtpylib.crossed_below(dataframe[indicator], dataframe[crossed_indicator])) | (qtpylib.crossed_above( dataframe[indicator], dataframe[crossed_indicator])) ) elif operator == "CA": condition = (qtpylib.crossed_above(dataframe[indicator], dataframe[crossed_indicator])) elif operator == "CB": condition = (qtpylib.crossed_below(dataframe[indicator], dataframe[crossed_indicator])) elif operator == ">R": condition = (dataframe[indicator] > real_num) elif operator == "=R": condition = (np.isclose(dataframe[indicator], real_num)) elif operator == "R": condition = (dataframe[indicator].div(dataframe[crossed_indicator]) > real_num) elif operator == "/=R": condition = (np.isclose(dataframe[indicator].div(dataframe[crossed_indicator]), real_num)) elif operator == "/ dataframe[indicator_trend_sma]) elif operator == "DT": condition = (dataframe[indicator] < dataframe[indicator_trend_sma]) elif operator == "OT": condition = (np.isclose(dataframe[indicator], dataframe[indicator_trend_sma])) elif operator == "CUT": condition = ( ( qtpylib.crossed_above(dataframe[indicator],dataframe[indicator_trend_sma]) ) & ( dataframe[indicator] > dataframe[indicator_trend_sma] ) ) elif operator == "CDT": condition = ( ( qtpylib.crossed_below(dataframe[indicator], dataframe[indicator_trend_sma]) ) & ( dataframe[indicator] < dataframe[indicator_trend_sma] ) ) elif operator == "COT": condition = ( ( ( qtpylib.crossed_below(dataframe[indicator], dataframe[indicator_trend_sma]) ) | ( qtpylib.crossed_above(dataframe[indicator], dataframe[indicator_trend_sma]) ) ) & ( np.isclose(dataframe[indicator], dataframe[indicator_trend_sma]) ) ) return condition, dataframe # ######################################################################################################################### # This class is a sample. Feel free to customize it. class Empty(IStrategy): # Strategy interface version - allow new iterations of the strategy interface. # Check the documentation or the Sample strategy to get the latest version. INTERFACE_VERSION = 2 # ROI table: minimal_roi = { #"0": 0.015 "0": 0.5 } # Stoploss: stoploss = -1 trailing_stop = True trailing_stop_positive = 0.15 trailing_stop_positive_offset = 0.20 trailing_only_offset_is_reached = True position_adjustment_enable = True use_custom_stoploss = True #max_open_trades = 3 # Optimal ticker interval for the strategy. timeframe = '1h' # Run "populate_indicators()" only for new candle. process_only_new_candles = False # These values can be overridden in the "ask_strategy" section in the config. use_sell_signal = True sell_profit_only = False ignore_roi_if_buy_signal = False # Number of candles the strategy requires before producing valid signals startup_candle_count: int = 30 pairs = { pair: { 'first_amount': 0, "first_buy": 0, "last_buy": 0.0, "last_min": 999999999999999.5, "last_max": 0, "trade_info": {}, "max_touch": 0.0, "last_sell": 0.0, 'count_of_buys': 0, 'current_profit': 0, 'expected_profit': 0, 'previous_profit': 0, "last_candle": {}, "last_count_of_buys": 0, 'base_stake_amount': 0, 'stop_buy': False, 'last_date': 0, 'stop': False, 'max_profit': 0, 'total_amount': 0, 'has_gain': 0, 'force_sell': False, 'force_buy': False, 'current_trade': None, 'last_trade': None } for pair in ["BTC/USDC", "BTC/USDT"] } plot_config = { "main_plot": { "sma5": { "color": "#7aa90b" }, "min24": { "color": "#121acd" } }, "subplots": { "Inv": { "sma5_inv": { "color": "#aef878", "type": "line" }, "zero": { "color": "#fdba52" }, "sma24_score": { "color": "#f1f5b0" } }, "drv": { "sma5_deriv1": { "color": "#96eebb" } } }, "options": { "markAreaZIndex": 1 } } # Buy Hyperoptable Parameters/Spaces. buy_crossed_indicator0 = CategoricalParameter(god_genes_with_timeperiod, default="ADD-20", space='buy') buy_crossed_indicator1 = CategoricalParameter(god_genes_with_timeperiod, default="ASIN-6", space='buy') buy_crossed_indicator2 = CategoricalParameter(god_genes_with_timeperiod, default="CDLEVENINGSTAR-50", space='buy') buy_indicator0 = CategoricalParameter(god_genes_with_timeperiod, default="SMA-100", space='buy') buy_indicator1 = CategoricalParameter(god_genes_with_timeperiod, default="WILLR-50", space='buy') buy_indicator2 = CategoricalParameter(god_genes_with_timeperiod, default="CDLHANGINGMAN-20", space='buy') buy_operator0 = CategoricalParameter(operators, default="/ float: dataframe, _ = self.dp.get_analyzed_dataframe(pair, self.timeframe) last_candle = dataframe.iloc[-1].squeeze() return self.adjust_stake_amount(pair, last_candle) def adjust_stake_amount(self, pair: str, last_candle: DataFrame): if (self.pairs[pair]['first_amount'] > 0): amount = min(self.wallets.get_available_stake_amount(), self.pairs[pair]['first_amount']) else: amount = self.wallets.get_available_stake_amount() / 4 return min(amount, self.wallets.get_available_stake_amount()) def adjust_trade_position(self, trade: Trade, current_time: datetime, current_rate: float, current_profit: float, min_stake: float, max_stake: float, **kwargs): # ne rien faire si ordre deja en cours if trade.has_open_orders: # print("skip open orders") return None if (self.wallets.get_available_stake_amount() < 10): # or trade.stake_amount >= max_stake: return 0 # dataframe, _ = self.dp.get_analyzed_dataframe(trade.pair, self.timeframe) last_candle = dataframe.iloc[-1].squeeze() before_last_candle = dataframe.iloc[-2].squeeze() before_last_candle_12 = dataframe.iloc[-13].squeeze() before_last_candle_24 = dataframe.iloc[-25].squeeze() last_candle_3 = dataframe.iloc[-4].squeeze() last_candle_previous_1h = dataframe.iloc[-13].squeeze() # prépare les données current_time = current_time.astimezone(timezone.utc) open_date = trade.open_date.astimezone(timezone.utc) dispo = round(self.wallets.get_available_stake_amount()) hours_since_first_buy = (current_time - trade.open_date_utc).seconds / 3600.0 days_since_first_buy = (current_time - trade.open_date_utc).days hours = (current_time - trade.date_last_filled_utc).total_seconds() / 3600.0 count_of_buys = trade.nr_of_successful_entries current_time_utc = current_time.astimezone(timezone.utc) open_date = trade.open_date.astimezone(timezone.utc) days_since_open = (current_time_utc - open_date).days pair = trade.pair profit = trade.calc_profit(current_rate) #round(current_profit * trade.stake_amount, 1) # last_lost = self.getLastLost(last_candle, pair) # pct_first = 0 stake_amount = self.adjust_stake_amount(pair, last_candle) # if (last_candle['enter_long'] == 1 and current_profit < -0.05 and stake_amount > 10) : # # print(f"adjust {current_time} {stake_amount}") # print(f"adjust {pair} {current_time} dispo={dispo} amount={stake_amount} rate={current_rate}") # return stake_amount if last_candle['enter_long'] != 1: return None filled_buys = [ o for o in trade.orders if o.status == "closed" and o.ft_order_side == "buy" ] if not filled_buys: return None last_buy = max(filled_buys, key=lambda o: o.order_date) last_entry_price = last_buy.price drop_from_last_entry = (current_rate - last_entry_price) / last_entry_price if drop_from_last_entry <= -0.05: # stake_amount = trade.stake_amount print(f"adjust {current_time} {stake_amount}") print(f"adjust {pair} {current_time} dispo={dispo} amount={stake_amount} rate={current_rate}") return stake_amount return None def confirm_trade_entry(self, pair: str, order_type: str, amount: float, rate: float, time_in_force: str, current_time: datetime, entry_tag: Optional[str], **kwargs) -> bool: minutes = 0 if self.pairs[pair]['last_date'] != 0: minutes = round(int((current_time - self.pairs[pair]['last_date']).total_seconds() / 60)) dataframe, _ = self.dp.get_analyzed_dataframe(pair, self.timeframe) last_candle = dataframe.iloc[-1].squeeze() last_candle_2 = dataframe.iloc[-2].squeeze() last_candle_3 = dataframe.iloc[-3].squeeze() condition = True #(last_candle[f"{indic_5m}_deriv1"] >= indic_deriv1_5m) and (last_candle[f"{indic_5m}_deriv2"] >= indic_deriv2_5m) # self.should_enter_trade(pair, last_candle, current_time) allow_to_buy = (condition and not self.pairs[pair]['stop']) | (entry_tag == 'force_entry') # force = self.pairs[pair]['force_buy'] # if self.pairs[pair]['force_buy']: # self.pairs[pair]['force_buy'] = False # allow_to_buy = True # else: # if not self.should_enter_trade(pair, last_candle, current_time): # allow_to_buy = False if allow_to_buy: self.pairs[pair]['first_buy'] = rate self.pairs[pair]['last_buy'] = rate self.pairs[pair]['max_touch'] = last_candle['close'] self.pairs[pair]['last_candle'] = last_candle self.pairs[pair]['count_of_buys'] = 1 self.pairs[pair]['current_profit'] = 0 self.pairs[pair]['last_max'] = max(last_candle['close'], self.pairs[pair]['last_max']) self.pairs[pair]['last_min'] = min(last_candle['close'], self.pairs[pair]['last_min']) self.pairs[pair]['last_date'] = current_time dispo = round(self.wallets.get_available_stake_amount()) # self.printLineLog() stake_amount = self.adjust_stake_amount(pair, last_candle) self.pairs[pair]['first_amount'] = stake_amount self.pairs[pair]['total_amount'] = stake_amount print(f"Buy {pair} {current_time} {entry_tag} dispo={dispo} amount={amount} rate={rate} rate={rate}") # self.log_trade( # last_candle=last_candle, # date=current_time, # action=("🟩Buy" if allow_to_buy else "Canceled") + " " + str(minutes), # pair=pair, # rate=rate, # dispo=dispo, # profit=0, # trade_type=entry_tag, # buys=1, # stake=round(stake_amount, 2) # ) return allow_to_buy def confirm_trade_exit(self, pair: str, trade: Trade, order_type: str, amount: float, rate: float, time_in_force: str, exit_reason: str, current_time, **kwargs, ) -> bool: # allow_to_sell = (minutes > 30) dataframe, _ = self.dp.get_analyzed_dataframe(pair, self.timeframe) last_candle = dataframe.iloc[-1].squeeze() profit = trade.calc_profit(rate) force = self.pairs[pair]['force_sell'] allow_to_sell = (last_candle['hapercent'] < 0 and profit > 0) or force or (exit_reason == 'force_exit') or (exit_reason == 'stop_loss') minutes = int(round((current_time - trade.date_last_filled_utc).total_seconds() / 60, 0)) if allow_to_sell: self.pairs[pair]['last_sell'] = rate self.pairs[pair]['last_candle'] = last_candle self.pairs[pair]['max_profit'] = 0 profit = trade.calc_profit(rate) self.pairs[pair]['previous_profit'] = profit dispo = round(self.wallets.get_available_stake_amount()) print(f"Sell {pair} {current_time} {exit_reason} dispo={dispo} amount={amount} rate={rate} open_rate={trade.open_rate} profit={profit}") # self.log_trade( # last_candle=last_candle, # date=current_time, # action="🟥Sell " + str(minutes), # pair=pair, # trade_type=exit_reason, # rate=last_candle['close'], # dispo=dispo, # profit=round(profit, 2) # ) self.pairs[pair]['force_sell'] = False self.pairs[pair]['has_gain'] = 0 self.pairs[pair]['current_profit'] = 0 self.pairs[pair]['total_amount'] = 0 self.pairs[pair]['count_of_buys'] = 0 self.pairs[pair]['max_touch'] = 0 self.pairs[pair]['last_buy'] = 0 self.pairs[pair]['last_date'] = current_time self.pairs[pair]['last_trade'] = self.pairs[pair]['current_trade'] self.pairs[pair]['current_trade'] = None # else: # print(f"STOP triggered for {pair} ({exit_reason}) but condition blocked", "warning") return (allow_to_sell) | (exit_reason == 'force_exit') | (exit_reason == 'stop_loss') | force def custom_exit(self, pair, trade, current_time, current_rate, current_profit, **kwargs): dataframe, _ = self.dp.get_analyzed_dataframe(pair, self.timeframe) last_row = dataframe.iloc[-1] self.pairs[pair]['current_trade'] = trade momentum = last_row["sma24_score"] # Si momentum fort → on laisse courir if momentum > 1: return None # Si momentum faiblit → on prend profit plus tôt if momentum < 0 and current_profit > 0.02: return "momentum_drop" return None def custom_stoploss(self, pair, trade, current_time, current_rate, current_profit, **kwargs): if current_profit < - 0.1 and self.wallets.get_available_stake_amount(): return -0.1 return -1 def informative_pairs(self): return [] def populate_indicators(self, dataframe: DataFrame, metadata: dict) -> DataFrame: dataframe = dataframe.copy() heikinashi = qtpylib.heikinashi(dataframe) dataframe['haopen'] = heikinashi['open'] dataframe['haclose'] = heikinashi['close'] dataframe['hapercent'] = (dataframe['haclose'] - dataframe['haopen']) / dataframe['haclose'] dataframe['mid'] = dataframe['haopen'] + (dataframe['haclose'] - dataframe['haopen']) / 2 dataframe['zero'] = 0; for timeperiod in timeperiods: dataframe[f'max{timeperiod}'] = talib.MAX(dataframe['close'], timeperiod=timeperiod) dataframe[f'min{timeperiod}'] = talib.MIN(dataframe['close'], timeperiod=timeperiod) dataframe[f"percent{timeperiod}"] = dataframe['close'].pct_change(timeperiod) dataframe[f"sma{timeperiod}"] = dataframe['mid'].ewm(span=timeperiod, adjust=False).mean() self.calculeDerivees(dataframe, f"sma{timeperiod}", timeframe=self.timeframe, ema_period=timeperiod) # ###################################################################################################### dataframe['stop_buying_deb'] = (dataframe['sma60_trend_change_down'] == 1) dataframe['stop_buying_end'] = (dataframe['sma60_trend_change_up'] == 1) latched = np.zeros(len(dataframe), dtype=bool) for i in range(1, len(dataframe)): if dataframe['stop_buying_deb'].iloc[i]: latched[i] = True elif dataframe['stop_buying_end'].iloc[i]: latched[i] = False else: latched[i] = latched[i - 1] dataframe['stop_buying'] = latched return dataframe def populate_buy_trend(self, dataframe: DataFrame, metadata: dict) -> DataFrame: # dataframe.loc[ # ( # (dataframe['sma24_score'].shift(2) <= dataframe['zero']) # & (dataframe['sma5'].shift(1) <= dataframe['sma5']) # & (dataframe['sma5_inv'] == -1) # & (dataframe['min24'].shift(3) == dataframe['min24']) # ), # 'buy'] = 1 conditions = list() # print(dataframe.columns) buy_indicator = self.buy_indicator0.value buy_crossed_indicator = self.buy_crossed_indicator0.value buy_operator = self.buy_operator0.value buy_real_num = self.buy_real_num0.value condition, dataframe = condition_generator( dataframe, buy_operator, buy_indicator, buy_crossed_indicator, buy_real_num ) conditions.append(condition) # backup buy_indicator = self.buy_indicator1.value buy_crossed_indicator = self.buy_crossed_indicator1.value buy_operator = self.buy_operator1.value buy_real_num = self.buy_real_num1.value condition, dataframe = condition_generator( dataframe, buy_operator, buy_indicator, buy_crossed_indicator, buy_real_num ) conditions.append(condition) buy_indicator = self.buy_indicator2.value buy_crossed_indicator = self.buy_crossed_indicator2.value buy_operator = self.buy_operator2.value buy_real_num = self.buy_real_num2.value condition, dataframe = condition_generator( dataframe, buy_operator, buy_indicator, buy_crossed_indicator, buy_real_num ) conditions.append(condition) conditions.append((dataframe['stop_buying'] == False)) # conditions.append((dataframe['min60'].shift(5) == dataframe['min60'])) # conditions.append((dataframe['low'] < dataframe['min60'])) print(f"BUY indicators tested \n" f"{self.buy_indicator0.value} {self.buy_crossed_indicator0.value} {self.buy_operator0.value} {self.buy_real_num0.value} \n" f"{self.buy_indicator1.value} {self.buy_crossed_indicator1.value} {self.buy_operator1.value} {self.buy_real_num1.value} \n" f"{self.buy_indicator2.value} {self.buy_crossed_indicator2.value} {self.buy_operator2.value} {self.buy_real_num2.value} \n" ) if conditions: dataframe.loc[ reduce(lambda x, y: x & y, conditions), ['enter_long', 'enter_tag'] ] = (1, 'god') return dataframe def populate_sell_trend(self, dataframe: DataFrame, metadata: dict) -> DataFrame: conditions = list() # TODO: Its not dry code! sell_indicator = self.sell_indicator0.value sell_crossed_indicator = self.sell_crossed_indicator0.value sell_operator = self.sell_operator0.value sell_real_num = self.sell_real_num0.value condition, dataframe = condition_generator( dataframe, sell_operator, sell_indicator, sell_crossed_indicator, sell_real_num ) conditions.append(condition) sell_indicator = self.sell_indicator1.value sell_crossed_indicator = self.sell_crossed_indicator1.value sell_operator = self.sell_operator1.value sell_real_num = self.sell_real_num1.value condition, dataframe = condition_generator( dataframe, sell_operator, sell_indicator, sell_crossed_indicator, sell_real_num ) conditions.append(condition) sell_indicator = self.sell_indicator2.value sell_crossed_indicator = self.sell_crossed_indicator2.value sell_operator = self.sell_operator2.value sell_real_num = self.sell_real_num2.value condition, dataframe = condition_generator( dataframe, sell_operator, sell_indicator, sell_crossed_indicator, sell_real_num ) conditions.append(condition) print(f"SELL indicators tested \n" f"{self.sell_indicator0.value} {self.sell_crossed_indicator0.value} {self.sell_operator0.value} {self.sell_real_num0.value} \n" f"{self.sell_indicator1.value} {self.sell_crossed_indicator1.value} {self.sell_operator1.value} {self.sell_real_num1.value} \n" f"{self.sell_indicator2.value} {self.sell_crossed_indicator2.value} {self.sell_operator2.value} {self.sell_real_num2.value} \n" ) if conditions: dataframe.loc[reduce(lambda x, y: x & y, conditions), ['exit_long', 'exit_tag']] = (1, 'god') return dataframe def calculeDerivees( self, dataframe: pd.DataFrame, name: str, suffixe: str = '', window: int = 100, coef: float = 0.15, ema_period: int = 10, verbose: bool = True, timeframe: str = '5m' ) -> pd.DataFrame: """ Calcule deriv1/deriv2 (relative simple), applique EMA, calcule tendency avec epsilon adaptatif basé sur rolling percentiles. """ d1_col = f"{name}{suffixe}_deriv1" d2_col = f"{name}{suffixe}_deriv2" tendency_col = f"{name}{suffixe}_state" series = dataframe[f"{name}{suffixe}"] d1 = series.diff() d2 = d1.diff() cond_bas = (d1.rolling(3).mean() > d1.rolling(10).mean()) cond_haut = (d1.rolling(3).mean() < d1.rolling(10).mean()) dataframe[d1_col] = (dataframe[name] - dataframe[name].shift(3)) / dataframe[name].shift(3) dataframe[d2_col] = (dataframe[d1_col] - dataframe[d1_col].shift(1)) dataframe[f"{name}{suffixe}_inv"] = np.where(cond_bas, -1, np.where(cond_haut, 1, 0)) short = d1.rolling(3).mean() long = d1.rolling(10).mean() spread = short - long zscore = (spread - spread.rolling(ema_period).mean()) / spread.rolling(ema_period).std() dataframe[f"{name}{suffixe}_score"] = zscore # #################################################################### # Calcul de la pente lissée d1 = series.diff() d1_smooth = d1.rolling(5).mean() # Normalisation z = (d1_smooth - d1_smooth.rolling(ema_period).mean()) / d1_smooth.rolling(ema_period).std() dataframe[f"{name}{suffixe}_trend_up"] = ( (d1_smooth.shift(1) < 0) & (d1_smooth > 0) & (z > 1.0) ) dataframe[f"{name}{suffixe}_trend_down"] = ( (d1_smooth.shift(1) > 0) & (d1_smooth < 0) & (z < -1.0) ) momentum_short = d1.rolling(int(ema_period / 2)).mean() momentum_long = d1.rolling(ema_period * 2).mean() dataframe[f"{name}{suffixe}_trend_change_up"] = ( (momentum_short.shift(1) < momentum_long.shift(1)) & (momentum_short > momentum_long) ) dataframe[f"{name}{suffixe}_trend_change_down"] = ( (momentum_short.shift(1) > momentum_long.shift(1)) & (momentum_short < momentum_long) ) return dataframe