# pr#agma pylint: disable=missing-docstring, invalid-name, pointless-string-statement # isort: skip_file # --- Do not remove these libs --- from datetime import datetime from freqtrade.persistence import Trade import numpy as np # noqa import pandas as pd # noqa from pandas import DataFrame from datetime import timezone, timedelta from datetime import timedelta, datetime from freqtrade.strategy.interface import IStrategy from freqtrade.persistence import Trade from typing import Optional, Union, Tuple from freqtrade.strategy import (BooleanParameter, CategoricalParameter, DecimalParameter, stoploss_from_open, IntParameter, IStrategy, merge_informative_pair, informative, stoploss_from_absolute) # -------------------------------- # Add your lib to import here import ta import talib.abstract as talib import freqtrade.vendor.qtpylib.indicators as qtpylib from functools import reduce from random import shuffle import logging logger = logging.getLogger(__name__) timeperiods = [3, 5, 12, 24, 36, 48, 60] # Couleurs ANSI de base RED = "\033[31m" GREEN = "\033[32m" YELLOW = "\033[33m" BLUE = "\033[34m" MAGENTA = "\033[35m" CYAN = "\033[36m" RESET = "\033[0m" timeperiods = [3, 5, 12, 24, 36, 48, 60] score_indicators = list() stop_buying_indicators = list() god_genes_with_timeperiod = list() for timeperiod in timeperiods: # god_genes_with_timeperiod.append(f'max{timeperiod}') # god_genes_with_timeperiod.append(f'min{timeperiod}') # god_genes_with_timeperiod.append(f"percent{timeperiod}") # god_genes_with_timeperiod.append(f"sma{timeperiod}") god_genes_with_timeperiod.append(f"sma{timeperiod}_deriv1") god_genes_with_timeperiod.append(f"sma{timeperiod}_deriv2") god_genes_with_timeperiod.append(f"sma{timeperiod}_score") # stoploss_indicators.append(f"stop_buying{timeperiod}") stop_buying_indicators.append(f"stop_buying{timeperiod}_1d") score_indicators.append(f"sma{timeperiod}_score") # score_indicators.append(f"sma{timeperiod}_score_1d") # god_genes_with_timeperiod.append(f"sma{timeperiod}_trend_up") # god_genes_with_timeperiod.append(f"sma{timeperiod}_trend_down") # god_genes_with_timeperiod.append(f"sma{timeperiod}_trend_change_up") # god_genes_with_timeperiod.append(f"sma{timeperiod}_trend_change_down") operators = [ "D", # Disabled gene ">", # Indicator, bigger than cross indicator "<", # Indicator, smaller than cross indicator "=", # Indicator, equal with cross indicator "C", # Indicator, crossed the cross indicator "CA", # Indicator, crossed above the cross indicator "CB", # Indicator, crossed below the cross indicator # ">R", # Normalized indicator, bigger than real number # "=R", # Normalized indicator, equal with real number # "R", # Normalized indicator devided to cross indicator, bigger than real number # "/=R", # Normalized indicator devided to cross indicator, equal with real number # "/ 10) # TODO : it ill callculated in populate indicators. pd.set_option('display.max_rows', None) pd.set_option('display.max_columns', None) pd.set_option("display.width", 200) # print(f"{indicator} {crossed_indicator} {real_num}") dataframe[indicator] = gene_calculator(dataframe, indicator) dataframe[crossed_indicator] = gene_calculator(dataframe, crossed_indicator) indicator_trend_sma = f"{indicator}-SMA-{TREND_CHECK_CANDLES}" if operator in ["UT", "DT", "OT", "CUT", "CDT", "COT"]: dataframe[indicator_trend_sma] = gene_calculator(dataframe, indicator_trend_sma) if operator == ">": condition = (dataframe[indicator] > dataframe[crossed_indicator]) elif operator == "=": condition = (np.isclose(dataframe[indicator], dataframe[crossed_indicator])) elif operator == "<": condition = (dataframe[indicator] < dataframe[crossed_indicator]) elif operator == "C": condition = ( (qtpylib.crossed_below(dataframe[indicator], dataframe[crossed_indicator])) | (qtpylib.crossed_above( dataframe[indicator], dataframe[crossed_indicator])) ) elif operator == "CA": condition = (qtpylib.crossed_above(dataframe[indicator], dataframe[crossed_indicator])) elif operator == "CB": condition = (qtpylib.crossed_below(dataframe[indicator], dataframe[crossed_indicator])) elif operator == ">R": condition = (dataframe[indicator] > real_num) elif operator == "=R": condition = (np.isclose(dataframe[indicator], real_num)) elif operator == "R": condition = (dataframe[indicator].div(dataframe[crossed_indicator]) > real_num) elif operator == "/=R": condition = (np.isclose(dataframe[indicator].div(dataframe[crossed_indicator]), real_num)) elif operator == "/ dataframe[indicator_trend_sma]) elif operator == "DT": condition = (dataframe[indicator] < dataframe[indicator_trend_sma]) elif operator == "OT": condition = (np.isclose(dataframe[indicator], dataframe[indicator_trend_sma])) elif operator == "CUT": condition = ( ( qtpylib.crossed_above(dataframe[indicator],dataframe[indicator_trend_sma]) ) & ( dataframe[indicator] > dataframe[indicator_trend_sma] ) ) elif operator == "CDT": condition = ( ( qtpylib.crossed_below(dataframe[indicator], dataframe[indicator_trend_sma]) ) & ( dataframe[indicator] < dataframe[indicator_trend_sma] ) ) elif operator == "COT": condition = ( ( ( qtpylib.crossed_below(dataframe[indicator], dataframe[indicator_trend_sma]) ) | ( qtpylib.crossed_above(dataframe[indicator], dataframe[indicator_trend_sma]) ) ) & ( np.isclose(dataframe[indicator], dataframe[indicator_trend_sma]) ) ) return condition, dataframe # ######################################################################################################################### # This class is a sample. Feel free to customize it. class Empty(IStrategy): # Strategy interface version - allow new iterations of the strategy interface. # Check the documentation or the Sample strategy to get the latest version. INTERFACE_VERSION = 2 # ROI table: minimal_roi = { #"0": 0.015 "0": 0.5 } # Stoploss: stoploss = -1 trailing_stop = False # trailing_stop_positive = 0.15 # trailing_stop_positive_offset = 0.20 # trailing_only_offset_is_reached = False position_adjustment_enable = True use_custom_stoploss = True #max_open_trades = 3 # Optimal ticker interval for the strategy. timeframe = '1h' # Run "populate_indicators()" only for new candle. process_only_new_candles = False # These values can be overridden in the "ask_strategy" section in the config. use_sell_signal = True sell_profit_only = False ignore_roi_if_buy_signal = False # Number of candles the strategy requires before producing valid signals startup_candle_count: int = 30 columns_logged = False pairs = { pair: { 'first_amount': 0, "first_buy": 0, "last_buy": 0.0, "last_min": 999999999999999.5, "last_max": 0, "trade_info": {}, "max_touch": 0.0, "last_sell": 0.0, 'count_of_buys': 0, 'current_profit': 0, 'expected_profit': 0, 'previous_profit': 0, "last_candle": {}, "last_count_of_buys": 0, 'base_stake_amount': 0, 'stop_buy': False, 'last_date': 0, 'stop': False, 'max_profit': 0, 'last_profit': 0, 'total_amount': 0, 'has_gain': 0, 'force_sell': False, 'force_buy': False, 'current_trade': None, 'last_trade': None, 'force_stop': False } for pair in ["BTC/USDC", "BTC/USDT"] } plot_config = { "main_plot": { "sma5": { "color": "#7aa90b" }, "min24": { "color": "#121acd" } }, "subplots": { "Inv": { "sma5_inv": { "color": "#aef878", "type": "line" }, "zero": { "color": "#fdba52" }, "sma24_score": { "color": "#f1f5b0" } }, "drv": { "sma5_deriv1": { "color": "#96eebb" } } }, "options": { "markAreaZIndex": 1 } } buy_deriv_sma60 = DecimalParameter(-0.005, 0.005, decimals=3, default=0, space='buy') buy_deriv_sma5d = DecimalParameter(-0.07, 0.07, decimals=2, default=0, space='buy') buy_deriv_sma12d = DecimalParameter(-0.07, 0.07, decimals=2, default=0, space='buy') # Buy Hyperoptable Parameters/Spaces. # buy_crossed_indicator0 = CategoricalParameter(god_genes_with_timeperiod, default="ADD-20", space='buy') # buy_crossed_indicator1 = CategoricalParameter(god_genes_with_timeperiod, default="ASIN-6", space='buy') # buy_crossed_indicator2 = CategoricalParameter(god_genes_with_timeperiod, default="CDLEVENINGSTAR-50", space='buy') # # buy_indicator0 = CategoricalParameter(god_genes_with_timeperiod, default="SMA-100", space='buy') # buy_indicator1 = CategoricalParameter(god_genes_with_timeperiod, default="WILLR-50", space='buy') # buy_indicator2 = CategoricalParameter(god_genes_with_timeperiod, default="CDLHANGINGMAN-20", space='buy') # # buy_operator0 = CategoricalParameter(operators, default="/ float: dataframe, _ = self.dp.get_analyzed_dataframe(pair, self.timeframe) last_candle = dataframe.iloc[-1].squeeze() return self.adjust_stake_amount(pair, last_candle) def adjust_stake_amount(self, pair: str, last_candle: DataFrame): if (self.pairs[pair]['first_amount'] > 0): amount = min(self.wallets.get_available_stake_amount(), self.pairs[pair]['first_amount']) else: # range_min = last_candle[f"min{self.stoploss_timeperiod.value}_1d"] # range_max = last_candle[f"max{self.stoploss_timeperiod.value}_1d"] # # if range_max == range_min: # return -0.1 # sécurité # # range_pos = (last_candle['close'] - range_min) / (range_max - range_min) range_pos = last_candle[f"range_pos"] sl_min = self.wallets.get_available_stake_amount() / 6 sl_max = self.wallets.get_available_stake_amount() / 2 amount = sl_min + (1 - range_pos) * (sl_max - sl_min) # amount = self.wallets.get_available_stake_amount() / 8 return min(amount, self.wallets.get_available_stake_amount()) def adjust_trade_position(self, trade: Trade, current_time: datetime, current_rate: float, current_profit: float, min_stake: float, max_stake: float, **kwargs): # ne rien faire si ordre deja en cours if trade.has_open_orders: # print("skip open orders") return None if (self.wallets.get_available_stake_amount() < 10): # or trade.stake_amount >= max_stake: return 0 # dataframe, _ = self.dp.get_analyzed_dataframe(trade.pair, self.timeframe) last_candle = dataframe.iloc[-1].squeeze() before_last_candle = dataframe.iloc[-2].squeeze() before_last_candle_12 = dataframe.iloc[-13].squeeze() before_last_candle_24 = dataframe.iloc[-25].squeeze() last_candle_3 = dataframe.iloc[-4].squeeze() last_candle_previous_1h = dataframe.iloc[-13].squeeze() # prépare les données current_time = current_time.astimezone(timezone.utc) open_date = trade.open_date.astimezone(timezone.utc) dispo = round(self.wallets.get_available_stake_amount()) hours_since_first_buy = (current_time - trade.open_date_utc).seconds / 3600.0 days_since_first_buy = (current_time - trade.open_date_utc).days hours = (current_time - trade.date_last_filled_utc).total_seconds() / 3600.0 count_of_buys = trade.nr_of_successful_entries current_time_utc = current_time.astimezone(timezone.utc) open_date = trade.open_date.astimezone(timezone.utc) days_since_open = (current_time_utc - open_date).days pair = trade.pair profit = trade.calc_profit(current_rate) #round(current_profit * trade.stake_amount, 1) # last_lost = self.getLastLost(last_candle, pair) # pct_first = 0 stake_amount = min(self.wallets.get_available_stake_amount(), self.adjust_stake_amount(pair, last_candle)) # if (last_candle['enter_long'] == 1 and current_profit < -0.05 and stake_amount > 10) : # # print(f"adjust {current_time} {stake_amount}") # print(f"adjust {pair} {current_time} dispo={dispo} amount={stake_amount} rate={current_rate}") # return stake_amount if last_candle['enter_long'] != 1: return None filled_buys = [ o for o in trade.orders if o.status == "closed" and o.ft_order_side == "buy" ] if not filled_buys: return None last_buy = max(filled_buys, key=lambda o: o.order_date) last_entry_price = last_buy.price if last_entry_price: drop_from_last_entry = (current_rate - last_entry_price) / last_entry_price if drop_from_last_entry <= self.drop_from_last_entry.value and last_candle['min60'] == last_candle_3['min60'] \ and last_candle['range_pos'] < 0.03 and last_candle['hapercent'] > 0: # stake_amount = trade.stake_amount self.pairs[trade.pair]['last_buy'] = current_rate self.pairs[trade.pair]['total_amount'] += stake_amount print(f"adjust {pair} drop={drop_from_last_entry} {current_time} dispo={dispo} amount={stake_amount} rate={current_rate}") # print(f"adjust {pair} {current_time} dispo={dispo} amount={stake_amount} rate={current_rate}") trade_type = last_candle['enter_tag'] if last_candle['enter_long'] == 1 else 'pct48' self.pairs[trade.pair]['count_of_buys'] += 1 self.pairs[pair]['total_amount'] += stake_amount self.log_trade( last_candle=last_candle, date=current_time, action="🟧 Loss -", dispo=round(dispo,0), pair=trade.pair, rate=current_rate, trade_type=trade_type, profit=round(profit, 1), buys=trade.nr_of_successful_entries + 1, stake=round(stake_amount, 2) ) self.pairs[trade.pair]['last_buy'] = current_rate self.pairs[trade.pair]['max_touch'] = last_candle['close'] self.pairs[trade.pair]['last_candle'] = last_candle return stake_amount return None def confirm_trade_entry(self, pair: str, order_type: str, amount: float, rate: float, time_in_force: str, current_time: datetime, entry_tag: Optional[str], **kwargs) -> bool: minutes = 0 if self.pairs[pair]['last_date'] != 0: minutes = round(int((current_time - self.pairs[pair]['last_date']).total_seconds() / 60)) dataframe, _ = self.dp.get_analyzed_dataframe(pair, self.timeframe) last_candle = dataframe.iloc[-1].squeeze() last_candle_2 = dataframe.iloc[-2].squeeze() last_candle_3 = dataframe.iloc[-3].squeeze() condition = True if condition and last_candle['range_pos'] > 0.05:# and self.pairs[pair]['force_stop']: condition = last_candle['sma5'] > last_candle['sma60'] # if self.pairs[pair]['force_stop'] and last_candle['range_pos'] < 0.02: # self.pairs[pair]['force_stop'] = False if False and self.pairs[pair]['last_trade'].close_date is not None: # base cooldown = n bougies / cooldown proportionnel au profit # bougies de plus par % cooldown_candles = 12 + 6 * (int(self.pairs[pair]['last_profit'] / 0.01)) # réglable # temps depuis la fermeture candles_since_close = ((current_time - self.pairs[pair]['last_trade'].close_date).total_seconds() / 3600) # seconds / heure condition = (candles_since_close >= cooldown_candles) print(f"Cool close date = trade={self.pairs[pair]['last_trade'].close_date} down {round(self.pairs[pair]['last_profit'], 3)} {cooldown_candles} {candles_since_close}") # self.should_enter_trade(pair, last_candle, current_time) allow_to_buy = (condition and not self.pairs[pair]['stop']) | (entry_tag == 'force_entry') # force = self.pairs[pair]['force_buy'] # if self.pairs[pair]['force_buy']: # self.pairs[pair]['force_buy'] = False # allow_to_buy = True # else: # if not self.should_enter_trade(pair, last_candle, current_time): # allow_to_buy = False if allow_to_buy: self.pairs[pair]['first_buy'] = rate self.pairs[pair]['last_buy'] = rate self.pairs[pair]['max_touch'] = last_candle['close'] self.pairs[pair]['last_candle'] = last_candle self.pairs[pair]['count_of_buys'] = 1 self.pairs[pair]['current_profit'] = 0 self.pairs[pair]['last_profit'] = 0 self.pairs[pair]['last_trade'] = None self.pairs[pair]['last_max'] = max(last_candle['close'], self.pairs[pair]['last_max']) self.pairs[pair]['last_min'] = min(last_candle['close'], self.pairs[pair]['last_min']) self.pairs[pair]['last_date'] = current_time dispo = round(self.wallets.get_available_stake_amount()) # self.printLineLog() stake_amount = self.adjust_stake_amount(pair, last_candle) self.pairs[pair]['first_amount'] = stake_amount self.pairs[pair]['total_amount'] = stake_amount # print(f"Buy {pair} {current_time} {entry_tag} dispo={dispo} amount={stake_amount} rate={rate} rate={rate}") self.log_trade( last_candle=last_candle, date=current_time, action=("🟩Buy" if allow_to_buy else "Canceled") + " " + str(minutes), pair=pair, rate=rate, dispo=dispo, profit=0, trade_type=entry_tag, buys=1, stake=round(stake_amount, 2) ) return allow_to_buy def confirm_trade_exit(self, pair: str, trade: Trade, order_type: str, amount: float, rate: float, time_in_force: str, exit_reason: str, current_time, **kwargs, ) -> bool: # allow_to_sell = (minutes > 30) dataframe, _ = self.dp.get_analyzed_dataframe(pair, self.timeframe) last_candle = dataframe.iloc[-1].squeeze() profit = trade.calc_profit(rate) force = self.pairs[pair]['force_sell'] allow_to_sell = (last_candle['hapercent'] < 0 and profit > 0) or force or (exit_reason == 'god') or (exit_reason == 'force_exit') or (exit_reason == 'stop_loss') or (exit_reason == 'trailing_stop_loss') minutes = int(round((current_time - trade.date_last_filled_utc).total_seconds() / 60, 0)) if allow_to_sell: self.pairs[pair]['last_sell'] = rate self.pairs[pair]['last_candle'] = last_candle self.pairs[pair]['max_profit'] = 0 profit = trade.calc_profit(rate) self.pairs[pair]['previous_profit'] = profit dispo = round(self.wallets.get_available_stake_amount()) # print(f"Sell {pair} {current_time} {exit_reason} dispo={dispo} rate={rate} open_rate={trade.open_rate} profit={profit}") self.log_trade( last_candle=last_candle, date=current_time, action="🟥Sell " + str(minutes), pair=pair, trade_type=exit_reason, rate=last_candle['close'], dispo=dispo, profit=round(profit, 2) ) self.pairs[pair]['first_amount'] = 0 self.pairs[pair]['force_sell'] = False self.pairs[pair]['has_gain'] = 0 self.pairs[pair]['current_profit'] = 0 self.pairs[pair]['total_amount'] = 0 self.pairs[pair]['count_of_buys'] = 0 self.pairs[pair]['max_touch'] = 0 self.pairs[pair]['last_buy'] = 0 self.pairs[pair]['last_date'] = current_time self.pairs[pair]['last_trade'] = trade self.pairs[pair]['current_trade'] = None else: print(f"{current_time} STOP triggered for {pair} ({exit_reason}) but condition blocked", "warning") return (allow_to_sell) | (exit_reason == 'force_exit') | (exit_reason == 'stop_loss') | force def custom_exit(self, pair, trade, current_time, current_rate, current_profit, **kwargs): dataframe, _ = self.dp.get_analyzed_dataframe(pair, self.timeframe) last_candle = dataframe.iloc[-1] before_last_candle = dataframe.iloc[-2] self.pairs[pair]['current_trade'] = trade momentum = last_candle[self.sell_score_indicator.value] profit = trade.calc_profit(current_rate) #round(current_profit * trade.stake_amount, 1) count_of_buys = trade.nr_of_successful_entries expected_profit = self.expectedProfit(pair, last_candle) self.pairs[pair]['max_profit'] = max(self.pairs[pair]['max_profit'], profit) max_profit = self.pairs[pair]['max_profit'] baisse = 0 if profit > 0: baisse = 1 - (profit / max_profit) mx = max_profit / 5 self.pairs[pair]['count_of_buys'] = count_of_buys self.pairs[pair]['current_profit'] = profit dispo = round(self.wallets.get_available_stake_amount()) + self.pairs[pair]['total_amount'] self.log_trade( last_candle=last_candle, date=current_time, action=("🔴 NOW" if self.pairs[pair]['stop'] else "🟢 NOW "), dispo=dispo, pair=pair, rate=last_candle['close'], trade_type='momentum' + str(round(momentum, 2)), profit=round(profit, 2), buys=count_of_buys, stake=0 ) if profit > max(5, expected_profit) and baisse > 0.30: self.pairs[pair]['force_sell'] = True self.pairs[pair]['force_buy'] = (self.pairs[pair]['count_of_buys'] - self.pairs[pair]['has_gain'] > 3) return str(count_of_buys) + '_' + 'B30_' + pair + '_' + str(self.pairs[pair]['has_gain']) # if last_candle['range_pos'] > 0.05 and current_profit < - last_candle['range_pos'] /4 : #last_candle['cross_sma60']: # self.pairs[pair]['force_sell'] = True # return 'Range' # if last_candle['range_pos'] > 0.05 and current_profit < - last_candle['range_pos'] /4 \ # and last_candle['sma5_deriv1_1d'] < 0 and last_candle['sma5_deriv2_1d'] < 0 \ # and last_candle['sma60_deriv1'] < 0 and last_candle['sma60_deriv2'] < 0: # self.pairs[pair]['force_sell'] = True # self.pairs[pair]['force_stop'] = True # return 'Deriv'; if profit < - (dispo * 0.2): print(f'dispo={dispo} wallet={round(self.wallets.get_available_stake_amount())} limit={-dispo * 0.1}') self.pairs[pair]['force_sell'] = True self.pairs[pair]['force_stop'] = True return 'Force'; # if self.pairs[pair]['force_sell'] and self.pairs[pair]['last_trade'].close_profit > 0.02: # return "Force" # if last_candle['stop_buying']: # self.pairs[pair]['force_sell'] = True # return 'Stop' # Si momentum fort → on laisse courir if momentum > 1: return None # Si momentum faiblit → on prend profit plus tôt if momentum < 0 and current_profit > 0.02 and last_candle[self.sell_score_indicator.value] < before_last_candle[self.sell_score_indicator.value]\ and last_candle['close'] < last_candle['sma5']: self.pairs[pair]['last_profit'] = current_profit return "momentum_drop" return None def custom_stoploss(self, pair, trade, current_time, current_rate, current_profit, **kwargs): dataframe, _ = self.dp.get_analyzed_dataframe(pair, self.timeframe) last_candle = dataframe.iloc[-1] profit = trade.calc_profit(current_rate) candle_at_buy = self.pairs[pair]['last_candle'] # if candle_at_buy['range_pos'] > self.range_pos_stoploss.value and candle_at_buy[self.stoploss_indicator.value] < 0: # return self.stoploss_force.value # # print(f'test stop loss {self.stoploss} {last_candle["stop_buying12_1d"]}') # if last_candle[self.stoploss_indicator.value] and (trade.nr_of_successful_entries >= 4 or self.wallets.get_available_stake_amount() < 300): # and profit < - 30 : # range_min = last_candle[f"min{self.stoploss_timeperiod.value}_1d"] # range_max = last_candle[f"max{self.stoploss_timeperiod.value}_1d"] # # if range_max == range_min: # print(f'ranges={range_min}') # return -0.1 # sécurité # # range_pos = (current_rate - range_min) / (range_max - range_min) # # if (range_pos > 1): # return -1 # # sl_min = -0.02 # sl_max = -0.1 #self.stoploss # # dynamic_sl = (sl_min + (1 - range_pos) * (sl_max - sl_min)) # # print(f'{current_time} Loss ranges={round(range_min,0)} {round(range_max, 0)} range_pos={round(range_pos, 3)} dynamic_sl={round(dynamic_sl, 3)} ' # f'profit={profit} current={current_profit} {self.stoploss_indicator.value} {self.stoploss_timeperiod.value} {last_candle[self.stoploss_indicator.value]}') # # return dynamic_sl return -1 def informative_pairs(self): # get access to all pairs available in whitelist. pairs = self.dp.current_whitelist() informative_pairs = [(pair, '1d') for pair in pairs] # informative_pairs += [(pair, '1h') for pair in pairs] return informative_pairs def populate_indicators(self, dataframe: DataFrame, metadata: dict) -> DataFrame: pair = metadata['pair'] dataframe = dataframe.copy() heikinashi = qtpylib.heikinashi(dataframe) dataframe['haopen'] = heikinashi['open'] dataframe['haclose'] = heikinashi['close'] dataframe['hapercent'] = (dataframe['haclose'] - dataframe['haopen']) / dataframe['haclose'] dataframe['mid'] = dataframe['haopen'] + (dataframe['haclose'] - dataframe['haopen']) / 2 dataframe['zero'] = 0 for timeperiod in timeperiods: dataframe[f'max{timeperiod}'] = talib.MAX(dataframe['close'], timeperiod=timeperiod) dataframe[f'min{timeperiod}'] = talib.MIN(dataframe['close'], timeperiod=timeperiod) dataframe[f"percent{timeperiod}"] = dataframe['close'].pct_change(timeperiod) dataframe[f"sma{timeperiod}"] = dataframe['mid'].ewm(span=timeperiod, adjust=False).mean() self.calculeDerivees(dataframe, f"sma{timeperiod}", timeframe=self.timeframe, ema_period=timeperiod) # ###################################################################################################### ################### INFORMATIVE 1d informative = self.dp.get_pair_dataframe(pair=metadata['pair'], timeframe="1d") # informative = self.populateDataframe(informative, timeframe='1d') # heikinashi = qtpylib.heikinashi(informative) # informative['haopen'] = heikinashi['open'] # informative['haclose'] = heikinashi['close'] # informative['hapercent'] = (informative['haclose'] - informative['haopen']) / informative['haclose'] informative['mid'] = informative['open'] + (informative['close'] - informative['open']) / 2 for timeperiod in timeperiods: informative[f'max{timeperiod}'] = talib.MAX(informative['close'], timeperiod=timeperiod) informative[f'min{timeperiod}'] = talib.MIN(informative['close'], timeperiod=timeperiod) # informative[f"range{timeperiod}"] = ((informative["close"] - informative[f'min{timeperiod}']) / (informative[f'max{timeperiod}'] - informative[f'min{timeperiod}'])) # informative[f"percent{timeperiod}"] = informative['close'].pct_change(timeperiod) informative[f"sma{timeperiod}"] = informative['mid'].ewm(span=timeperiod, adjust=False).mean() self.calculeDerivees(informative, f"sma{timeperiod}", timeframe=self.timeframe, ema_period=timeperiod) dataframe = merge_informative_pair(dataframe, informative, self.timeframe, "1d", ffill=True) # ###################################################################################################### range_min = dataframe[f"min12_1d"] range_max = dataframe[f"max48"] dataframe[f"range_pos"] = ((dataframe['mid'] - range_min) / (range_max)).rolling(5).mean() dataframe['cross_sma60'] = qtpylib.crossed_above(dataframe['sma5'], dataframe['sma60']) dataframe[f'stop_buying'] = qtpylib.crossed_below(dataframe[f"sma12"], dataframe['sma48']) # récupérer le dernier trade fermé trades = Trade.get_trades_proxy(pair=pair,is_open=False) if trades: last_trade = trades[-1] self.pairs[pair]['last_profit'] = last_trade.close_profit # ex: 0.12 = +12% self.pairs[pair]['last_trade'] = last_trade return dataframe def populate_buy_trend(self, dataframe: DataFrame, metadata: dict) -> DataFrame: # dataframe.loc[ # ( # (dataframe['sma24_score'].shift(2) <= dataframe['zero']) # & (dataframe['sma5'].shift(1) <= dataframe['sma5']) # & (dataframe['sma5_inv'] == -1) # & (dataframe['min24'].shift(3) == dataframe['min24']) # ), # 'buy'] = 1 conditions = list() # # print(dataframe.columns) # # buy_indicator = self.buy_indicator0.value # buy_crossed_indicator = self.buy_crossed_indicator0.value # buy_operator = self.buy_operator0.value # buy_real_num = self.buy_real_num0.value # condition, dataframe = condition_generator( # dataframe, # buy_operator, # buy_indicator, # buy_crossed_indicator, # buy_real_num # ) # conditions.append(condition) # # backup # buy_indicator = self.buy_indicator1.value # buy_crossed_indicator = self.buy_crossed_indicator1.value # buy_operator = self.buy_operator1.value # buy_real_num = self.buy_real_num1.value # # condition, dataframe = condition_generator( # dataframe, # buy_operator, # buy_indicator, # buy_crossed_indicator, # buy_real_num # ) # conditions.append(condition) # # buy_indicator = self.buy_indicator2.value # buy_crossed_indicator = self.buy_crossed_indicator2.value # buy_operator = self.buy_operator2.value # buy_real_num = self.buy_real_num2.value # condition, dataframe = condition_generator( # dataframe, # buy_operator, # buy_indicator, # buy_crossed_indicator, # buy_real_num # ) # conditions.append(condition) # conditions.append((dataframe[self.stop_buying_indicator.value] == False) | (dataframe['range_pos'] < 0)) #conditions.append((dataframe[self.stop_buying_indicator.value] == False) | (dataframe['range_pos'] < 0)) conditions.append(dataframe['sma60_deriv1'] > self.buy_deriv_sma60.value) conditions.append(dataframe['sma5_deriv1_1d'] > self.buy_deriv_sma5d.value) conditions.append(dataframe['sma12_deriv1_1d'] > self.buy_deriv_sma12d.value) conditions.append(dataframe['hapercent'] > 0) # # conditions.append(dataframe[f"range_pos"] <= 0.5) conditions.append(((dataframe[f"range_pos"] < 0.05) ) | ((dataframe['sma12_deriv1'] > 0) & (dataframe['sma12_deriv2'] > 0))) # print(f"BUY indicators tested \n" # f"{self.buy_indicator0.value} {self.buy_crossed_indicator0.value} {self.buy_operator0.value} {self.buy_real_num0.value} \n" # f"{self.buy_indicator1.value} {self.buy_crossed_indicator1.value} {self.buy_operator1.value} {self.buy_real_num1.value} \n" # f"{self.buy_indicator2.value} {self.buy_crossed_indicator2.value} {self.buy_operator2.value} {self.buy_real_num2.value} \n" # ) if conditions: dataframe.loc[ reduce(lambda x, y: x & y, conditions), ['enter_long', 'enter_tag'] ] = (1, 'god') return dataframe def populate_sell_trend(self, dataframe: DataFrame, metadata: dict) -> DataFrame: conditions = list() # # TODO: Its not dry code! # sell_indicator = self.sell_indicator0.value # sell_crossed_indicator = self.sell_crossed_indicator0.value # sell_operator = self.sell_operator0.value # sell_real_num = self.sell_real_num0.value # condition, dataframe = condition_generator( # dataframe, # sell_operator, # sell_indicator, # sell_crossed_indicator, # sell_real_num # ) # conditions.append(condition) # # sell_indicator = self.sell_indicator1.value # sell_crossed_indicator = self.sell_crossed_indicator1.value # sell_operator = self.sell_operator1.value # sell_real_num = self.sell_real_num1.value # condition, dataframe = condition_generator( # dataframe, # sell_operator, # sell_indicator, # sell_crossed_indicator, # sell_real_num # ) # conditions.append(condition) # # sell_indicator = self.sell_indicator2.value # sell_crossed_indicator = self.sell_crossed_indicator2.value # sell_operator = self.sell_operator2.value # sell_real_num = self.sell_real_num2.value # condition, dataframe = condition_generator( # dataframe, # sell_operator, # sell_indicator, # sell_crossed_indicator, # sell_real_num # ) # conditions.append(condition) # # # print(f"SELL indicators tested \n" # f"{self.sell_indicator0.value} {self.sell_crossed_indicator0.value} {self.sell_operator0.value} {self.sell_real_num0.value} \n" # f"{self.sell_indicator1.value} {self.sell_crossed_indicator1.value} {self.sell_operator1.value} {self.sell_real_num1.value} \n" # f"{self.sell_indicator2.value} {self.sell_crossed_indicator2.value} {self.sell_operator2.value} {self.sell_real_num2.value} \n" # ) # # # conditions.append(qtpylib.crossed_below(dataframe['mid'], dataframe['sma24'])) # conditions.append((dataframe['range_pos'] > 0.04)) # # if conditions: # dataframe.loc[reduce(lambda x, y: x & y, conditions), ['exit_long', 'exit_tag']] = (1, 'god') return dataframe def calculeDerivees( self, dataframe: pd.DataFrame, name: str, suffixe: str = '', window: int = 100, coef: float = 0.15, ema_period: int = 10, verbose: bool = True, timeframe: str = '5m' ) -> pd.DataFrame: """ Calcule deriv1/deriv2 (relative simple), applique EMA, calcule tendency avec epsilon adaptatif basé sur rolling percentiles. """ d1_col = f"{name}{suffixe}_deriv1" d2_col = f"{name}{suffixe}_deriv2" tendency_col = f"{name}{suffixe}_state" series = dataframe[f"{name}{suffixe}"] d1 = series.diff() d2 = d1.diff() cond_bas = (d1.rolling(3).mean() > d1.rolling(10).mean()) cond_haut = (d1.rolling(3).mean() < d1.rolling(10).mean()) dataframe[d1_col] = (dataframe[name] - dataframe[name].shift(3)) / dataframe[name].shift(3) dataframe[d2_col] = (dataframe[d1_col] - dataframe[d1_col].shift(1)) dataframe[f"{name}{suffixe}_inv"] = np.where(cond_bas, -1, np.where(cond_haut, 1, 0)) short = d1.rolling(3).mean() long = d1.rolling(10).mean() spread = short - long zscore = (spread - spread.rolling(ema_period).mean()) / spread.rolling(ema_period).std() dataframe[f"{name}{suffixe}_score"] = zscore # #################################################################### # Calcul de la pente lissée d1 = series.diff() d1_smooth = d1.rolling(5).mean() # Normalisation z = (d1_smooth - d1_smooth.rolling(ema_period).mean()) / d1_smooth.rolling(ema_period).std() dataframe[f"{name}{suffixe}_trend_up"] = ( (d1_smooth.shift(1) < 0) & (d1_smooth > 0) & (z > 1.0) ) dataframe[f"{name}{suffixe}_trend_down"] = ( (d1_smooth.shift(1) > 0) & (d1_smooth < 0) & (z < -1.0) ) momentum_short = d1.rolling(int(ema_period / 2)).mean() momentum_long = d1.rolling(ema_period * 2).mean() dataframe[f"{name}{suffixe}_trend_change_up"] = ( (momentum_short.shift(1) < momentum_long.shift(1)) & (momentum_short > momentum_long) ) dataframe[f"{name}{suffixe}_trend_change_down"] = ( (momentum_short.shift(1) > momentum_long.shift(1)) & (momentum_short < momentum_long) ) return dataframe @property def protections(self): return [ { "method": "CooldownPeriod", "stop_duration_candles": 6 }, # { # "method": "MaxDrawdown", # "lookback_period_candles": 96, # "trade_limit": 4, # "max_allowed_drawdown": 0.1, # "stop_duration_candles": 24 # }, # { # "method": "StoplossGuard", # "lookback_period_candles": 96, # "trade_limit": 2, # "stop_duration_candles": 48, # "only_per_pair": False # }, # { # "method": "LowProfitPairs", # "lookback_period_candles": 6, # "trade_limit": 2, # "stop_duration_candles": 60, # "required_profit": 0.02 # }, # { # "method": "LowProfitPairs", # "lookback_period_candles": 24, # "trade_limit": 4, # "stop_duration_candles": 2, # "required_profit": 0.01 # } ] def log_trade(self, action, pair, date, trade_type=None, rate=None, dispo=None, profit=None, buys=None, stake=None, last_candle=None): # Afficher les colonnes une seule fois if self.config.get('runmode') == 'hyperopt' or self.dp.runmode.value in ('hyperopt'): return if self.columns_logged % 10 == 0: self.printLog( f"| {'Date':<16} | {'Action':<10} |{'Pair':<5}| {'Trade Type':<18} |{'Rate':>8} | {'Dispo':>6} | {'Profit':>8} " f"| {'Pct':>6} | {'max_touch':>11} | {'last_lost':>12} | {'last_max':>7}| {'last_max':>7}|{'Buys':>5}| {'Stake':>5} |" f"{'rsi':>6}|Distmax|s201d|s5_1d|s5_2d|s51h|s52h|smt1h|smt2h|tdc1d|tdc1h" ) self.printLineLog() df = pd.DataFrame.from_dict(self.pairs, orient='index') colonnes_a_exclure = ['last_candle', 'trade_info', 'last_date', 'last_count_of_buys', 'base_stake_amount', 'stop_buy'] df_filtered = df[df['count_of_buys'] > 0].drop(columns=colonnes_a_exclure) # df_filtered = df_filtered["first_buy", "last_max", "max_touch", "last_sell","last_buy", 'count_of_buys', 'current_profit'] print(df_filtered) self.columns_logged += 1 date = str(date)[:16] if date else "-" limit = None # if buys is not None: # limit = round(last_rate * (1 - self.fibo[buys] / 100), 4) rsi = '' rsi_pct = '' # if last_candle is not None: # if (not np.isnan(last_candle['rsi_1d'])) and (not np.isnan(last_candle['rsi_1h'])): # rsi = str(int(last_candle['rsi_1d'])) + " " + str(int(last_candle['rsi_1h'])) # if (not np.isnan(last_candle['rsi_pct_1d'])) and (not np.isnan(last_candle['rsi_pct_1h'])): # rsi_pct = str(int(10000 * last_candle['bb_mid_pct_1d'])) + " " + str( # int(last_candle['rsi_pct_1d'])) + " " + str(int(last_candle['rsi_pct_1h'])) # first_rate = self.percent_threshold.value # last_rate = self.threshold.value # action = self.color_line(action, action) sma5_1d = '' sma5_1h = '' sma5 = str(sma5_1d) + ' ' + str(sma5_1h) last_lost = self.getLastLost(last_candle, pair) if buys is None: buys = '' max_touch = '' # round(last_candle['max12_1d'], 1) #round(self.pairs[pair]['max_touch'], 1) pct_max = self.getPctFirstBuy(pair, last_candle) total_counts = str(buys) + '/' + str(sum(pair_data['count_of_buys'] for pair_data in self.pairs.values())) dist_max = self.getDistMax(last_candle, pair) # if trade_type is not None: # if np.isnan(last_candle['rsi_1d']): # string = ' ' # else: # string = (str(int(last_candle['rsi_1d']))) + " " + str(int(last_candle['rsi_deriv1_1d'])) # trade_type = trade_type \ # + " " + string \ # + " " + str(int(last_candle['rsi_1h'])) \ # + " " + str(int(last_candle['rsi_deriv1_1h'])) # val144 = self.getProbaHausse144(last_candle) # val1h = self.getProbaHausse1h(last_candle) color = GREEN if profit > 0 else RED # color_sma24 = GREEN if last_candle['sma24_deriv1_1d'] > 0 else RED # color_sma24_2 = GREEN if last_candle['sma24_deriv2_1d'] > 0 else RED # color_sma5 = GREEN if last_candle['mid_smooth_5_deriv1_1d'] > 0 else RED # color_sma5_2 = GREEN if last_candle['mid_smooth_5_deriv2_1d'] > 0 else RED # color_sma5_1h = GREEN if last_candle['sma60_deriv1'] > 0 else RED # color_sma5_2h = GREEN if last_candle['sma60_deriv2'] > 0 else RED # color_smooth_1h = GREEN if last_candle['mid_smooth_1h_deriv1'] > 0 else RED # color_smooth2_1h = GREEN if last_candle['mid_smooth_1h_deriv2'] > 0 else RED last_max = int(self.pairs[pair]['last_max']) if self.pairs[pair]['last_max'] > 1 else round( self.pairs[pair]['last_max'], 3) last_min = int(self.pairs[pair]['last_min']) if self.pairs[pair]['last_min'] > 1 else round( self.pairs[pair]['last_min'], 3) profit = str(profit) + '/' + str(round(self.pairs[pair]['max_profit'], 2)) # 🟢 Dérivée 1 > 0 et dérivée 2 > 0: tendance haussière qui s’accélère. # 🟡 Dérivée 1 > 0 et dérivée 2 < 0: tendance haussière qui ralentit → essoufflement potentiel. # 🔴 Dérivée 1 < 0 et dérivée 2 < 0: tendance baissière qui s’accélère. # 🟠 Dérivée 1 < 0 et dérivée 2 > 0: tendance baissière qui ralentit → possible bottom. self.printLog( f"| {date:<16} |{action:<10} | {pair[0:3]:<3} | {trade_type or '-':<18} |{rate or '-':>9}| {dispo or '-':>6} " f"|{color}{profit or '-':>10}{RESET}| {pct_max or '-':>6} | {round(self.pairs[pair]['max_touch'], 2) or '-':>11} | {last_lost or '-':>12} " f"| {last_max or '-':>7} | {last_min or '-':>7} |{total_counts or '-':>5}|{stake or '-':>7}" ) def printLineLog(self): # f"sum1h|sum1d|Tdc|Tdh|Tdd| drv1 |drv_1h|drv_1d|" self.printLog( f"+{'-' * 18}+{'-' * 12}+{'-' * 5}+{'-' * 20}+{'-' * 9}+{'-' * 8}+{'-' * 12}+{'-' * 8}+{'-' * 13}+{'-' * 14}+{'-' * 9}{'-' * 9}+{'-' * 5}+{'-' * 7}+" f"+{'-' * 6}+{'-' * 7}+{'-' * 5}+{'-' * 5}+{'-' * 5}+{'-' * 5}+{'-' * 5}+{'-' * 5}+" ) def printLog(self, str): if self.config.get('runmode') == 'hyperopt' or self.dp.runmode.value in ('hyperopt'): return; if not self.dp.runmode.value in ('backtest', 'hyperopt', 'lookahead-analysis'): logger.info(str) else: if not self.dp.runmode.value in ('hyperopt'): print(str) def getLastLost(self, last_candle, pair): last_lost = round((last_candle['close'] - self.pairs[pair]['max_touch']) / self.pairs[pair]['max_touch'], 3) return last_lost def getDistMax(self, last_candle, pair): mx = last_candle['max12_1d'] dist_max = round(100 * (mx - last_candle['close']) / mx, 0) return dist_max def getPctFirstBuy(self, pair, last_candle): return round((last_candle['close'] - self.pairs[pair]['first_buy']) / self.pairs[pair]['first_buy'], 3) def getPctLastBuy(self, pair, last_candle): return round((last_candle['close'] - self.pairs[pair]['last_buy']) / self.pairs[pair]['last_buy'], 4) def expectedProfit(self, pair: str, last_candle: DataFrame): lim = 0.005 pct = 0.001 pct_to_max = lim + pct * self.pairs[pair]['count_of_buys'] expected_profit = lim * self.pairs[pair]['total_amount'] # min(3 * lim, max(lim, pct_to_max)) # 0.004 + 0.002 * self.pairs[pair]['count_of_buys'] #min(0.01, first_max) self.pairs[pair]['expected_profit'] = expected_profit return expected_profit