from datetime import timedelta, datetime from freqtrade.strategy.interface import IStrategy from freqtrade.strategy import (BooleanParameter, CategoricalParameter, DecimalParameter, stoploss_from_open, IntParameter, IStrategy, merge_informative_pair, informative, stoploss_from_absolute) from pandas import DataFrame from freqtrade.persistence import Trade from sklearn.tree import DecisionTreeClassifier from sklearn.preprocessing import StandardScaler import numpy as np import talib.abstract as ta import freqtrade.vendor.qtpylib.indicators as qtpylib from typing import Optional, Union, Tuple from scipy.signal import find_peaks from typing import Any, Callable, Dict, List import logging import configparser # from sklearn.tree import export_text from sklearn.tree import export_graphviz from graphviz import Source import matplotlib.pyplot as plt logger = logging.getLogger(__name__) class DecisionTreeStrategy(IStrategy): plot_config = { "main_plot": { "enter_tag": { "color": "#197260" }, "max200": { "color": "#3dde2c" }, "min200": { "color": "#3dde2c" }, "sma5_1h": { "color": "#ffb009" }, 'bb_upperband_1d': { 'color': 'cyan' }, 'bb_lowerband_1d': { 'color': 'cyan' }, 'bb_middleband_1d': { 'color': 'red' } }, "subplots": { # "Volume": { # "volume_change": { # "color": "#d174dd" # }, # 'volume_spike': { # "color": "blue" # }, # 'volume_mean': { # "color": 'green' # } # }, "Rsi": { "rsi": { "color": "blue" }, "rsi_1h": { "color": "#c1b255" }, "rsi_sma_1h": { "color": "#9f0e43" }, "rsi_change": { "color": "green" }, "max200_diff": { "color": "red" } }, "Pct": { "percent12": { "color": "blue" }, "sma20_pct": { "color": "green" }, "sma5_pct_1h": { "color": "#6b09f4" }, # 'sma5_down_count_1h': { # 'color': 'blue' # }, # 'sma5_up_count_1h': { # 'color': 'red' # }, # 'sma5_down_count_1d': { # 'color': 'blue' # }, # 'sma5_up_count_1d': { # 'color': 'red' # } }, "Inversion": { # 'inversion': { # "color": "green" # }, # 'inversion_1h': { # "color": "blue" # }, 'trend_is_positive': { "color": "red" }, 'trend_pente_1h': { "color": "blue" } }, "DI": { "DI+_1h": { 'color': "red" }, 'DI_diff': { 'color': 'blue' }, 'DI_diff_1h': { 'color': 'green' } } } } protections = [ # { # "method": "StoplossGuard", # "lookback_period_candles": 12, # "trade_limit": 1, # "stop_duration_candles": 6, # "only_per_pair": True # }, # { # "method": "StoplossGuard", # "lookback_period_candles": 12, # "trade_limit": 2, # "stop_duration_candles": 6, # "only_per_pair": False # }, # { # "method": "LowProfitPairs", # "lookback_period_candles": 60, # "trade_limit": 1, # "stop_duration": 60, # "required_profit": -0.05 # }, { "method": "CooldownPeriod", "stop_duration_candles": 24 } ] # 20 20 40 60 100 160 260 420 # 50 50 100 300 500 # fibo = [1, 1, 2, 3, 5, 8, 13, 21] # my fibo # 50 50 50 100 100 150 200 250 350 450 600 1050 fibo = [1, 1, 1, 2, 2, 3, 4, 5, 7, 9, 12, 16, 21, 28, 37, 49] baisse = [1, 2, 3, 5, 7, 10, 14, 19, 26, 35, 47, 63, 84] # Ma suite 1 1 1 2 2 3 4 5 7 9 12 16 21 # Mise 50 50 50 100 100 150 200 250 350 450 600 800 1050 # Somme Mises 50 100 150 250 350 500 700 950 1300 1750 2350 3150 4200 # baisse 1 2 3 5 7 10 14 19 26 35 47 63 84 # Configuration de base minimal_roi = {"0": 10} # ROI minimum stoploss = -1 # Stop-loss à 5% timeframe = '5m' # Timeframe utilisée features = ['rsi', 'max200_diff', 'adx', 'atr', 'atr_1h', 'volatility_1h', 'sma5', 'rsi_change', 'volume_change'] target = 'future_direction' features_to_scale = ['volume_change'] # Excluez le RSI # Regrouper toutes les informations dans un seul dictionnaire pairs = { pair: { "last_max": 0, "trade_info": {}, "max_touch": 0.0, "last_sell": 0.0, "last_buy": 0.0 } for pair in ["BTC/USDT", "ETH/USDT", "DOGE/USDT", "DASH/USDT", "XRP/USDT", "SOL/USDT"] } close = 'close' open = 'open' startup_candle_count = 288 # threshold = DecimalParameter(-10, 10, decimals=1, default=0.5, space='buy') percent48 = DecimalParameter(-0.05, 0.05, decimals=3, default=-0.025, space='buy', optimize=False) # bb_width = DecimalParameter(0.00, 0.05, decimals=3, default=0.02, space='buy') # sma_pct_min = DecimalParameter(-0.05, 0.05, decimals=2, default=0.0, space='buy') # sma_pct_max = DecimalParameter(-0.05, 0.05, decimals=2, default=0.0, space='buy') # volume_change_buy = IntParameter(1, 20, default=2, space='buy') # volume_change_sell = IntParameter(1, 20, default=2, space='sell') sma20_factor = DecimalParameter(0, 1, decimals=1, default=0.25, space='sell', optimize=False) # percent_threshold = DecimalParameter(-0.1, 0.1, decimals=2, default=0, space='buy') sma5_pct_1h_sell = DecimalParameter(-0.01, 0.01, decimals=3, default=0.01, space='sell', optimize=False) # percent_1d_sell = DecimalParameter(0, 0.1, decimals=2, default=0.015, space='sell', optimize=False) # percent_1d_buy = DecimalParameter(0, 0.1, decimals=2, default=0.02, space='buy', optimize=False) percent_1d_loss_sell = DecimalParameter(0, 0.2, decimals=2, default=0.03, space='sell', optimize=False) # inversion_diff_sell = DecimalParameter(-1, 0, decimals=1, default=-0.8, space='sell') di_pente_5m_sell = IntParameter(-200, 0, default=-50, space='sell') # first_stack_factor = IntParameter(1, 20, default=20, space='buy') # bb_mid_pct_1d_stop = IntParameter(-100, 0, default=-50, space='buy', optimize=True) # bb_mid_pct_1d_start = IntParameter(0, 100, default=50, space='buy', optimize=True) # rsi_pct_1h_stop = IntParameter(-30, 0, default=-5, space='buy', optimize=True) # rsi_pct_1h_start = IntParameter(0, 30, default=5, space='buy', optimize=True) # percent24_1h_stop = IntParameter(-30, 0, default=-5, space='buy', optimize=True) # percent24_1h_start = IntParameter(0, 30, default=5, space='buy', optimize=True) # inversion_diff_buy = DecimalParameter(0.5, 1, decimals=2, default=0.8, space='buy', optimize=False) di_pente_5m_buy = IntParameter(0, 200, default=50, space='buy') protection_stake_amount = IntParameter(0, 1000, default=100, space='protection', optimize=False) sma5_pct_1h_gain_buy = DecimalParameter(-0.01, 0.01, decimals=3, default=0.1, space='protection', optimize=False) sma5_pct_1h_loss_buy = DecimalParameter(-0.01, 0.01, decimals=3, default=0.1, space='protection', optimize=False) max200_diff = DecimalParameter(0.00, 0.05, decimals=3, default=0.025, space='protection', optimize=False) di_pente_5m_stop = IntParameter(-100, 0, default=-50, space='protection') di_pente_1h_stop = IntParameter(-100, 0, default=-50, space='protection') di_pente_1d_stop = IntParameter(-100, 0, default=-50, space='protection') di_pente_5m_start = IntParameter(0, 100, default=50, space='protection') di_pente_1h_start = IntParameter(0, 100, default=50, space='protection') di_pente_1d_start = IntParameter(0, 100, default=50, space='protection') stop_buying = 0 # trailing stoploss hyperopt parameters use_custom_stoploss = True # Trailing stoploss trailing_stop = True trailing_stop_positive = 0.01 trailing_stop_positive_offset = 0.05 trailing_only_offset_is_reached = True columns_logged = False # hard stoploss profit pHSL = DecimalParameter(-0.15, -0.040, default=-1, decimals=3, space='sell', load=True) # # profit threshold 1, trigger point, SL_1 is used # pPF_1 = DecimalParameter(0.008, 0.020, default=0.016, decimals=3, space='sell', load=True) # pSL_1 = DecimalParameter(0.008, 0.020, default=0.011, decimals=3, space='sell', load=True) # # # profit threshold 2, SL_2 is used # pPF_2 = DecimalParameter(0.040, 0.100, default=0.080, decimals=3, space='sell', load=True) # pSL_2 = DecimalParameter(0.020, 0.070, default=0.040, decimals=3, space='sell', load=True) position_adjustment_enable = True # Example specific variables max_entry_position_adjustment = 15 # This number is explained a bit further down max_dca_multiplier = 5.5 # # This is called when placing the initial order (opening trade) # def custom_stake_amount(self, pair: str, current_time: datetime, current_rate: float, # proposed_stake: float, min_stake: float | None, max_stake: float, # leverage: float, entry_tag: str | None, side: str, # **kwargs) -> float: # # # We need to leave most of the funds for possible further DCA orders # # This also applies to fixed stakes # return proposed_stake / self.max_dca_multiplier def confirm_trade_entry(self, pair: str, order_type: str, amount: float, rate: float, time_in_force: str, current_time: datetime, entry_tag: Optional[str], **kwargs) -> bool: # print('entry_tag' + str(entry_tag)) dataframe, _ = self.dp.get_analyzed_dataframe(pair, self.timeframe) last_candle = dataframe.iloc[-1].squeeze() dispo = round(self.wallets.get_available_stake_amount()) stake_amount = self.calculate_stake(pair, rate, last_candle) self.testStopBuying(last_candle) if self.stop_buying: # self.stop_buying = max(self.stop_buying, last_candle['close']) self.log_trade( last_candle=last_candle, date=current_time, action="STOP BUY", pair=pair, trade_type=f"{100 * last_candle['percent24_1h']:.0f} / {10000 * last_candle['bb_mid_pct_1d']:.0f} / {int(last_candle['rsi_pct_1h']):.0f}", rate=rate, dispo=dispo, profit=0, stake=round(stake_amount, 2) ) else: # if (self.stop_buying > 0) & ((last_candle['close'] - self.stop_buying) / last_candle['close'] > - 0): # self.stop_buying = 0 self.log_trade( last_candle=last_candle, date=current_time, action="START BUY", pair=pair, trade_type=f"{100 * last_candle['percent24_1h']:.0f} / {10000 * last_candle['bb_mid_pct_1d']:.0f} / {int(last_candle['rsi_pct_1h']):.0f}", rate=rate, dispo=dispo, profit=0, stake=round(stake_amount, 2) ) allow_to_buy = not self.stop_buying if allow_to_buy: self.pairs[pair]['last_max'] = rate self.pairs[pair]['max_touch'] = rate self.pairs[pair]['last_buy'] = rate self.log_trade( last_candle=last_candle, date=current_time, action="Buy", pair=pair, trade_type=entry_tag, rate=rate, dispo=dispo, profit=0, stake=round(stake_amount, 2) ) # logger.info(f"Allow_to_buy {allow_to_buy} {pair} {current_time} Buy={entry_tag} rate={rate} dispo={dispo} rsi_1h={rsi_1h}") return allow_to_buy def testStopBuying(self, last_candle): # trade_type = f"{100 * last_candle['percent24_1h']:.0f} / {10000 * last_candle['bb_mid_pct_1d']:.0f} / {int(last_candle['rsi_pct_1h']):.0f}", # if self.stop_buying == 0: # test_buying = (100 * last_candle['percent24_1h'] < self.percent24_1h_stop.value) | \ # ((int(10000 * last_candle['bb_mid_pct_1d']) < self.bb_mid_pct_1d_stop.value) & (int(last_candle['rsi_pct_1h']) < self.rsi_pct_1h_stop.value)) # if test_buying: # self.stop_buying = max(self.stop_buying, last_candle['close']) # else: # test_buying = (100 * last_candle['percent24_1h'] > self.percent24_1h_start.value) & \ # ((int(10000 * last_candle['bb_mid_pct_1d']) > self.bb_mid_pct_1d_start.value) & (int(last_candle['rsi_pct_1h']) > self.rsi_pct_1h_start.value)) # if test_buying: # self.stop_buying = 0 if self.stop_buying == 0: test_buying = (last_candle['DI+_pente'] < self.di_pente_5m_stop.value) \ & (last_candle['DI+_pente_1h'] < self.di_pente_1h_stop.value) \ & (last_candle['DI+_pente_1d'] < self.di_pente_1d_stop.value) if test_buying: self.stop_buying = max(self.stop_buying, last_candle['close']) else: test_buying = (last_candle['DI+_pente'] > self.di_pente_5m_start.value) \ & (last_candle['DI+_pente_1h'] > self.di_pente_1h_start.value) \ & (last_candle['DI+_pente_1d'] > self.di_pente_1d_start.value) if test_buying: self.stop_buying = 0 return def confirm_trade_exit(self, pair: str, trade: Trade, order_type: str, amount: float, rate: float, time_in_force: str, exit_reason: str, current_time, **kwargs) -> bool: dataframe, _ = self.dp.get_analyzed_dataframe(pair, self.timeframe) last_candle = dataframe.iloc[-1].squeeze() allow_to_sell = (last_candle['percent'] < -0.00) dispo = round(self.wallets.get_available_stake_amount()) if allow_to_sell: self.log_trade( last_candle=last_candle, date=current_time, action="Sell", pair=pair, trade_type=exit_reason, rate=last_candle['close'], dispo=dispo, profit=round(trade.calc_profit(rate, amount), 2) ) self.pairs[pair]['last_max'] = 0 self.pairs[pair]['max_touch'] = 0 self.pairs[pair]['last_sell'] = rate else: self.log_trade( last_candle=last_candle, date=current_time, action="Cancel", pair=pair, trade_type=exit_reason, rate=last_candle['close'], dispo=dispo # profit=round(trade.calc_profit(rate, amount), 2), ) ok = (allow_to_sell) | (exit_reason == 'force_exit') return ok def adjust_trade_position(self, trade: Trade, current_time: datetime, current_rate: float, current_profit: float, min_stake: float | None, max_stake: float, current_entry_rate: float, current_exit_rate: float, current_entry_profit: float, current_exit_profit: float, **kwargs ) -> float | None | tuple[float | None, str | None]: if trade.has_open_orders: return None # Obtain pair dataframe (just to show how to access it) dataframe, _ = self.dp.get_analyzed_dataframe(trade.pair, self.timeframe) # Only buy when not actively falling price. last_candle = dataframe.iloc[-1].squeeze() previous_candle = dataframe.iloc[-2].squeeze() previous_candle_12 = dataframe.iloc[-13].squeeze() previous_candle_288 = dataframe.iloc[-289].squeeze() # self.testStopBuying(last_candle) # return None if (len(dataframe) < 1) | (self.wallets.get_available_stake_amount() < 50) | (self.stop_buying > 0): return None count_of_buys, hours, days, first_price, last_price = self.getTradeInfos(current_time, trade) dispo = round(self.wallets.get_available_stake_amount()) first_stack = self.calculate_stake(trade.pair, current_rate, last_candle) # if last_candle['enter_tag'] == 'buy_upperclose_1d' and (hours > 1): # if (count_of_buys <= 3): # stake_amount = min(self.wallets.get_available_stake_amount(), self.calculate_stake(trade.pair, 0, last_candle)) # else: # stake_amount = min(self.wallets.get_available_stake_amount(), # count_of_buys / 3 * self.calculate_stake(trade.pair, 0, last_candle)) # # Take half of the profit at +5% # self.log_trade( # last_candle=last_candle, # date=current_time, # action="Loss BB -", # dispo=dispo, # pair=trade.pair, # rate=current_rate, # trade_type='Decrease', # profit=round(current_profit, 4), # round(current_profit * trade.stake_amount, 2), # buys=trade.nr_of_successful_entries, # stake=round(stake_amount, 2) # ) # self.pairs[trade.pair]['max_touch'] = current_rate # # return stake_amount, "upperclose_1d" # self.last_max_by_pair[trade.pair] * (1 - self.percent_1d_loss_sell.value)) \ # & (last_candle['sma5_up_count_1h'] >= 1) last_pct = (last_candle['close'] - last_price) / last_price if ((last_pct <= -0.06) \ & (count_of_buys > 3) \ & (last_candle['DI_diff_1h'] >= -5) & (hours > 1) & (last_candle['close'] < last_candle['bb_middleband_1d']) & (previous_candle_288['bb_middleband_1d'] <= last_candle['bb_middleband_1d']) # & (last_candle['sma5_up_count_1h'] >= 1) ) | ( (last_pct <= -0.01 * count_of_buys) \ & (count_of_buys <= 3) & (last_candle['DI_diff_1h'] >= -5) & (hours > 1) & (previous_candle_288['bb_middleband_1d'] <= last_candle['bb_middleband_1d']) # & (last_candle['close'] < last_candle['bb_middleband_1d']) # & (last_candle['sma5_up_count_1h'] >= 1) ): # & (last_candle['enter_tag'] != ''): if (count_of_buys <= 3): stake_amount = min(self.wallets.get_available_stake_amount(), self.calculate_stake(trade.pair, 0, last_candle)) else: stake_amount = min(self.wallets.get_available_stake_amount(), count_of_buys / 3 * self.calculate_stake(trade.pair, 0, last_candle)) # Take half of the profit at +5% self.log_trade( last_candle=last_candle, date=current_time, action="Loss -", dispo=dispo, pair=trade.pair, rate=current_rate, trade_type='Decrease', profit=round(current_profit, 4), # round(current_profit * trade.stake_amount, 2), buys=trade.nr_of_successful_entries, stake=round(stake_amount, 2) ) self.pairs[trade.pair]['max_touch'] = current_rate return stake_amount, "half_loss%" # if current_profit < (-0.05 - (0.05 * trade.nr_of_successful_entries)) \ # and trade.nr_of_successful_entries >= 1 \ # and (last_candle['rsi_1h'] > previous_candle_12['rsi_1h']) \ # and (last_candle['rsi_1h'] > 40): if (False) & (current_profit < - 0.01) & (count_of_buys <= 3) \ & (days >= 1) & (last_candle['DI+_1h'] > 10) \ & (last_candle['enter_tag'] != ''): # & (last_candle['sma5_pct_1h'] * 100 > self.sma5_pct_1h_loss_buy.value): # & (last_candle['max200_diff'] >= self.max200_diff.value): # & (last_candle['rsi_1h'] > 50) \ # & (last_candle['sma5_pct_1h'] * 100 > self.sma5_pct_1h_loss_buy.value): stake_amount = min(self.wallets.get_available_stake_amount(), max(first_stack, first_stack * (- current_profit / 0.05))) # (trade.stake_amount / 2) self.log_trade( last_candle=last_candle, date=current_time, action="Loss +", dispo=dispo, pair=trade.pair, rate=current_rate, trade_type='Increase', profit=round(current_profit, 4), # round(current_profit * trade.stake_amount, 2), buys=trade.nr_of_successful_entries, stake=round(stake_amount, 2) ) self.pairs[trade.pair]['last_max'] = max(self.pairs[trade.pair]['last_max'], current_rate) self.pairs[trade.pair]['max_touch'] = max(self.pairs[trade.pair]['max_touch'], current_rate) return stake_amount, "increase" # if (current_profit > -0.05) | (last_candle['rsi_1h'] < previous_candle_12['rsi_1h']) | ( # last_candle['rsi_1h'] < 40): # return None filled_entries = trade.select_filled_orders(trade.entry_side) count_of_entries = trade.nr_of_successful_entries # Allow up to 3 additional increasingly larger buys (4 in total) # Initial buy is 1x # If that falls to -5% profit, we buy 1.25x more, average profit should increase to roughly -2.2% # If that falls down to -5% again, we buy 1.5x more # If that falls once again down to -5%, we buy 1.75x more # Total stake for this trade would be 1 + 1.25 + 1.5 + 1.75 = 5.5x of the initial allowed stake. # That is why max_dca_multiplier is 5.5 # Hope you have a deep wallet! try: if (current_profit > 0.01) \ & (last_candle['close'] > first_price) \ & (last_candle['close'] >= last_price * 1.01) \ & ((hours >= 2) | (last_candle['rsi_change'] > 0.2)) \ & (last_candle['rsi_1h'] <= 55) \ & (count_of_buys <= self.max_entry_position_adjustment) \ & (last_candle['sma5_1h'] > previous_candle_12['sma5_1h']): # This returns first order stake size stake_amount = filled_entries[0].stake_amount_filled # print(f"1 - {stake_amount}") # This then calculates current safety order size # stake_amount = min(self.wallets.get_available_stake_amount(), stake_amount * ( # 1 + (count_of_entries * (100 + 10 * count_of_buys) * last_candle['sma5_pct_1h']))) stake_amount = min(self.wallets.get_available_stake_amount(), self.calculate_stake(trade.pair, 0, last_candle)) # print(f"2 - {stake_amount}") self.log_trade( last_candle=last_candle, date=current_time, dispo=dispo, action="Gain +", rate=current_rate, pair=trade.pair, trade_type='Increase', profit=round(current_profit, 2), buys=count_of_entries, stake=round(stake_amount, 2) ) self.pairs[trade.pair]['last_max'] = max(self.pairs[trade.pair]['last_max'], current_rate) self.pairs[trade.pair]['max_touch'] = max(self.pairs[trade.pair]['max_touch'], current_rate) return stake_amount, "inc_volume" except Exception as exception: print(exception) return None return None def getTradeInfos(self, current_time, trade): filled_buys = trade.select_filled_orders('buy') count_of_buys = len(filled_buys) first_price = filled_buys[0].price days = 0 minutes = 0 hours = 0 last_price = first_price mises=0 for buy in filled_buys: minutes = (current_time - buy.order_date_utc).seconds / 60 hours = round(minutes / 60, 0) days = (current_time - buy.order_date_utc).days last_price = buy.price mises += buy.amount * buy.price self.pairs[trade.pair]['trade_info'] = { "count_of_buys": count_of_buys, "hours": hours, "days": days, "minutes": minutes, "first_price": first_price, "last_price": last_price, "mises": mises } return count_of_buys, hours, days, first_price, last_price def custom_exit(self, pair: str, trade: 'Trade', current_time: 'datetime', current_rate: float, current_profit: float, **kwargs) -> 'Optional[Union[str, bool]]': """ Custom exit function for dynamic trade exits. """ dataframe, _ = self.dp.get_analyzed_dataframe(pair, self.timeframe) # Obtenir les données actuelles pour cette paire last_candle = dataframe.iloc[-1].squeeze() previous_last_candle = dataframe.iloc[-2].squeeze() count_of_buys, hours, days, first_price, last_price = self.getTradeInfos(current_time, trade) days = (current_time - trade.open_date_utc).days expected_profit = 2 * last_candle['atr_1h'] # current_profit = current_profit / count_of_buys self.pairs[pair]['max_touch'] = max(current_rate, self.pairs[pair]['max_touch']) max_percent = expected_profit * 0.75 # self.min_max_buys[pair]['profit'] / 5 # last_candle['bb_width'] / 3.5 # 0.005 max_profit = 0.015 # last_candle['bb_width'] * 3 / 4 # 0.015 # if (current_profit > 0.05) & (last_candle['volume_change_1h'] < - self.volume_change_sell.value * 100): # return 'volume' limit_sell = (last_candle['close'] - self.pairs[trade.pair]['max_touch']) / self.pairs[trade.pair]['max_touch'] # if (days > 1) \ # & (last_candle['percent'] < 0) \ # & (current_profit > 0.00) \ # & (previous_last_candle['rsi'] >= 65) \ # & (last_candle['volume_change'] < previous_last_candle['volume_change']): # return f"rsi {last_candle['rsi_1h']:.0f}" if (current_profit > 0.01) & (hours > 3) & (count_of_buys > 1) & (last_candle['percent12'] < 0) & \ (previous_last_candle['sma20'] > last_candle['sma20']) & \ (previous_last_candle['sma5_1h'] > last_candle['sma5_1h']) & \ (previous_last_candle['trend_is_positive'] > last_candle['trend_is_positive']): return 'sell_tout_baisse' # ((last_candle['sma5_up_count_1h'] > 5) & (limit_sell > -0.03)) if (last_candle['percent'] > 0) | (last_candle['percent3'] > 0.0) | (last_candle['percent5'] > 0.0)\ | (last_candle['close'] * 1.006 < last_candle['bb_upperband_1d']): return None self.testStopBuying(last_candle) if (self.stop_buying > 0) & (limit_sell < -0.03) & (last_candle['DI+_1h'] < 10): return f"stop_buying {limit_sell * 100:.0f}" # if (current_profit > 0.000) & ((current_time - trade.open_date_utc).days >= 2): # return f"too_old" # if (current_profit > 0.005) & (limit_sell < -current_profit * 0.75) & (last_candle['DI+_1h'] < 10): return f"limit_1 {limit_sell:.3f}" # if (current_profit > 0.005) & (limit_sell < - current_profit / 4) & (last_candle['percent12'] < 0) \ # & (last_candle['close'] > last_candle['bb_upperband_1d']): # return f"limit_2 {limit_sell:.3f}" if (current_profit >= expected_profit) \ & (last_candle['percent3'] < -max_percent) \ & (last_candle[self.close] > last_candle['sma5_1h']) \ & (last_candle['DI_diff'] < -20): return f"quick_lost {last_candle['percent3'] * 100:.1f}" if (False) & (current_profit > 0) & \ (limit_sell < -0.01) \ & ((current_time - trade.open_date_utc).seconds >= 3600) \ & (last_candle['DI_diff'] < 0) \ & (last_candle['sma5_pct_1h'] < -0): return f"DI_diff {last_candle['DI_diff']:.0f} {limit_sell:.2f}" # ((self.pairs[trade.pair]['last_max'] - last_candle['close']) / self.pairs[trade.pair]['last_max'] > 0.03): # if (last_candle['DI+_pente_1d'] < 0) & (last_candle['DI+_pente_1h'] < 0) \ # & ((self.pairs[trade.pair]['last_max'] - last_candle['close']) / self.pairs[trade.pair]['last_max'] > 0.03): # return f"DI {last_candle['DI+_pente_1d']:.1f} / {last_candle['DI+_pente_1h']:.1f}" # if (last_candle['percent48'] <= -0.04) | (last_candle['percent24'] <= -0.04) | (last_candle['percent12'] <= -0.04) \ # & ((current_time - trade.open_date_utc).seconds >= 3600): # return "quick_lost" # if self.profit_b_sma20.value: if (current_profit > expected_profit) \ & (previous_last_candle['sma10'] > last_candle['sma10']) \ & ((current_time - trade.open_date_utc).seconds >= 3600) \ & ((previous_last_candle['sma20'] > last_candle['sma20']) & ((last_candle['percent5'] < - current_profit * self.sma20_factor.value) # | (last_candle['percent12'] < - current_profit * self.sma20_factor.value) # | (last_candle['percent24'] < - current_profit * self.sma20_factor.value) )) \ & (last_candle['sma5_pct_1h'] * 100 < self.sma5_pct_1h_sell.value): # print("over_bb_band_sma10_desc", pair, trade, " profit=", current_profit, " rate=", current_rate) return 'b_sma20' # Par défaut, ne pas sortir return None def custom_stoploss(self, pair: str, trade: 'Trade', current_time: datetime, current_rate: float, current_profit: float, **kwargs) -> float: dataframe, _ = self.dp.get_analyzed_dataframe(pair, self.timeframe) # Obtenir les données actuelles pour cette paire last_candle = dataframe.iloc[-1].squeeze() # self.getTradeInfos(current_time, trade) # print(f"current_profit={current_profit} mises=" + str(round(self.pairs[pair]['trade_info']['mises'], 4))) limit_sell = (last_candle['close'] - self.pairs[trade.pair]['max_touch']) / self.pairs[trade.pair]['max_touch'] if (current_profit > 0.08) & (limit_sell < -0.01) & (last_candle['DI+_1h'] < 10): sl_profit = 0.85 * current_profit # n% du profit en cours else: sl_profit = self.pHSL.value # Hard stop-loss stoploss = stoploss_from_open(sl_profit, current_profit) return stoploss # def custom_stoploss(self, pair: str, trade: 'Trade', current_time: 'datetime', # current_rate: float, current_profit: float, **kwargs) -> float: # dataframe, _ = self.dp.get_analyzed_dataframe(pair, self.timeframe) # # # Obtenir les données actuelles pour cette paire # last_candle = dataframe.iloc[-1].squeeze() # # hard stoploss profit # HSL = self.pHSL.value # PF_1 = self.pPF_1.value # SL_1 = self.pSL_1.value # PF_2 = self.pPF_2.value # SL_2 = self.pSL_2.value # # # For profits between PF_1 and PF_2 the stoploss (sl_profit) used is linearly interpolated # # between the values of SL_1 and SL_2. For all profits above PL_2 the sl_profit value # # rises linearly with current profit, for profits below PF_1 the hard stoploss profit is used. # if current_profit > PF_2: # sl_profit = SL_2 + (current_profit - PF_2) # elif current_profit > PF_1: # sl_profit = SL_1 + ((current_profit - PF_1) * (SL_2 - SL_1) / (PF_2 - PF_1)) # else: # sl_profit = HSL # # stoploss = stoploss_from_open(sl_profit, current_profit) # #logger.info(f"stoploss={stoploss}") # return stoploss # Indicateurs personnalisés pour le DataFrame def populate_indicators(self, dataframe: DataFrame, metadata: dict) -> DataFrame: heikinashi = qtpylib.heikinashi(dataframe) dataframe['enter_long'] = "" dataframe['enter_tag'] = "" dataframe['mid'] = (dataframe['close'] + dataframe['open']) / 2 dataframe['haopen'] = heikinashi['open'] dataframe['haclose'] = heikinashi['close'] dataframe['hapercent'] = dataframe['haclose'].pct_change() # Ajout d'indicateurs techniques dataframe['rsi'] = ta.RSI(dataframe, timeperiod=14) dataframe['rsi_change'] = dataframe['rsi'].pct_change(3) # dataframe['ema_short'] = ta.EMA(dataframe, timeperiod=9) # dataframe['ema_long'] = ta.EMA(dataframe, timeperiod=21) # dataframe['ema_50'] = ta.EMA(dataframe, timeperiod=50) # dataframe['ema_200'] = ta.EMA(dataframe, timeperiod=200) dataframe['bb_upperband'], dataframe['bb_middleband'], dataframe['bb_lowerband'] = ta.BBANDS( dataframe['close'], timeperiod=20 ) dataframe["bb_width"] = ( (dataframe["bb_upperband"] - dataframe["bb_lowerband"]) / dataframe["bb_middleband"] ) dataframe['min200'] = ta.MIN(dataframe['close'], timeperiod=200) dataframe['max200'] = ta.MAX(dataframe['close'], timeperiod=200) dataframe['max50'] = ta.MAX(dataframe['close'], timeperiod=50) dataframe['min50'] = ta.MIN(dataframe['close'], timeperiod=50) dataframe['max200_diff'] = (dataframe['max200'] - dataframe['close']) / dataframe['close'] dataframe['max50_diff'] = (dataframe['max50'] - dataframe['close']) / dataframe['close'] # ADX pour la force de tendance dataframe['adx'] = ta.ADX(dataframe) dataframe['atr'] = ta.ATR(dataframe['high'], dataframe['low'], dataframe[self.close], timeperiod=144) / \ dataframe[self.close] # Calcul de nouvelles colonnes pour l'entraînement du modèle dataframe['percent'] = dataframe['close'].pct_change() dataframe['futur_pct'] = dataframe['percent'] # dataframe['close'] - dataframe['close'].shift(1) dataframe['sma5'] = ta.SMA(dataframe, timeperiod=5) dataframe['sma10'] = ta.SMA(dataframe, timeperiod=10) dataframe['sma20'] = ta.SMA(dataframe, timeperiod=20) dataframe['Interpolated'] = dataframe['sma5'].ewm(span=3, adjust=False).mean() dataframe["percent3"] = dataframe[self.close].pct_change(3) dataframe["percent5"] = dataframe[self.close].pct_change(5) dataframe["percent12"] = dataframe[self.close].pct_change(12) dataframe["percent24"] = dataframe[self.close].pct_change(24) dataframe["percent48"] = dataframe[self.close].pct_change(48) dataframe["sma20_pct"] = dataframe['sma20'].pct_change() dataframe['volume2'] = dataframe['volume'] dataframe.loc[dataframe['percent'] < 0, 'volume2'] *= -1 dataframe['volume_change'] = dataframe['volume2'].rolling(5).sum() # # Charger les données (par exemple, dataframe avec une colonne "close") # dataframe['SMA5'] = dataframe['mid'].rolling(window=5).mean() # # # Calcul de la variation de pente # dataframe['SMA5_diff'] = dataframe['SMA5'].diff() # # # Calcul de la variation accélérée (différence de pente) # dataframe['SMA5_diff2'] = dataframe['SMA5_diff'].diff() # # # Identifier les inversions brusques (exemple : seuil = 0.5) # dataframe['inversion'] = ((dataframe['SMA5_diff'] * dataframe['SMA5_diff'].shift(-1) < 0) & # (dataframe['SMA5_diff2'] > self.threshold.value)) # Ajoutez la colonne en utilisant .loc pour éviter l'avertissement # dataframe.loc[:, 'future_direction'] = dataframe['close'].shift(-1) - dataframe['close'] # dataframe.loc[:, 'future_direction'] = dataframe['future_direction'].apply(lambda x: 1 if x > 0 else 0) # dataframe['future_direction'] = dataframe['percent48'] > 0.02 # dataframe['future_direction'] = dataframe['future_change'].apply(lambda x: 1 if x > 0 else 0) # Supprime les lignes avec des valeurs manquantes # dataframe = dataframe.dropna() dataframe['DI+'] = ta.PLUS_DI(dataframe, window=12) dataframe['DI-'] = ta.MINUS_DI(dataframe, window=12) dataframe['DI_diff'] = dataframe['DI+'] - dataframe['DI-'] # Vérification de la tendance dataframe['trend_is_positive'] = (dataframe['DI+'] - dataframe['DI-']) * (dataframe['adx']) dataframe['trend_pente'] = dataframe['trend_is_positive'] - dataframe['trend_is_positive'].shift(1) # Calcul de la pente de la SMA dataframe['SMA20_slope'] = 100 * dataframe['sma20'].diff() / dataframe['sma20'] dataframe['trend_is_positive_2'] = dataframe['SMA20_slope'] > 0 dataframe['trend_is_negative_2'] = dataframe['SMA20_slope'] < 0 dataframe["volume_mean"] = dataframe["volume"].rolling(window=14).mean() dataframe["volume_change_mean"] = dataframe["volume_change"].rolling(window=14).mean() # Sortie si le volume est 3x supérieur à la moyenne dataframe["volume_spike"] = dataframe["volume"] > (dataframe["volume_mean"] * 3) # Déterminer si la bougie est haussière ou baissière dataframe["bullish_candle"] = dataframe["close"] > dataframe["open"] dataframe["bearish_candle"] = dataframe["close"] < dataframe["open"] # Volume spike haussier (fort volume + bougie haussière) dataframe["bullish_volume_spike"] = (dataframe["volume_change"] > dataframe["volume_change_mean"] * 3) & dataframe["volume_change"] > 0 # Volume spike baissier (fort volume + bougie baissière) dataframe["bearish_volume_spike"] = (dataframe["volume_change"] < dataframe["volume_change_mean"] * 3) & dataframe["volume_change"] < 0 # ====================================================================================== ################### INFORMATIVE 1h informative = self.dp.get_pair_dataframe(pair=metadata['pair'], timeframe="1h") informative['sma5'] = ta.SMA(informative, timeperiod=5) informative['sma5_pct'] = informative['sma5'].pct_change() informative["percent"] = informative[self.close].pct_change() informative['volume2'] = informative['volume'] informative.loc[dataframe['percent'] < 0, 'volume2'] *= -1 informative['volume_change'] = informative['volume2'].rolling(5).sum() informative['volatility'] = ta.STDDEV(informative['close'], timeperiod=14) / informative['close'] informative['atr'] = (ta.ATR(informative['high'], informative['low'], informative['close'], timeperiod=14)) / \ informative['close'] informative['rsi'] = ta.RSI(informative['close'], timeperiod=12) informative['rsi_sma'] = ta.SMA(informative['rsi'], timeperiod=5) informative['rsi_pct'] = 100 * informative['rsi'].pct_change() informative['percent24'] = informative['close'].pct_change(24) # Détecter les baisses de SMA5 (True si SMA5 baisse, False sinon) informative['sma5_down'] = informative['sma5'].diff() <= 0 informative['sma5_up'] = informative['sma5'].diff() >= 0 # Compter les baisses consécutives informative['sma5_down_count'] = informative['sma5_down'].astype(int) * ( informative['sma5_down'].groupby((informative['sma5_down'] != informative['sma5_down'].shift()).cumsum()).cumcount() + 1) informative['sma5_up_count'] = informative['sma5_up'].astype(int) * ( informative['sma5_up'].groupby((informative['sma5_up'] != informative['sma5_up'].shift()).cumsum()).cumcount() + 1) informative['Interpolated'] = informative['sma5'].ewm(span=3, adjust=False).mean() informative['inversion'] = ( (informative['Interpolated'].shift(2) > informative['Interpolated'].shift( 1)) # La pente devient positive (actuel) & (informative['Interpolated'].shift(1) < informative['Interpolated']) # & (data[f'SMA{sma_period}_diff2'] < data['threshold']) # Le changement est brusque ) # Calcul de l'ADX informative['adx'] = ta.ADX(informative) informative['DI+'] = ta.PLUS_DI(informative, window=5) informative['DI+_pente'] = 100 * informative['DI+'].diff(3) / informative['DI+'] informative['DI-'] = ta.MINUS_DI(informative, window=5) informative['DI-_pente'] = 100 * informative['DI-'].diff(3) / informative['DI-'] informative['DI_diff'] = informative['DI+'] - informative['DI-'] # Vérification de la tendance informative['trend_is_positive'] = (informative['DI+'] - informative['DI-']) * (informative['adx']) informative['trend_pente'] = informative['trend_is_positive'] - informative['trend_is_positive'].shift(1) # Calcul de la pente de la SMA informative['SMA5_slope'] = 100 * informative['sma5'].diff() / informative['sma5'] informative['trend_is_positive_2'] = informative['SMA5_slope'] > 0 informative['trend_is_negative_2'] = informative['SMA5_slope'] < 0 informative['inversion_slope'] = ( (informative['SMA5_slope'].shift(2) > informative['SMA5_slope'].shift( 1)) # La pente devient positive (actuel) & (informative['SMA5_slope'].shift(1) < informative['SMA5_slope']) # & (data[f'SMA{sma_period}_diff2'] < data['threshold']) # Le changement est brusque ) # heikinashi = qtpylib.heikinashi(informative) # informative['ha_open'] = heikinashi['open'] # informative['ha_close'] = heikinashi['close'] # informative['ha_high'] = heikinashi['high'] # informative['ha_low'] = heikinashi['low'] dataframe = merge_informative_pair(dataframe, informative, self.timeframe, "1h", ffill=True) # ====================================================================================== ################### INFORMATIVE 1d informative = self.dp.get_pair_dataframe(pair=metadata['pair'], timeframe="1d") informative['rsi'] = ta.RSI(informative['close'], timeperiod=12) informative['sma5'] = ta.SMA(informative, timeperiod=3) informative['sma5_pct'] = informative['sma5'].pct_change() informative['rsi_pct'] = 100 * informative['rsi'].pct_change() informative['bb_upperband'], informative['bb_middleband'], informative['bb_lowerband'] = ta.BBANDS( informative['close'], timeperiod=20 ) informative["bb_mid_pct"] = informative['bb_middleband'].pct_change() # Calcul de l'ADX informative['adx'] = ta.ADX(informative) informative['DI+'] = ta.PLUS_DI(informative, window=5) informative['DI+_pente'] = 100 * informative['DI+'].diff(3) / informative['DI+'] informative['DI-'] = ta.MINUS_DI(informative, window=5) informative['DI-_pente'] = 100 * informative['DI-'].diff(3) / informative['DI-'] # Vérification de la tendance informative['trend_is_positive'] = (informative['DI+'] - informative['DI-']) * (informative['adx']) # Calcul de la pente de la SMA informative['SMA5_slope'] = 100 * informative['sma5'].diff() / informative['sma5'] informative['trend_is_positive_2'] = informative['SMA5_slope'] > 0 informative['trend_is_negative_2'] = informative['SMA5_slope'] < 0 informative['atr'] = (ta.ATR(informative['high'], informative['low'], informative['close'], timeperiod=14)) / \ informative['close'] informative['sma5_down'] = informative['sma5'].diff() <= 0 informative['sma5_up'] = informative['sma5'].diff() >= 0 informative['sma5_down_count'] = informative['sma5_down'].astype(int) * ( informative['sma5_down'].groupby((informative['sma5_down'] != informative['sma5_down'].shift()).cumsum()).cumcount() + 1) informative['sma5_up_count'] = informative['sma5_up'].astype(int) * ( informative['sma5_up'].groupby((informative['sma5_up'] != informative['sma5_up'].shift()).cumsum()).cumcount() + 1) dataframe = merge_informative_pair(dataframe, informative, self.timeframe, "1d", ffill=True) # dataframe = self.detect_sma_inversions(dataframe, column='mid', sma_period=5, threshold_factor=self.threshold.value) dataframe['DI+'] = ta.PLUS_DI(dataframe, window=14) dataframe['DI+_pente'] = 100 * dataframe['DI+'].diff(3) / dataframe['DI+'] dataframe['DI-'] = ta.MINUS_DI(dataframe, window=14) dataframe['DI-_pente'] = 100 * dataframe['DI-'].diff(3) / dataframe['DI-'] dataframe['sma20_5_diff'] = 100 * (dataframe['sma5'].shift(7) - dataframe['sma5'].shift(2)) / dataframe[ 'sma5'].shift(2) dataframe['inversion'] = ( (dataframe['Interpolated'].shift(2) >= dataframe['Interpolated'].shift(1)) # La pente devient positive (actuel) & (dataframe['Interpolated'].shift(1) <= dataframe['Interpolated']) # Le changement est brusque ) dataframe['inversion_s'] = ( (dataframe['Interpolated'].shift(2) <= dataframe['Interpolated'].shift(1)) # La pente devient positive (actuel) & (dataframe['Interpolated'].shift(1) >= dataframe['Interpolated']) ) dataframe = self.populate_future_direction(dataframe) return dataframe def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) # Initialisation du modèle d'arbre de décision self.model = DecisionTreeClassifier(max_depth=5, random_state=42) self.scaler = StandardScaler() # Pour normaliser les données def train_model(self, dataframe: DataFrame) -> None: # Supprimez les valeurs infinies et NaN dataframe.replace([np.inf, -np.inf], np.nan, inplace=True) # Supprimez les lignes avec des NaN dans les colonnes spécifiques train_data = dataframe.dropna(subset=self.features + [self.target]) if train_data.empty: raise ValueError("Aucune donnée valide après suppression des NaN et infinis. Vérifiez vos calculs.") X = train_data[self.features] y = train_data[self.target] # Ajustez le scaler et le modèle self.scaler.fit(X) # dataframe[features_to_scale] = self.scaler.fit_transform(dataframe[self.features_to_scale]) self.model.fit(self.scaler.transform(X), y) def validate_data(self, X): if np.any(np.isnan(X)): raise ValueError("Les données contiennent des valeurs NaN.") if np.any(np.isinf(X)): raise ValueError("Les données contiennent des valeurs infinies.") def predict(self, dataframe: DataFrame) -> np.ndarray: predict_data = dataframe.dropna(subset=self.features) if predict_data.empty: return np.array([]) X = predict_data[self.features] # Vérifiez que le scaler est ajusté if not hasattr(self.scaler, 'mean_'): raise ValueError("Scaler not fitted. Call `train_model` before predicting.") X_scaled = self.scaler.transform(X) return self.model.predict(X_scaled) def populate_entry_trend(self, dataframe: DataFrame, metadata: dict) -> DataFrame: # dataframe = self.populate_future_direction(dataframe) # # Appeler train_model pour ajuster scaler et modèle # # #if self.model is None: # self.train_model(dataframe) # # predictions = self.predict(dataframe) # dataframe['buy_signal'] = 0 # dataframe.loc[dataframe.index[:len(predictions)], 'buy_signal'] = predictions # dataframe.loc[(dataframe['buy_signal'] == 1), # ['enter_long', 'enter_tag'] # ] = [1 , 'buy_predict'] # dataframe.loc[ # (dataframe['inversion'] == 1) # # & (dataframe['close'] < dataframe['min50'] * 1.002) # & (dataframe['DI+'] > 0) # # & (dataframe['DI+'] - dataframe['DI+'].shift(1) > 0) # & (dataframe["sma20"] > dataframe["sma20"].shift(1)) # , # ['enter_long', 'enter_tag']] = [1, 'buy_inversion'] dataframe.loc[ (dataframe['percent3'] > 0) & ((dataframe['bb_upperband_1d'].shift(10) - dataframe['close'].shift(10)) / dataframe['close'].shift(10) > 0.05) & (dataframe['close'].shift(10) < dataframe['bb_lowerband_1d'].shift(10) * 1.007) & (dataframe['min50'].shift(10) == dataframe['min50']), ['enter_long', 'enter_tag']] = [1, 'buy_upperclose_1d'] dataframe.loc[ (dataframe['percent3'] > 0) & (dataframe['close'] < dataframe['bb_upperband_1d']) & (dataframe['sma20'].shift(1) < dataframe['sma20']) & (dataframe['sma5_1h'].shift(1) < dataframe['sma5_1h']) & (dataframe['trend_is_positive'].shift(1) < dataframe['trend_is_positive']), ['enter_long', 'enter_tag']] = [1, 'buy_tout_monte'] # dataframe.loc[ # (dataframe['inversion'] == 1) # & (dataframe['max50_diff'].shift(1) >= 0.035) # & (dataframe['DI_diff'] > - 30), # ['enter_long', 'enter_tag']] = [1, 'buy_inversion_2'] # dataframe.loc[ # # (dataframe['inversion'] == 1) # # & (dataframe['atr'] >= 0.001) # # (dataframe['volume_change_1h'] > self.volume_change_buy.value * 100) # (dataframe["close"] == dataframe['min200']) # & ( # (dataframe['percent12'] <= self.percent48.value) # | (dataframe['percent24'] <= self.percent48.value) # | (dataframe['percent12'] <= self.percent48.value) # ), # #& (self.sma_pct_min.value < dataframe["sma20_pct"] * 100) # #& (dataframe["sma20_pct"] * 100 > self.sma_pct_max.value) # # & (dataframe['bb_width'] >= self.bb_width.value) # #& (dataframe['max200_diff'] >= self.max200_diff.value), # ['enter_long', 'enter_tag'] # ] = [1, 'buy_inversion'] # Supposons que `model` est votre modèle DecisionTreeClassifier ou DecisionTreeRegressor # tree_rules = export_text(self.model, feature_names=self.features) # `feature_names` est la liste des noms de vos colonnes # print(tree_rules) # # Exporter l'arbre au format .dot # export_graphviz( # self.model, # out_file="tree.dot", # feature_names=self.features, # #class_names=self.target, # Facultatif # filled=True, # rounded=True # ) # # # Charger et visualiser le fichier .dot # with open("tree.dot") as f: # dot_graph = f.read() # graph = Source(dot_graph) # graph.render("decision_tree " + metadata['pair']) # Génère un fichier PNG ou PDF # graph.view() # plt.figure(figsize=(12, 6)) # plt.plot(dataframe['close'], label='Close Price', alpha=0.5) # plt.plot(dataframe['SMA5'], label='SMA5', linewidth=2) # inversions = dataframe[dataframe['inversion']] # # Marquer les inversions # plt.scatter(inversions.index, inversions['SMA5'], color='red', label='Inversions brusques' + metadata['pair']) # plt.legend() # plt.show() return dataframe def populate_exit_trend(self, dataframe: DataFrame, metadata: dict) -> DataFrame: """ Définit les signaux de sortie. """ # Sort lorsque le prix ferme sous la bande inférieure des Bollinger # dataframe.loc[ # (dataframe['close'] > dataframe['bb_upperband']), # 'sell' # ] = 1 # dataframe.loc[ # (dataframe['inversion_s'] == 1) # & (dataframe['sma20_5_diff'] < self.inversion_diff_sell.value), # ['exit_long', 'exit_tag']] = [1 , 'sell_inversion'] return dataframe def populate_future_direction(self, dataframe: DataFrame) -> DataFrame: """ Ajoute une colonne `future_direction` pour indiquer si le prix augmente (1) ou diminue (0). """ dataframe['future_change'] = dataframe['percent12'] dataframe['future_direction'] = dataframe['future_change'].apply(lambda x: 1 if x > 0.005 else 0) return dataframe def backtest(self, dataframe: DataFrame) -> None: """ Exemple de test du modèle sur des données historiques. """ dataframe = self.populate_future_direction(dataframe) self.train_model(dataframe) def custom_stake_amount(self, pair: str, current_time: datetime, current_rate: float, proposed_stake: float, min_stake: float, max_stake: float, **kwargs) -> float: return self.calculate_stake(pair, None, None) def calculate_stake(self, pair, value, last_candle): amount = self.config['stake_amount'] #1000 / self.first_stack_factor.value self.protection_stake_amount.value # return amount # def add_future_direction(self, dataframe: DataFrame) -> DataFrame: # dataframe['future_change'] = dataframe['percent48'] # dataframe['future_direction'] = dataframe['future_change'].apply(lambda x: 1 if x > 0 else 0) # return dataframe def informative_pairs(self): # get access to all pairs available in whitelist. pairs = self.dp.current_whitelist() informative_pairs = [(pair, '1d') for pair in pairs] informative_pairs += [(pair, '1h') for pair in pairs] return informative_pairs # # Fonction générique pour détecter les inversions brusques d'une SMA # def detect_sma_inversions(self, data, column='close', sma_period=20, threshold_factor=2): # """ # Détecte les inversions brusques pour une SMA donnée. # # :param data: DataFrame contenant les données de prix. # :param column: Nom de la colonne des prix (ex. 'close'). # :param sma_period: Période de la SMA (ex. 20 pour SMA20). # :param threshold_factor: Facteur pour calculer le seuil basé sur l'écart-type. # :return: DataFrame avec une colonne 'inversion' indiquant les points d'inversion. # """ # # Calcul de la SMA # data[f'SMA{sma_period}'] = data[column].rolling(window=sma_period).mean() # # # Calcul de la variation de la pente (différences successives) # data[f'SMA{sma_period}_diff'] = 100 * data[f'SMA{sma_period}'].diff() / data['close'] # # # Calcul de la variation accélérée (différence des différences) # data[f'SMA{sma_period}_diff2'] = \ # (data[f'SMA{sma_period}_diff'].shift(3) + data[f'SMA{sma_period}_diff'].shift(2) + data[ # f'SMA{sma_period}_diff'].shift(1)) / 3 - (data[f'SMA{sma_period}_diff'].shift(2) + data[f'SMA{sma_period}_diff'].shift(1) + data[f'SMA{sma_period}_diff']) / 3 # # # Calcul de l'écart-type pour normaliser le seuil # # std_diff2 = data[f'SMA{sma_period}_diff2'].std() # # # Définir un seuil basé sur le facteur et l'écart-type # #threshold = threshold_factor * std_diff2 # # data['dir_change'] = (data[f'SMA{sma_period}_diff'].shift(1) * data[f'SMA{sma_period}_diff'] < 0) # data['threshold_diff'] = data[f'SMA{sma_period}_diff2'].shift(1) - threshold_factor # # Identifier les inversions brusques # # data['inversion'] = ( # # (data[f'SMA{sma_period}_diff'] * data[f'SMA{sma_period}_diff'].shift(-1) < 0) & # Changement de direction # # (data[f'SMA{sma_period}_diff2'] < threshold) # Changement brusque # # ) # data['inversion'] = ( # (data[f'SMA{sma_period}_diff'].shift(1) < 0) & # La pente était négative # (data[f'SMA{sma_period}_diff'] > 0) & # La pente devient positive # # (data['percent5'] <= self.percent_threshold.value) & # (data['close'] <= data['close_1h'].shift(12)) & # (data[f'SMA{sma_period}_diff2'].shift(1) > threshold_factor) # Le changement est brusque # ) # # return data def detect_sma_inversions(self, data, column='close', sma_period=20, threshold_factor=2, rolling_window=50): """ Détecte les inversions de tendance SMA vers le haut sans regarder dans le futur. :param data: DataFrame contenant les données de prix. :param column: Nom de la colonne des prix (ex. 'close'). :param sma_period: Période de la SMA. :param threshold_factor: Facteur pour calculer le seuil basé sur l'écart-type. :param rolling_window: Fenêtre pour calculer l'écart-type sur les données passées. :return: DataFrame avec une colonne 'inversion_up' indiquant les points d'inversion. """ # Calcul de la SMA data[f'SMA{sma_period}'] = data[column].rolling(window=sma_period).mean() # Calcul de la variation de la pente (différences successives) data[f'SMA{sma_period}_diff'] = data[f'SMA{sma_period}'].shift(1) - data[f'SMA{sma_period}'] # Calcul de la variation accélérée (différence des différences) data[f'SMA{sma_period}_diff2'] = data[f'SMA{sma_period}_diff'].shift(1) - data[f'SMA{sma_period}_diff'] # Calcul de l'écart-type basé uniquement sur les données historiques data['rolling_std_diff2'] = ( data[f'SMA{sma_period}_diff2'].rolling(window=rolling_window, min_periods=1).std() ) # Définir un seuil basé sur le facteur et l'écart-type historique data['threshold'] = threshold_factor * data['rolling_std_diff2'] # Identifier les inversions vers le haut sans regarder dans le futur data['inversion'] = ( (data[f'SMA{sma_period}_diff'] > 0) # La pente était négative (historique) & (data[f'SMA{sma_period}_diff'].shift(2) < data[f'SMA{sma_period}_diff'].shift( 1)) # La pente devient positive (actuel) & (data[f'SMA{sma_period}_diff'].shift(1) >= data[f'SMA{sma_period}_diff']) # & (data[f'SMA{sma_period}_diff2'] < data['threshold']) # Le changement est brusque ) print(data['inversion']) return data def log_trade(self, action, pair, date, trade_type=None, rate=None, dispo=None, profit=None, buys=None, stake=None, last_candle=None): # Afficher les colonnes une seule fois if self.config.get('runmode') == 'hyperopt': return if not self.columns_logged: print( f"| {'Date':<16} | {'Action':<10} | {'Pair':<10} | {'Trade Type':<18} | {'Rate':>12} | {'Dispo':>6} | {'Profit':>8} | {'RSI':>5} | {'sma_pct_1d':>11} | {'last_max':>12} | {'rsi_pct':>12} | {'Buys':>5} | {'Stake':>10} |" ) print( f"|{'-' * 18}|{'-' * 12}|{'-' * 12}|{'-' * 20}|{'-' * 14}|{'-' * 8}|{'-' * 10}|{'-' * 7}|{'-' * 13}|{'-' * 14}|{'-' * 14}|{'-' * 7}|{'-' * 12}|" ) self.columns_logged = True date = str(date)[:16] if date else "-" limit = None # if buys is not None: # limit = round(last_rate * (1 - self.fibo[buys] / 100), 4) rsi = '' rsi_pct = '' if last_candle is not None: if (not np.isnan(last_candle['rsi_1d'])) and (not np.isnan(last_candle['rsi_1h'])): rsi = str(int(last_candle['rsi_1d'])) + " " + str(int(last_candle['rsi_1h'])) if (not np.isnan(last_candle['rsi_pct_1d'])) and (not np.isnan(last_candle['rsi_pct_1h'])): rsi_pct = str(int(10000 * last_candle['bb_mid_pct_1d'])) + " " + str( int(last_candle['rsi_pct_1d'])) + " " + str(int(last_candle['rsi_pct_1h'])) # first_rate = self.percent_threshold.value # last_rate = self.threshold.value # action = self.color_line(action, action) sma5_1d = '' sma5_1h = '' if last_candle['sma5_pct_1d'] is not None: sma5_1d = round(last_candle['sma5_pct_1d'] * 100, 2) if last_candle['sma5_pct_1h'] is not None: sma5_1h = round(last_candle['sma5_pct_1h'] * 100, 2) sma5 = str(sma5_1d) + ' ' + str(sma5_1h) first_rate = self.pairs[pair]['last_max'] if action != 'Sell': profit = round((last_candle['close'] - self.pairs[pair]['last_max']) / self.pairs[pair]['last_max'], 2) limit_sell = rsi_pct # round((last_candle['close'] - self.pairs[pair]['last_max']) / self.pairs[pair]['last_max'], 4) print( f"| {date:<16} | {action:<10} | {pair:<10} | {trade_type or '-':<18} | {rate or '-':>12} | {dispo or '-':>6} | {profit or '-':>8} | {rsi or '-':>5} | {sma5 or '-':>11} | {first_rate or '-':>12} | {limit_sell or '-':>12} | {buys or '-':>5} | {stake or '-':>10} |" ) @property def protections(self): return [ { "method": "CooldownPeriod", "stop_duration_candles": 12 } # { # "method": "MaxDrawdown", # "lookback_period_candles": self.lookback.value, # "trade_limit": self.trade_limit.value, # "stop_duration_candles": self.protection_stop.value, # "max_allowed_drawdown": self.protection_max_allowed_dd.value, # "only_per_pair": False # }, # { # "method": "StoplossGuard", # "lookback_period_candles": 24, # "trade_limit": 4, # "stop_duration_candles": self.protection_stoploss_stop.value, # "only_per_pair": False # }, # { # "method": "StoplossGuard", # "lookback_period_candles": 24, # "trade_limit": 4, # "stop_duration_candles": 2, # "only_per_pair": False # }, # { # "method": "LowProfitPairs", # "lookback_period_candles": 6, # "trade_limit": 2, # "stop_duration_candles": 60, # "required_profit": 0.02 # }, # { # "method": "LowProfitPairs", # "lookback_period_candles": 24, # "trade_limit": 4, # "stop_duration_candles": 2, # "required_profit": 0.01 # } ] # def custom_backtest_summary(self, results: dict): # # Récupérer les stats existantes # total_trades = results.get("total_trades", 0) # profit_total = results.get("profit_total", 0) # wins = results.get("wins", 0) # losses = results.get("losses", 0) # days = results.get("backtest_days", 1) # # # Calculer de nouvelles métriques # win_loss_ratio = wins / losses if losses > 0 else float('inf') # avg_daily_profit = profit_total / days # trades_per_day = total_trades / days # # # Ajouter les nouvelles stats aux logs # logger.info("━━━━━━ METRIQUES PERSO ━━━━━━") # logger.info(f"Win/Loss Ratio: {win_loss_ratio:.2f}") # logger.info(f"Avg Daily Profit USDT: {avg_daily_profit:.2f}") # logger.info(f"Trades Per Day: {trades_per_day:.2f}") # logger.info("━━━━━━━━━━━━━━━━━━━━━━━━━━━━━")