# pragma pylint: disable=missing-docstring, invalid-name, pointless-string-statement
# isort: skip_file
# --- Do not remove these libs ---
from datetime import datetime, timedelta, timezone
from freqtrade.persistence import Trade
import numpy as np  # noqa
import pandas as pd  # noqa
from pandas import DataFrame
from freqtrade.strategy.interface import IStrategy
from typing import Optional, Union, Tuple

from freqtrade.strategy import (BooleanParameter, CategoricalParameter, DecimalParameter, stoploss_from_open,
                                IntParameter, IStrategy, merge_informative_pair, informative,
                                stoploss_from_absolute)

# --------------------------------
# Add your lib to import here
import ta
import talib.abstract as talib
import freqtrade.vendor.qtpylib.indicators as qtpylib
from functools import reduce
from random import shuffle

# Look-back lengths (in candles) for which indicator columns are generated.
timeperiods = [3, 5, 12, 24, 48, 60]

# Column-name catalogues, filled in the loop below:
#   god_genes_with_timeperiod - sma{n}_deriv1 / sma{n}_deriv2 / sma{n}_score for every period n
#   stoploss_indicators       - stop_buying{n}_1d for every period n
#   score_indicators          - sma{n}_score for every period n
score_indicators = []
stoploss_indicators = []
god_genes_with_timeperiod = []

# Candidate names that are currently switched off (kept for reference):
#   god genes : max{n}, min{n}, percent{n}, sma{n},
#               sma{n}_trend_up, sma{n}_trend_down,
#               sma{n}_trend_change_up, sma{n}_trend_change_down
#   stoploss  : stop_buying{n}
#   score     : sma{n}_score_1d
for timeperiod in timeperiods:
    god_genes_with_timeperiod.extend(
        f"sma{timeperiod}_{kind}" for kind in ("deriv1", "deriv2", "score")
    )
    stoploss_indicators.append(f"stop_buying{timeperiod}_1d")
    score_indicators.append(f"sma{timeperiod}_score")
print(stoploss_indicators) operators = [ "D", # Disabled gene ">", # Indicator, bigger than cross indicator "<", # Indicator, smaller than cross indicator "=", # Indicator, equal with cross indicator "C", # Indicator, crossed the cross indicator "CA", # Indicator, crossed above the cross indicator "CB", # Indicator, crossed below the cross indicator # ">R", # Normalized indicator, bigger than real number # "=R", # Normalized indicator, equal with real number # "R", # Normalized indicator devided to cross indicator, bigger than real number # "/=R", # Normalized indicator devided to cross indicator, equal with real number # "/ 10) # TODO : it ill callculated in populate indicators. pd.set_option('display.max_rows', None) pd.set_option('display.max_columns', None) pd.set_option("display.width", 200) # print(f"{indicator} {crossed_indicator} {real_num}") dataframe[indicator] = gene_calculator(dataframe, indicator) dataframe[crossed_indicator] = gene_calculator(dataframe, crossed_indicator) indicator_trend_sma = f"{indicator}-SMA-{TREND_CHECK_CANDLES}" if operator in ["UT", "DT", "OT", "CUT", "CDT", "COT"]: dataframe[indicator_trend_sma] = gene_calculator(dataframe, indicator_trend_sma) if operator == ">": condition = (dataframe[indicator] > dataframe[crossed_indicator]) elif operator == "=": condition = (np.isclose(dataframe[indicator], dataframe[crossed_indicator])) elif operator == "<": condition = (dataframe[indicator] < dataframe[crossed_indicator]) elif operator == "C": condition = ( (qtpylib.crossed_below(dataframe[indicator], dataframe[crossed_indicator])) | (qtpylib.crossed_above( dataframe[indicator], dataframe[crossed_indicator])) ) elif operator == "CA": condition = (qtpylib.crossed_above(dataframe[indicator], dataframe[crossed_indicator])) elif operator == "CB": condition = (qtpylib.crossed_below(dataframe[indicator], dataframe[crossed_indicator])) elif operator == ">R": condition = (dataframe[indicator] > real_num) elif operator == "=R": condition = 
(np.isclose(dataframe[indicator], real_num)) elif operator == "R": condition = (dataframe[indicator].div(dataframe[crossed_indicator]) > real_num) elif operator == "/=R": condition = (np.isclose(dataframe[indicator].div(dataframe[crossed_indicator]), real_num)) elif operator == "/ dataframe[indicator_trend_sma]) elif operator == "DT": condition = (dataframe[indicator] < dataframe[indicator_trend_sma]) elif operator == "OT": condition = (np.isclose(dataframe[indicator], dataframe[indicator_trend_sma])) elif operator == "CUT": condition = ( ( qtpylib.crossed_above(dataframe[indicator],dataframe[indicator_trend_sma]) ) & ( dataframe[indicator] > dataframe[indicator_trend_sma] ) ) elif operator == "CDT": condition = ( ( qtpylib.crossed_below(dataframe[indicator], dataframe[indicator_trend_sma]) ) & ( dataframe[indicator] < dataframe[indicator_trend_sma] ) ) elif operator == "COT": condition = ( ( ( qtpylib.crossed_below(dataframe[indicator], dataframe[indicator_trend_sma]) ) | ( qtpylib.crossed_above(dataframe[indicator], dataframe[indicator_trend_sma]) ) ) & ( np.isclose(dataframe[indicator], dataframe[indicator_trend_sma]) ) ) return condition, dataframe # ######################################################################################################################### # This class is a sample. Feel free to customize it. class Empty(IStrategy): # Strategy interface version - allow new iterations of the strategy interface. # Check the documentation or the Sample strategy to get the latest version. INTERFACE_VERSION = 2 # ROI table: minimal_roi = { #"0": 0.015 "0": 0.5 } # Stoploss: stoploss = -1 trailing_stop = False # trailing_stop_positive = 0.15 # trailing_stop_positive_offset = 0.20 # trailing_only_offset_is_reached = False position_adjustment_enable = True use_custom_stoploss = False #max_open_trades = 3 # Optimal ticker interval for the strategy. timeframe = '1h' # Run "populate_indicators()" only for new candle. 
process_only_new_candles = False # These values can be overridden in the "ask_strategy" section in the config. use_sell_signal = True sell_profit_only = False ignore_roi_if_buy_signal = False # Number of candles the strategy requires before producing valid signals startup_candle_count: int = 30 pairs = { pair: { 'first_amount': 0, "first_buy": 0, "last_buy": 0.0, "last_min": 999999999999999.5, "last_max": 0, "trade_info": {}, "max_touch": 0.0, "last_sell": 0.0, 'count_of_buys': 0, 'current_profit': 0, 'expected_profit': 0, 'previous_profit': 0, "last_candle": {}, "last_count_of_buys": 0, 'base_stake_amount': 0, 'stop_buy': False, 'last_date': 0, 'stop': False, 'max_profit': 0, 'last_profit': 0, 'total_amount': 0, 'has_gain': 0, 'force_sell': False, 'force_buy': False, 'current_trade': None, 'last_trade': None } for pair in ["BTC/USDC", "BTC/USDT"] } plot_config = { "main_plot": { "sma5": { "color": "#7aa90b" }, "min24": { "color": "#121acd" } }, "subplots": { "Inv": { "sma5_inv": { "color": "#aef878", "type": "line" }, "zero": { "color": "#fdba52" }, "sma24_score": { "color": "#f1f5b0" } }, "drv": { "sma5_deriv1": { "color": "#96eebb" } } }, "options": { "markAreaZIndex": 1 } } # Buy Hyperoptable Parameters/Spaces. 
buy_crossed_indicator0 = CategoricalParameter(god_genes_with_timeperiod, default="ADD-20", space='buy') buy_crossed_indicator1 = CategoricalParameter(god_genes_with_timeperiod, default="ASIN-6", space='buy') buy_crossed_indicator2 = CategoricalParameter(god_genes_with_timeperiod, default="CDLEVENINGSTAR-50", space='buy') buy_indicator0 = CategoricalParameter(god_genes_with_timeperiod, default="SMA-100", space='buy') buy_indicator1 = CategoricalParameter(god_genes_with_timeperiod, default="WILLR-50", space='buy') buy_indicator2 = CategoricalParameter(god_genes_with_timeperiod, default="CDLHANGINGMAN-20", space='buy') buy_operator0 = CategoricalParameter(operators, default="/ float: dataframe, _ = self.dp.get_analyzed_dataframe(pair, self.timeframe) last_candle = dataframe.iloc[-1].squeeze() return self.adjust_stake_amount(pair, last_candle) def adjust_stake_amount(self, pair: str, last_candle: DataFrame): if (self.pairs[pair]['first_amount'] > 0): amount = min(self.wallets.get_available_stake_amount(), self.pairs[pair]['first_amount']) else: # range_min = last_candle[f"min{self.stoploss_timeperiod.value}_1d"] # range_max = last_candle[f"max{self.stoploss_timeperiod.value}_1d"] # # if range_max == range_min: # return -0.1 # sécurité # # range_pos = (last_candle['close'] - range_min) / (range_max - range_min) range_pos = last_candle[f"range_pos"] sl_min = self.wallets.get_available_stake_amount() / 2 sl_max = self.wallets.get_available_stake_amount() / 6 amount = sl_min + (1 - range_pos) * (sl_max - sl_min) # amount = self.wallets.get_available_stake_amount() / 8 return min(amount, self.wallets.get_available_stake_amount()) def adjust_trade_position(self, trade: Trade, current_time: datetime, current_rate: float, current_profit: float, min_stake: float, max_stake: float, **kwargs): # ne rien faire si ordre deja en cours if trade.has_open_orders: # print("skip open orders") return None if (self.wallets.get_available_stake_amount() < 10): # or trade.stake_amount >= 
max_stake: return 0 # dataframe, _ = self.dp.get_analyzed_dataframe(trade.pair, self.timeframe) last_candle = dataframe.iloc[-1].squeeze() before_last_candle = dataframe.iloc[-2].squeeze() before_last_candle_12 = dataframe.iloc[-13].squeeze() before_last_candle_24 = dataframe.iloc[-25].squeeze() last_candle_3 = dataframe.iloc[-4].squeeze() last_candle_previous_1h = dataframe.iloc[-13].squeeze() # prépare les données current_time = current_time.astimezone(timezone.utc) open_date = trade.open_date.astimezone(timezone.utc) dispo = round(self.wallets.get_available_stake_amount()) hours_since_first_buy = (current_time - trade.open_date_utc).seconds / 3600.0 days_since_first_buy = (current_time - trade.open_date_utc).days hours = (current_time - trade.date_last_filled_utc).total_seconds() / 3600.0 count_of_buys = trade.nr_of_successful_entries current_time_utc = current_time.astimezone(timezone.utc) open_date = trade.open_date.astimezone(timezone.utc) days_since_open = (current_time_utc - open_date).days pair = trade.pair profit = trade.calc_profit(current_rate) #round(current_profit * trade.stake_amount, 1) # last_lost = self.getLastLost(last_candle, pair) # pct_first = 0 stake_amount = self.adjust_stake_amount(pair, last_candle) # if (last_candle['enter_long'] == 1 and current_profit < -0.05 and stake_amount > 10) : # # print(f"adjust {current_time} {stake_amount}") # print(f"adjust {pair} {current_time} dispo={dispo} amount={stake_amount} rate={current_rate}") # return stake_amount if last_candle['enter_long'] != 1: return None filled_buys = [ o for o in trade.orders if o.status == "closed" and o.ft_order_side == "buy" ] if not filled_buys: return None last_buy = max(filled_buys, key=lambda o: o.order_date) last_entry_price = last_buy.price drop_from_last_entry = (current_rate - last_entry_price) / last_entry_price if drop_from_last_entry <= -0.025 and last_candle['min60'] == last_candle_3['min60']: # stake_amount = trade.stake_amount print(f"adjust {current_time} 
{stake_amount}") print(f"adjust {pair} {current_time} dispo={dispo} amount={stake_amount} rate={current_rate}") return stake_amount return None def confirm_trade_entry(self, pair: str, order_type: str, amount: float, rate: float, time_in_force: str, current_time: datetime, entry_tag: Optional[str], **kwargs) -> bool: minutes = 0 if self.pairs[pair]['last_date'] != 0: minutes = round(int((current_time - self.pairs[pair]['last_date']).total_seconds() / 60)) dataframe, _ = self.dp.get_analyzed_dataframe(pair, self.timeframe) last_candle = dataframe.iloc[-1].squeeze() last_candle_2 = dataframe.iloc[-2].squeeze() last_candle_3 = dataframe.iloc[-3].squeeze() condition = True if self.pairs[pair]['last_trade'] and self.pairs[pair]['last_trade'].close_date: # base cooldown = n bougies / cooldown proportionnel au profit # bougies de plus par % cooldown_candles = 12 + 6 * (int(self.pairs[pair]['last_profit'] / 0.01)) # réglable # temps depuis la fermeture candles_since_close = ((current_time - self.pairs[pair]['last_trade'].close_date).total_seconds() / 3600) # seconds / heure condition = (candles_since_close >= cooldown_candles) print(f"Cool down {round(self.pairs[pair]['last_profit'], 3)} {cooldown_candles} {candles_since_close}") # self.should_enter_trade(pair, last_candle, current_time) allow_to_buy = (condition and not self.pairs[pair]['stop']) | (entry_tag == 'force_entry') # force = self.pairs[pair]['force_buy'] # if self.pairs[pair]['force_buy']: # self.pairs[pair]['force_buy'] = False # allow_to_buy = True # else: # if not self.should_enter_trade(pair, last_candle, current_time): # allow_to_buy = False if allow_to_buy: self.pairs[pair]['first_buy'] = rate self.pairs[pair]['last_buy'] = rate self.pairs[pair]['max_touch'] = last_candle['close'] self.pairs[pair]['last_candle'] = last_candle self.pairs[pair]['count_of_buys'] = 1 self.pairs[pair]['current_profit'] = 0 self.pairs[pair]['last_profit'] = 0 self.pairs[pair]['last_trade'] = None self.pairs[pair]['last_max'] = 
max(last_candle['close'], self.pairs[pair]['last_max']) self.pairs[pair]['last_min'] = min(last_candle['close'], self.pairs[pair]['last_min']) self.pairs[pair]['last_date'] = current_time dispo = round(self.wallets.get_available_stake_amount()) # self.printLineLog() stake_amount = self.adjust_stake_amount(pair, last_candle) self.pairs[pair]['first_amount'] = stake_amount self.pairs[pair]['total_amount'] = stake_amount print(f"Buy {pair} {current_time} {entry_tag} dispo={dispo} amount={stake_amount} rate={rate} rate={rate}") # self.log_trade( # last_candle=last_candle, # date=current_time, # action=("🟩Buy" if allow_to_buy else "Canceled") + " " + str(minutes), # pair=pair, # rate=rate, # dispo=dispo, # profit=0, # trade_type=entry_tag, # buys=1, # stake=round(stake_amount, 2) # ) return allow_to_buy def confirm_trade_exit(self, pair: str, trade: Trade, order_type: str, amount: float, rate: float, time_in_force: str, exit_reason: str, current_time, **kwargs, ) -> bool: # allow_to_sell = (minutes > 30) dataframe, _ = self.dp.get_analyzed_dataframe(pair, self.timeframe) last_candle = dataframe.iloc[-1].squeeze() profit = trade.calc_profit(rate) force = self.pairs[pair]['force_sell'] allow_to_sell = (last_candle['hapercent'] < 0 and profit > 0) or force or (exit_reason == 'force_exit') or (exit_reason == 'stop_loss') or (exit_reason == 'trailing_stop_loss') minutes = int(round((current_time - trade.date_last_filled_utc).total_seconds() / 60, 0)) if allow_to_sell: self.pairs[pair]['last_sell'] = rate self.pairs[pair]['last_candle'] = last_candle self.pairs[pair]['max_profit'] = 0 profit = trade.calc_profit(rate) self.pairs[pair]['previous_profit'] = profit dispo = round(self.wallets.get_available_stake_amount()) print(f"Sell {pair} {current_time} {exit_reason} dispo={dispo} rate={rate} open_rate={trade.open_rate} profit={profit}") # self.log_trade( # last_candle=last_candle, # date=current_time, # action="🟥Sell " + str(minutes), # pair=pair, # trade_type=exit_reason, # 
rate=last_candle['close'], # dispo=dispo, # profit=round(profit, 2) # ) self.pairs[pair]['first_amount'] = 0 self.pairs[pair]['force_sell'] = False self.pairs[pair]['has_gain'] = 0 self.pairs[pair]['current_profit'] = 0 self.pairs[pair]['total_amount'] = 0 self.pairs[pair]['count_of_buys'] = 0 self.pairs[pair]['max_touch'] = 0 self.pairs[pair]['last_buy'] = 0 self.pairs[pair]['last_date'] = current_time self.pairs[pair]['last_trade'] = self.pairs[pair]['current_trade'] self.pairs[pair]['current_trade'] = None else: print(f"{current_time} STOP triggered for {pair} ({exit_reason}) but condition blocked", "warning") return (allow_to_sell) | (exit_reason == 'force_exit') | (exit_reason == 'stop_loss') | force def custom_exit(self, pair, trade, current_time, current_rate, current_profit, **kwargs): dataframe, _ = self.dp.get_analyzed_dataframe(pair, self.timeframe) last_candle = dataframe.iloc[-1] before_last_candle = dataframe.iloc[-2] self.pairs[pair]['current_trade'] = trade momentum = last_candle[self.sell_score_indicator.value] # Si momentum fort → on laisse courir if momentum > 1: return None # Si momentum faiblit → on prend profit plus tôt if momentum < 0 and current_profit > 0.02 and last_candle[self.sell_score_indicator.value] < before_last_candle[self.sell_score_indicator.value]\ and last_candle['close'] < last_candle['sma5']: self.pairs[pair]['last_profit'] = current_profit return "momentum_drop" return None # def custom_stoploss(self, pair, trade, current_time, current_rate, current_profit, **kwargs): # dataframe, _ = self.dp.get_analyzed_dataframe(pair, self.timeframe) # last_candle = dataframe.iloc[-1] # profit = trade.calc_profit(current_rate) # # # print(f'test stop loss {self.stoploss} {last_candle["stop_buying12_1d"]}') # if last_candle[self.stoploss_indicator.value] and (trade.nr_of_successful_entries >= 4 or self.wallets.get_available_stake_amount() < 300): # and profit < - 30 : # range_min = last_candle[f"min{self.stoploss_timeperiod.value}_1d"] # 
range_max = last_candle[f"max{self.stoploss_timeperiod.value}_1d"] # # if range_max == range_min: # print(f'ranges={range_min}') # return -0.1 # sécurité # # range_pos = (current_rate - range_min) / (range_max - range_min) # # if (range_pos > 1): # return -1 # # sl_min = -0.02 # sl_max = -0.1 #self.stoploss # # dynamic_sl = (sl_min + (1 - range_pos) * (sl_max - sl_min)) # # print(f'{current_time} Loss ranges={round(range_min,0)} {round(range_max, 0)} range_pos={round(range_pos, 3)} dynamic_sl={round(dynamic_sl, 3)} ' # f'profit={profit} current={current_profit} {self.stoploss_indicator.value} {self.stoploss_timeperiod.value} {last_candle[self.stoploss_indicator.value]}') # # return dynamic_sl # # return -1 def informative_pairs(self): # get access to all pairs available in whitelist. pairs = self.dp.current_whitelist() informative_pairs = [(pair, '1d') for pair in pairs] # informative_pairs += [(pair, '1h') for pair in pairs] return informative_pairs def populate_indicators(self, dataframe: DataFrame, metadata: dict) -> DataFrame: pair = metadata['pair'] dataframe = dataframe.copy() heikinashi = qtpylib.heikinashi(dataframe) dataframe['haopen'] = heikinashi['open'] dataframe['haclose'] = heikinashi['close'] dataframe['hapercent'] = (dataframe['haclose'] - dataframe['haopen']) / dataframe['haclose'] dataframe['mid'] = dataframe['haopen'] + (dataframe['haclose'] - dataframe['haopen']) / 2 dataframe['zero'] = 0; for timeperiod in timeperiods: dataframe[f'max{timeperiod}'] = talib.MAX(dataframe['close'], timeperiod=timeperiod) dataframe[f'min{timeperiod}'] = talib.MIN(dataframe['close'], timeperiod=timeperiod) dataframe[f"percent{timeperiod}"] = dataframe['close'].pct_change(timeperiod) dataframe[f"sma{timeperiod}"] = dataframe['mid'].ewm(span=timeperiod, adjust=False).mean() self.calculeDerivees(dataframe, f"sma{timeperiod}", timeframe=self.timeframe, ema_period=timeperiod) dataframe[f'stop_buying{timeperiod}_deb'] = (dataframe[f'sma{timeperiod}_trend_change_down'] == 
1) dataframe[f'stop_buying{timeperiod}_end'] = (dataframe[f'sma{timeperiod}_trend_change_up'] == 1) latched = np.zeros(len(dataframe), dtype=bool) for i in range(1, len(dataframe)): if dataframe[f'stop_buying{timeperiod}_deb'].iloc[i]: latched[i] = True elif dataframe[f'stop_buying{timeperiod}_end'].iloc[i]: latched[i] = False else: latched[i] = latched[i - 1] dataframe[f'stop_buying{timeperiod}'] = latched # ###################################################################################################### ################### INFORMATIVE 1d informative = self.dp.get_pair_dataframe(pair=metadata['pair'], timeframe="1d") # informative = self.populateDataframe(informative, timeframe='1d') # heikinashi = qtpylib.heikinashi(informative) # informative['haopen'] = heikinashi['open'] # informative['haclose'] = heikinashi['close'] # informative['hapercent'] = (informative['haclose'] - informative['haopen']) / informative['haclose'] informative['mid'] = informative['open'] + (informative['close'] - informative['open']) / 2 for timeperiod in timeperiods: informative[f'max{timeperiod}'] = talib.MAX(informative['close'], timeperiod=timeperiod) informative[f'min{timeperiod}'] = talib.MIN(informative['close'], timeperiod=timeperiod) # informative[f"range{timeperiod}"] = ((informative["close"] - informative[f'min{timeperiod}']) / (informative[f'max{timeperiod}'] - informative[f'min{timeperiod}'])) # informative[f"percent{timeperiod}"] = informative['close'].pct_change(timeperiod) informative[f"sma{timeperiod}"] = informative['mid'].ewm(span=timeperiod, adjust=False).mean() self.calculeDerivees(informative, f"sma{timeperiod}", timeframe=self.timeframe, ema_period=timeperiod) informative[f'stop_buying_deb{timeperiod}'] = (informative[f'sma{timeperiod}_trend_change_down'] == 1) informative[f'stop_buying_end{timeperiod}'] = (informative[f'sma{timeperiod}_trend_change_up'] == 1) latched = np.zeros(len(informative), dtype=bool) for i in range(1, len(informative)): if 
informative[f'stop_buying_deb{timeperiod}'].iloc[i]: latched[i] = True elif informative[f'stop_buying_end{timeperiod}'].iloc[i]: latched[i] = False else: latched[i] = latched[i - 1] informative[f'stop_buying{timeperiod}'] = latched dataframe = merge_informative_pair(dataframe, informative, self.timeframe, "1d", ffill=True) # ###################################################################################################### range_min = dataframe[f"min12_1d"] range_max = dataframe[f"max48"] dataframe[f"range_pos"] = (dataframe['close'] - range_min) / (range_max - range_min) # récupérer le dernier trade fermé trades = Trade.get_trades_proxy(pair=pair,is_open=False) if trades: last_trade = trades[-1] self.pairs[pair]['last_profit'] = last_trade.close_profit # ex: 0.12 = +12% self.pairs[pair]['last_trade'] = last_trade return dataframe def populate_buy_trend(self, dataframe: DataFrame, metadata: dict) -> DataFrame: # dataframe.loc[ # ( # (dataframe['sma24_score'].shift(2) <= dataframe['zero']) # & (dataframe['sma5'].shift(1) <= dataframe['sma5']) # & (dataframe['sma5_inv'] == -1) # & (dataframe['min24'].shift(3) == dataframe['min24']) # ), # 'buy'] = 1 conditions = list() # # print(dataframe.columns) # # buy_indicator = self.buy_indicator0.value # buy_crossed_indicator = self.buy_crossed_indicator0.value # buy_operator = self.buy_operator0.value # buy_real_num = self.buy_real_num0.value # condition, dataframe = condition_generator( # dataframe, # buy_operator, # buy_indicator, # buy_crossed_indicator, # buy_real_num # ) # conditions.append(condition) # # backup # buy_indicator = self.buy_indicator1.value # buy_crossed_indicator = self.buy_crossed_indicator1.value # buy_operator = self.buy_operator1.value # buy_real_num = self.buy_real_num1.value # # condition, dataframe = condition_generator( # dataframe, # buy_operator, # buy_indicator, # buy_crossed_indicator, # buy_real_num # ) # conditions.append(condition) # # buy_indicator = self.buy_indicator2.value # 
buy_crossed_indicator = self.buy_crossed_indicator2.value # buy_operator = self.buy_operator2.value # buy_real_num = self.buy_real_num2.value # condition, dataframe = condition_generator( # dataframe, # buy_operator, # buy_indicator, # buy_crossed_indicator, # buy_real_num # ) # conditions.append(condition) conditions.append((dataframe[self.stop_buying_indicator.value] == False)) # conditions.append(dataframe['hapercent'] > 0) # # conditions.append(dataframe[f"range_pos"] <= 0.5) # conditions.append(dataframe[f"sma5_deriv1"] > 0) print(f"BUY indicators tested \n" f"{self.buy_indicator0.value} {self.buy_crossed_indicator0.value} {self.buy_operator0.value} {self.buy_real_num0.value} \n" f"{self.buy_indicator1.value} {self.buy_crossed_indicator1.value} {self.buy_operator1.value} {self.buy_real_num1.value} \n" f"{self.buy_indicator2.value} {self.buy_crossed_indicator2.value} {self.buy_operator2.value} {self.buy_real_num2.value} \n" ) if conditions: dataframe.loc[ reduce(lambda x, y: x & y, conditions), ['enter_long', 'enter_tag'] ] = (1, 'god') return dataframe def populate_sell_trend(self, dataframe: DataFrame, metadata: dict) -> DataFrame: # conditions = list() # # TODO: Its not dry code! 
# sell_indicator = self.sell_indicator0.value # sell_crossed_indicator = self.sell_crossed_indicator0.value # sell_operator = self.sell_operator0.value # sell_real_num = self.sell_real_num0.value # condition, dataframe = condition_generator( # dataframe, # sell_operator, # sell_indicator, # sell_crossed_indicator, # sell_real_num # ) # conditions.append(condition) # # sell_indicator = self.sell_indicator1.value # sell_crossed_indicator = self.sell_crossed_indicator1.value # sell_operator = self.sell_operator1.value # sell_real_num = self.sell_real_num1.value # condition, dataframe = condition_generator( # dataframe, # sell_operator, # sell_indicator, # sell_crossed_indicator, # sell_real_num # ) # conditions.append(condition) # # sell_indicator = self.sell_indicator2.value # sell_crossed_indicator = self.sell_crossed_indicator2.value # sell_operator = self.sell_operator2.value # sell_real_num = self.sell_real_num2.value # condition, dataframe = condition_generator( # dataframe, # sell_operator, # sell_indicator, # sell_crossed_indicator, # sell_real_num # ) # conditions.append(condition) # # # print(f"SELL indicators tested \n" # f"{self.sell_indicator0.value} {self.sell_crossed_indicator0.value} {self.sell_operator0.value} {self.sell_real_num0.value} \n" # f"{self.sell_indicator1.value} {self.sell_crossed_indicator1.value} {self.sell_operator1.value} {self.sell_real_num1.value} \n" # f"{self.sell_indicator2.value} {self.sell_crossed_indicator2.value} {self.sell_operator2.value} {self.sell_real_num2.value} \n" # ) # # # if conditions: # dataframe.loc[reduce(lambda x, y: x & y, conditions), ['exit_long', 'exit_tag']] = (1, 'god') return dataframe def calculeDerivees( self, dataframe: pd.DataFrame, name: str, suffixe: str = '', window: int = 100, coef: float = 0.15, ema_period: int = 10, verbose: bool = True, timeframe: str = '5m' ) -> pd.DataFrame: """ Calcule deriv1/deriv2 (relative simple), applique EMA, calcule tendency avec epsilon adaptatif basé sur rolling 
percentiles. """ d1_col = f"{name}{suffixe}_deriv1" d2_col = f"{name}{suffixe}_deriv2" tendency_col = f"{name}{suffixe}_state" series = dataframe[f"{name}{suffixe}"] d1 = series.diff() d2 = d1.diff() cond_bas = (d1.rolling(3).mean() > d1.rolling(10).mean()) cond_haut = (d1.rolling(3).mean() < d1.rolling(10).mean()) dataframe[d1_col] = (dataframe[name] - dataframe[name].shift(3)) / dataframe[name].shift(3) dataframe[d2_col] = (dataframe[d1_col] - dataframe[d1_col].shift(1)) dataframe[f"{name}{suffixe}_inv"] = np.where(cond_bas, -1, np.where(cond_haut, 1, 0)) short = d1.rolling(3).mean() long = d1.rolling(10).mean() spread = short - long zscore = (spread - spread.rolling(ema_period).mean()) / spread.rolling(ema_period).std() dataframe[f"{name}{suffixe}_score"] = zscore # #################################################################### # Calcul de la pente lissée d1 = series.diff() d1_smooth = d1.rolling(5).mean() # Normalisation z = (d1_smooth - d1_smooth.rolling(ema_period).mean()) / d1_smooth.rolling(ema_period).std() dataframe[f"{name}{suffixe}_trend_up"] = ( (d1_smooth.shift(1) < 0) & (d1_smooth > 0) & (z > 1.0) ) dataframe[f"{name}{suffixe}_trend_down"] = ( (d1_smooth.shift(1) > 0) & (d1_smooth < 0) & (z < -1.0) ) momentum_short = d1.rolling(int(ema_period / 2)).mean() momentum_long = d1.rolling(ema_period * 2).mean() dataframe[f"{name}{suffixe}_trend_change_up"] = ( (momentum_short.shift(1) < momentum_long.shift(1)) & (momentum_short > momentum_long) ) dataframe[f"{name}{suffixe}_trend_change_down"] = ( (momentum_short.shift(1) > momentum_long.shift(1)) & (momentum_short < momentum_long) ) return dataframe @property def protections(self): return [ { "method": "CooldownPeriod", "stop_duration_candles": 12 }, # { # "method": "MaxDrawdown", # "lookback_period_candles": 96, # "trade_limit": 4, # "max_allowed_drawdown": 0.1, # "stop_duration_candles": 24 # }, # { # "method": "StoplossGuard", # "lookback_period_candles": 96, # "trade_limit": 2, # 
"stop_duration_candles": 48, # "only_per_pair": False # }, # { # "method": "LowProfitPairs", # "lookback_period_candles": 6, # "trade_limit": 2, # "stop_duration_candles": 60, # "required_profit": 0.02 # }, # { # "method": "LowProfitPairs", # "lookback_period_candles": 24, # "trade_limit": 4, # "stop_duration_candles": 2, # "required_profit": 0.01 # } ] def printLog(self, str): if self.config.get('runmode') == 'hyperopt' or self.dp.runmode.value in ('hyperopt'): return; if not self.dp.runmode.value in ('backtest', 'hyperopt', 'lookahead-analysis'): logger.info(str) else: if not self.dp.runmode.value in ('hyperopt'): print(str)