# Zeus Strategy: First Generation of GodStra Strategy with maximum # AVG/MID profit in USDT # Author: @Mablue (Masoud Azizi) # github: https://github.com/mablue/ # IMPORTANT: INSTALL TA BEFOUR RUN(pip install ta) # freqtrade hyperopt --hyperopt-loss SharpeHyperOptLoss --spaces buy sell roi --strategy Zeus # --- Do not remove these libs --- import inspect import logging import os from datetime import datetime from datetime import timezone from datetime import timedelta from typing import Optional import freqtrade.vendor.qtpylib.indicators as qtpylib # Machine Learning import joblib import matplotlib.pyplot as plt import mpmath as mp import numpy as np import pandas as pd import seaborn as sns import shap # Add your lib to import here test git import ta import talib.abstract as talib from freqtrade.persistence import Trade from freqtrade.strategy import (CategoricalParameter, DecimalParameter, IntParameter, IStrategy, merge_informative_pair) import optuna from optuna.visualization import plot_optimization_history from optuna.visualization import plot_parallel_coordinate from optuna.visualization import plot_param_importances from optuna.visualization import plot_slice from pandas import DataFrame from sklearn.calibration import CalibratedClassifierCV from sklearn.feature_selection import SelectFromModel from sklearn.feature_selection import VarianceThreshold from sklearn.inspection import PartialDependenceDisplay from sklearn.inspection import permutation_importance from sklearn.linear_model import LogisticRegression from sklearn.metrics import brier_score_loss, roc_auc_score from sklearn.metrics import ( classification_report, confusion_matrix, accuracy_score, roc_curve, precision_score, recall_score ) from sklearn.metrics import f1_score, precision_score, recall_score, roc_auc_score from sklearn.model_selection import train_test_split from sklearn.pipeline import Pipeline from sklearn.preprocessing import StandardScaler from sklearn.tree import export_text from 
xgboost import XGBClassifier
import lightgbm as lgb
import numpy as np
import pandas as pd
import optuna
from sklearn.metrics import f1_score, precision_score, recall_score, roc_auc_score
from sklearn.model_selection import train_test_split
from imblearn.over_sampling import SMOTE
from sklearn.ensemble import RandomForestClassifier
from lightgbm import LGBMClassifier

# --------------------------------

logger = logging.getLogger(__name__)

# Basic ANSI escape codes used to colorize console output in log_trade / printLog.
RED = "\033[31m"
GREEN = "\033[32m"
YELLOW = "\033[33m"
BLUE = "\033[34m"
MAGENTA = "\033[35m"
CYAN = "\033[36m"
RESET = "\033[0m"


class FrictradeLearning(IStrategy):
    """DCA-style freqtrade strategy on 1m candles with 1h/1d informative data.

    Entries and exits are gated in confirm_trade_entry / confirm_trade_exit;
    a per-pair state dict (``pairs``) tracks prices, profits and the geometric
    DCA thresholds computed from the distance to the BTC all-time high.
    """

    # Warm-up candles required before indicator values are considered valid.
    startup_candle_count = 360
    # Optional ML model handle and the indicator names it would consume.
    # NOTE(review): not read anywhere in the visible code.
    train_model = None
    model_indicators = []

    # Fallback/default tuning values.
    # NOTE(review): not referenced in the visible code — presumably used by
    # code outside this view; confirm before removing.
    DEFAULT_PARAMS = {
        "rsi_buy": 30,
        "rsi_sell": 70,
        "ema_period": 21,
        "sma_short": 20,
        "sma_long": 100,
        "atr_period": 14,
        "atr_multiplier": 1.5,
        "stake_amount": None,  # use exchange default
        "stoploss": -0.10,
        "minimal_roi": {"0": 0.10}
    }

    # Candidate derivative indicators offered to the categorical below.
    # NOTE(review): a set has no stable order — hyperopt result ordering may
    # vary between runs; confirm this is acceptable.
    indicators = {'sma24_deriv1', 'sma60_deriv1', 'sma5_deriv1_1h', 'sma12_deriv1_1h',
                  'sma24_deriv1_1h', 'sma60_deriv1_1h'}
    indic_1h_force_buy = CategoricalParameter(indicators, default="sma60_deriv1", space='buy')
    # Maximum tolerated decrease below the reference ATH (fraction) when
    # computing the DCA threshold floor in calculateStepsDcaThresholds.
    allow_decrease_rate = DecimalParameter(0.1, 0.8, decimals=1, default=0.4, space='protection',
                                          optimize=False, load=True)
    # Seed (first step) of the geometric DCA threshold series.
    first_adjust_param = DecimalParameter(0.001, 0.01, decimals=3, default=0.005, space='protection',
                                         optimize=False, load=False)
    # Upper bound on the number of DCA steps.
    max_steps = IntParameter(10, 50, default=40, space='protection', optimize=True, load=True)
    hours_force = IntParameter(1, 48, default=24, space='buy', optimize=True, load=True)
    offset_min = IntParameter(1, 48, default=24, space='sell', optimize=True, load=True)
    offset_max = IntParameter(1, 48, default=24, space='sell', optimize=True, load=True)

    # ROI table: effectively disabled (1000% target) — exits are manual.
    minimal_roi = {
        "0": 10
    }

    # Stoploss: effectively disabled as well.
    stoploss = -1  # 0.256

    # Custom stoploss
    use_custom_stoploss = False
    trailing_stop = False
    trailing_stop_positive = 0.25
    trailing_stop_positive_offset = 1
    trailing_only_offset_is_reached = True
    # Buy hypers
    timeframe = '1m'

    # Free-form parameter store. NOTE(review): not populated in the visible code.
    parameters = {}

    # DCA config
    position_adjustment_enable = True

    # Counter driving the periodic header re-print in log_trade.
    # NOTE(review): initialised to False (== 0) but incremented as an int.
    columns_logged = False

    # Per-pair mutable trading state.  Sentinels: huge values for the min-price
    # trackers, zeros for the max trackers, so min()/max() updates work.
    pairs = {
        pair: {
            "first_price": 0,
            "last_price": 0.0,
            'min_buy_price': 999999999999999.5,
            "last_min": 999999999999999.5,
            "last_max": 0,
            "trade_info": {},
            "max_touch": 0.0,
            "last_sell": 0.0,
            'count_of_buys': 0,
            'current_profit': 0,
            'expected_profit': 0,
            'previous_profit': 0,
            "last_candle": {},
            "last_count_of_buys": 0,
            'base_stake_amount': 0,
            'stop_buy': False,
            'last_date': 0,
            'stop': False,
            'max_profit': 0,
            'first_amount': 0,
            'total_amount': 0,
            'has_gain': 0,
            'force_sell': False,
            'force_buy': False,
            'last_ath': 0,
            'mises': {},
            'dca_thresholds': {}
        } for pair in ["BTC/USDC", "BTC/USDT", "BTC/USDT:USDT"]
    }
    trades = list()
    max_profit_pairs = {}

    # Known BTC all-time highs, used as the reference ceiling when sizing DCA
    # thresholds.  The "note" strings are data and are kept verbatim.
    btc_ath_history = [
        {"date": "2011-06-09", "price_usd": 26.15, "note": "pic 2011 (early breakout)"},
        {"date": "2013-11-29", "price_usd": 1132.00, "note": "bull run fin 2013"},
        {"date": "2017-12-17", "price_usd": 19783.00, "note": "ATH décembre 2017 (crypto bubble)"},
        {"date": "2020-12-31", "price_usd": 29001.72, "note": "fin 2020, nouveau record après accumulation)"},
        {"date": "2021-11-10", "price_usd": 68742.00, "note": "record novembre 2021 (institutional demand)"},
        {"date": "2024-03-05", "price_usd": 69000.00, "note": "nouveau pic début 2024 (source presse, valeur indicative)"},
        {"date": "2024-03-14", "price_usd": 73816.00, "note": "nouveau pic début 2024 (source presse, valeur indicative)"},
        {"date": "2024-11-12", "price_usd": 90000.00, "note": ""},
        {"date": "2024-12-17", "price_usd": 108363.00, "note": ""},
        {"date": "2025-07-11", "price_usd": 118755.00, "note": "pic juillet 2025 (valeur rapportée par la presse)"},
        {"date": "2025-08-13", "price_usd": 123748.00, "note": ""},
        {"date": "2025-10-06", "price_usd": 126198.07, "note": "pic oct. 2025 (source agrégée, à vérifier selon l'exchange)"}
    ]

    def dynamic_trailing_offset(self, pair, stake, last_candle, price, ath, count_of_buys, max_dca=5):
        """Return the trailing offset for the current position, as a stake fraction.

        Only the sma180 slope test is active: a flat/weak 1m trend (< 0.005)
        halves the offset (stake/200 instead of stake/100).  The commented
        block is an earlier ATH/DCA "breathing score" formula kept for reference.
        """
        # dd_ath = (ath - price) / ath
        # dd_ath = max(0.0, min(dd_ath, 0.5))
        #
        # dca_risk = min(count_of_buys / max_dca, 1.0)
        #
        # breathing_score = 0.7 * dd_ath + 0.3 * (1 - dca_risk)
        # breathing_score = min(max(breathing_score, 0.0), 1.0)
        #
        # OFFSET_MIN = self.offset_min.value
        # OFFSET_MAX = self.offset_min.value + self.offset_max.value
        # if self.pairs[pair]['has_gain'] > 0:
        #     return 0
        # if self.pairs[pair]['has_gain']:
        #     stake = (stake - self.pairs[pair]['first_amount'])
        if last_candle['sma180_deriv1'] < 0.005:
            return stake / 200
        return stake / 100  # OFFSET_MIN + breathing_score * (OFFSET_MAX - OFFSET_MIN)

    def cooldown_from_heat(self, score):
        """Map a 'heat' score to the cooldown to wait before re-entering."""
        if score < 0.05:
            return timedelta(minutes=0)
        elif score < 0.25:
            return timedelta(minutes=30)
        elif score < 0.5:
            return timedelta(hours=2)
        else:
            return timedelta(hours=4)

    def confirm_trade_entry(self, pair: str, order_type: str, amount: float, rate: float,
                            time_in_force: str, current_time: datetime, entry_tag: Optional[str],
                            **kwargs) -> bool:
        """Gate the first entry of a trade and (re)initialise per-pair state.

        Entry is refused only while the heat-score cooldown since the previous
        exit has not elapsed; every other condition is currently disabled
        (kept as comments).
        """
        # Minutes elapsed since the last recorded exit for this pair (0 if none).
        minutes = 0
        if self.pairs[pair]['last_date'] != 0:
            minutes = round(int((current_time - self.pairs[pair]['last_date']).total_seconds() / 60))
        dataframe, _ = self.dp.get_analyzed_dataframe(pair, self.timeframe)
        last_candle = dataframe.iloc[-1].squeeze()
        # NOTE(review): the two previous candles are fetched but never used.
        last_candle_2 = dataframe.iloc[-2].squeeze()
        last_candle_3 = dataframe.iloc[-3].squeeze()
        condition = True  # (last_candle[f"{indic_5m}_deriv1"] >= indic_deriv1_5m) and (last_candle[f"{indic_5m}_deriv2"] >= indic_deriv2_5m)
        allow_to_buy = True  # (condition and not self.pairs[pair]['stop']) | (entry_tag == 'force_entry')
        cooldown = self.cooldown_from_heat(last_candle['heat_score'])
        # Block re-entry while still inside the cooldown window after a sell.
        if self.pairs[pair]['last_date'] != 0 and cooldown.total_seconds() > 0:
            if current_time < self.pairs[pair]['last_date'] + cooldown:
                allow_to_buy = False
        if allow_to_buy:
            self.trades = list()
self.pairs[pair]['first_price'] = rate self.pairs[pair]['last_price'] = rate self.pairs[pair]['min_buy_price'] = min(rate, self.pairs[pair]['min_buy_price']) self.pairs[pair]['max_touch'] = last_candle['close'] self.pairs[pair]['last_candle'] = last_candle self.pairs[pair]['count_of_buys'] = 1 self.pairs[pair]['current_profit'] = 0 self.pairs[pair]['last_max'] = max(last_candle['close'], self.pairs[pair]['last_max']) self.pairs[pair]['last_min'] = min(last_candle['close'], self.pairs[pair]['last_min']) self.pairs[pair]['min_buy_price'] = rate dispo = round(self.wallets.get_available_stake_amount()) self.printLineLog() stake_amount = self.adjust_stake_amount(pair, last_candle) self.pairs[pair]['total_amount'] = stake_amount self.pairs[pair]['first_amount'] = stake_amount self.calculateStepsDcaThresholds(last_candle, pair) self.log_trade( last_candle=last_candle, date=current_time, action=("🟩Buy" if allow_to_buy else "Canceled") + " " + str(minutes), pair=pair, rate=rate, dispo=dispo, profit=0, trade_type=entry_tag, buys=1, stake=round(stake_amount, 2) ) # else: # self.printLog( # f"{current_time} BUY triggered for {pair} (cooldown={cooldown} minutes={minutes} percent={round(last_candle['hapercent'], 4)}) but condition blocked") return allow_to_buy def progressive_parts(self, total, n, first): # print('In part') # conditions impossibles → on évite le solveur if total <= 0 or first <= 0 or n <= 1: return [0] * n f = lambda r: first * (r ** n - 1) / (r - 1) - total try: r = mp.findroot(f, 1.2) # 1.2 = plus stable que 1.05 except Exception: # fallback en cas d'échec return [first] * n parts = [round(first * (r ** k), 4) for k in range(n)] return parts def calculateStepsDcaThresholds(self, last_candle, pair): # def split_ratio_one_third(n, p): # a = n / (2 * p) # première valeur # d = n / (p * (p - 1)) # incrément # return [round(a + i * d, 3) for i in range(p)] # r, parts = progressive_parts(0.4, 40, 0.004) # print("r =", r) # print(parts) val = 
self.pairs[pair]['first_price'] if self.pairs[pair]['first_price'] > 0 else last_candle['mid']
        # Lazily resolve the reference ATH for this pair (never below `val`).
        if self.pairs[pair]['last_ath'] == 0:
            ath = max(val, self.get_last_ath_before_candle(last_candle))
            self.pairs[pair]['last_ath'] = ath
        ath = self.pairs[pair]['last_ath']
        # Number of DCA steps between the current price and the allowed floor.
        steps = self.calculateNumberOfSteps(val, ath, max_steps=self.max_steps.value)
        # Geometric series of drawdown thresholds covering the distance from
        # `val` down to ath * (1 - allow_decrease_rate), as a fraction of val.
        self.pairs[pair]['dca_thresholds'] = self.progressive_parts(
            (val - (ath * (1 - self.allow_decrease_rate.value))) / val, steps,
            self.first_adjust_param.value)
        # NOTE(review): raw print(), not self.printLog — also emitted in hyperopt.
        print(f"val={val} lim={self.pairs[pair]['last_ath'] * (1 - self.allow_decrease_rate.value)}"
              f" steps={steps}"
              f" pct={round((val - (self.pairs[pair]['last_ath'] * (1 - self.allow_decrease_rate.value))) / val, 4)}")
        print(self.pairs[pair]['dca_thresholds'])

    def confirm_trade_exit(self, pair: str, trade: Trade, order_type: str, amount: float,
                           rate: float, time_in_force: str, exit_reason: str, current_time,
                           **kwargs,
                           ) -> bool:
        """Always confirm the exit, log it, and reset the per-pair tracking state."""
        # allow_to_sell = (minutes > 30)
        dataframe, _ = self.dp.get_analyzed_dataframe(pair, self.timeframe)
        last_candle = dataframe.iloc[-1].squeeze()
        # Minutes the trade has been open.  NOTE(review): `.seconds` ignores
        # whole days — presumably intentional for short trades; confirm.
        minutes = int(round((current_time - trade.open_date_utc).seconds / 60, 0))
        profit = trade.calc_profit(rate)
        force = self.pairs[pair]['force_sell']  # and (last_candle['hapercent'] < 0 )
        allow_to_sell = True  # (last_candle['hapercent'] < 0 ) or force or (exit_reason == 'force_exit') or (exit_reason == 'stop_loss')
        if allow_to_sell:
            self.trades = list()
            self.pairs[pair]['last_count_of_buys'] = trade.nr_of_successful_entries  # self.pairs[pair]['count_of_buys']
            self.pairs[pair]['last_sell'] = rate
            self.pairs[pair]['last_candle'] = last_candle
            self.pairs[pair]['previous_profit'] = 0
            # NOTE(review): self.trades is reset twice in this path.
            self.trades = list()
            dispo = round(self.wallets.get_available_stake_amount())
            # print(f"Sell {pair} {current_time} {exit_reason} dispo={dispo} amount={amount} rate={rate} open_rate={trade.open_rate}")
            self.log_trade(
                last_candle=last_candle,
                date=current_time,
                action="🟥Sell " + str(minutes),
                pair=pair,
                trade_type=exit_reason,
rate=last_candle['close'], dispo=dispo, profit=round(profit, 2) ) self.pairs[pair]['first_amount'] = 0 self.pairs[pair]['max_profit'] = 0 self.pairs[pair]['force_sell'] = False self.pairs[pair]['has_gain'] = 0 self.pairs[pair]['current_profit'] = 0 self.pairs[pair]['total_amount'] = 0 self.pairs[pair]['count_of_buys'] = 0 self.pairs[pair]['max_touch'] = 0 self.pairs[pair]['last_price'] = 0 self.pairs[pair]['last_date'] = current_time self.pairs[pair]['current_trade'] = None self.pairs[pair]['min_buy_price'] = 100000000000000 self.pairs[pair]['dca_thresholds'] = {} self.pairs[pair]['mises'] = {} else: self.printLog( f"{current_time} SELL triggered for {pair} ({exit_reason} profit={profit} minutes={minutes} percent={last_candle['hapercent']}) but condition blocked") return (allow_to_sell) | (exit_reason == 'force_exit') | (exit_reason == 'stop_loss') # def custom_exit(self, pair: str, trade: Trade, current_time, current_rate, current_profit, **kwargs): # # dataframe, _ = self.dp.get_analyzed_dataframe(pair, self.timeframe) # last_candle = dataframe.iloc[-1].squeeze() # last_candle_1h = dataframe.iloc[-13].squeeze() # before_last_candle = dataframe.iloc[-2].squeeze() # before_last_candle_2 = dataframe.iloc[-3].squeeze() # before_last_candle_12 = dataframe.iloc[-13].squeeze() # # expected_profit = self.expectedProfit(pair, last_candle) # # print(f"current_time={current_time} current_profit={current_profit} expected_profit={expected_profit}") # # max_touch_before = self.pairs[pair]['max_touch'] # self.pairs[pair]['last_max'] = max(last_candle['close'], self.pairs[pair]['last_max']) # self.pairs[pair]['last_min'] = min(last_candle['close'], self.pairs[pair]['last_min']) # self.pairs[pair]['current_trade'] = trade # # count_of_buys = trade.nr_of_successful_entries # # profit = trade.calc_profit(current_rate) #round(current_profit * trade.stake_amount, 1) # self.pairs[pair]['max_profit'] = max(self.pairs[pair]['max_profit'], profit) # max_profit = last_candle['max5'] 
#self.pairs[pair]['max_profit'] # baisse = 0 # if profit > 0: # baisse = 1 - (profit / max_profit) # mx = max_profit / 5 # self.pairs[pair]['count_of_buys'] = count_of_buys # self.pairs[pair]['current_profit'] = profit # # dispo = round(self.wallets.get_available_stake_amount()) # hours_since_first_price = (current_time - trade.open_date_utc).seconds / 3600.0 # days_since_first_price = (current_time - trade.open_date_utc).days # hours = (current_time - trade.date_last_filled_utc).total_seconds() / 3600.0 # minutes = (current_time - trade.date_last_filled_utc).total_seconds() / 60.0 # # if minutes % 4 == 0: # self.log_trade( # last_candle=last_candle, # date=current_time, # action="🟢 CURRENT", #🔴 CURRENT" if self.pairs[pair]['stop'] or last_candle['stop_buying'] else " # dispo=dispo, # pair=pair, # rate=last_candle['close'], # trade_type='', # profit=round(profit, 2), # buys=count_of_buys, # stake=0 # ) # # if (last_candle['close'] > last_candle['mid']) or (last_candle['sma5_deriv1'] > 0): # return None # # pair_name = self.getShortName(pair) # # if profit > 0.003 * count_of_buys and baisse > 0.30: # self.pairs[pair]['force_sell'] = False # self.pairs[pair]['force_buy'] = (self.pairs[pair]['count_of_buys'] - self.pairs[pair]['has_gain'] > 3) # return str(count_of_buys) + '_' + 'B30_' + pair_name + '_' + str(self.pairs[pair]['has_gain']) # # self.pairs[pair]['max_touch'] = max(last_candle['close'], self.pairs[pair]['max_touch']) def getShortName(self, pair): return pair.replace("/USDT", '').replace("/USDC", '').replace("_USDC", '').replace("_USDT", '') def getLastLost(self, last_candle, pair): last_lost = round((last_candle['close'] - self.pairs[pair]['max_touch']) / self.pairs[pair]['max_touch'], 3) return last_lost def getPctFirstBuy(self, pair, last_candle): return round((last_candle['close'] - self.pairs[pair]['first_price']) / self.pairs[pair]['first_price'], 3) def getPctLastBuy(self, pair, last_candle): return round((last_candle['close'] - 
self.pairs[pair]['last_price']) / self.pairs[pair]['last_price'], 4) def expectedProfit(self, pair: str, last_candle: DataFrame): lim = 0.01 pct = 0.002 if (self.getShortName(pair) == 'BTC'): lim = 0.005 pct = 0.001 pct_to_max = lim + pct * self.pairs[pair]['count_of_buys'] expected_profit = lim * self.pairs[pair][ 'total_amount'] # min(3 * lim, max(lim, pct_to_max)) # 0.004 + 0.002 * self.pairs[pair]['count_of_buys'] #min(0.01, first_max) self.pairs[pair]['expected_profit'] = expected_profit return expected_profit def log_trade(self, action, pair, date, trade_type=None, rate=None, dispo=None, profit=None, buys=None, stake=None, last_candle=None): # Afficher les colonnes une seule fois if self.config.get('runmode') == 'hyperopt' or self.dp.runmode.value in ('hyperopt'): return if self.columns_logged % 10 == 0: self.printLog( f"| {'Date':<16} | {'Action':<10} |{'Pair':<5}| {'Trade Type':<18} |{'Rate':>8} | {'Dispo':>6} | {'Profit':>8} " f"| {'Pct':>6} | {'max_touch':>11} | {'last_lost':>12} | {'last_max':>7}| {'last_min':>7}|{'Buys':>5}| {'Stake':>5} |" f"{'rsi':>6}|{'rsi_1h':>6}|{'rsi_1d':>6}|{'cf_1h':>6}|{'cf_1d':>6}" # |Distmax|s201d|s5_1d|s5_2d|s51h|s52h|smt1h|smt2h|tdc1d|tdc1h" ) self.printLineLog() df = pd.DataFrame.from_dict(self.pairs, orient='index') colonnes_a_exclure = ['last_candle', 'trade_info', 'last_date', 'last_count_of_buys', 'base_stake_amount', 'stop_buy', 'mises', 'dca_thresholds'] df_filtered = df[df['count_of_buys'] > 0].drop(columns=colonnes_a_exclure) # df_filtered = df_filtered["first_price", "last_max", "max_touch", "last_sell","last_price", 'count_of_buys', 'current_profit'] self.printLog(df_filtered) self.columns_logged += 1 date = str(date)[:16] if date else "-" limit = None rsi = '' rsi_pct = '' sma5_1d = '' sma5_1h = '' sma5 = str(sma5_1d) + ' ' + str(sma5_1h) last_lost = self.getLastLost(last_candle, pair) if buys is None: buys = '' max_touch = '' pct_max = self.getPctFirstBuy(pair, last_candle) total_counts = str(buys) + '/' + 
str(sum(pair_data['count_of_buys'] for pair_data in self.pairs.values())) dist_max = '' last_max = int(self.pairs[pair]['last_max']) if self.pairs[pair]['last_max'] > 1 else round( self.pairs[pair]['last_max'], 3) last_min = int(self.pairs[pair]['last_min']) if self.pairs[pair]['last_min'] > 1 else round( self.pairs[pair]['last_min'], 3) color = GREEN if profit > 0 else RED profit = str(profit) + '/' + str(round(self.pairs[pair]['max_profit'], 2)) # 🟢 Dérivée 1 > 0 et dérivée 2 > 0: tendance haussière qui s’accélère. # 🟡 Dérivée 1 > 0 et dérivée 2 < 0: tendance haussière qui ralentit → essoufflement potentiel. # 🔴 Dérivée 1 < 0 et dérivée 2 < 0: tendance baissière qui s’accélère. # 🟠 Dérivée 1 < 0 et dérivée 2 > 0: tendance baissière qui ralentit → possible bottom. self.printLog( f"| {date:<16} |{action:<10} | {pair[0:3]:<3} | {trade_type or '-':<18} |{rate or '-':>9}| {dispo or '-':>6} " f"|{color}{profit or '-':>10}{RESET}| {pct_max or '-':>6} | {round(self.pairs[pair]['max_touch'], 2) or '-':>11} | {last_lost or '-':>12} " f"| {last_max or '-':>7} | {last_min or '-':>7} |{total_counts or '-':>5}|{stake or '-':>7}" f"{round(last_candle['max_rsi_24'], 1) or '-':>6}|{round(last_candle['rsi_1h'], 1) or '-':>6}|{round(last_candle['rsi_1d'], 1) or '-':>6}|" # f"{round(last_candle['rtp_1h'] * 100, 0) or '-' :>6}|{round(last_candle['rtp_1d'] * 100, 0) or '-' :>6}|" # f"{round(last_candle['confidence_index_1d'], 3) or '-':>6}|{round(last_candle['confidence_index_1h'], 3) or '-':>6}|" ) def printLineLog(self): # f"sum1h|sum1d|Tdc|Tdh|Tdd| drv1 |drv|drv_1d|" self.printLog( f"+{'-' * 18}+{'-' * 12}+{'-' * 5}+{'-' * 20}+{'-' * 9}+{'-' * 8}+{'-' * 12}+{'-' * 8}+{'-' * 13}+{'-' * 14}+{'-' * 9}{'-' * 9}+{'-' * 5}+{'-' * 7}+" f"+{'-' * 6}+{'-' * 7}+{'-' * 5}+{'-' * 5}+{'-' * 5}+{'-' * 5}+{'-' * 5}+{'-' * 5}+" ) def printLog(self, str): if self.config.get('runmode') == 'hyperopt' or self.dp.runmode.value in ('hyperopt'): return; if not self.dp.runmode.value in ('backtest', 
'hyperopt', 'lookahead-analysis'): logger.info(str) else: if not self.dp.runmode.value in ('hyperopt'): print(str) def populate_indicators(self, dataframe: DataFrame, metadata: dict) -> DataFrame: # Add all ta features pair = metadata['pair'] short_pair = self.getShortName(pair) self.path = f"user_data/strategies/plots/{short_pair}/" # + ("valide/" if not self.dp.runmode.value in ('backtest') else '') # dataframe['open'] = dataframe['open'] / dataframe['open'].rolling(180).mean() # dataframe['close'] = dataframe['close'] / dataframe['close'].rolling(180).mean() # dataframe['low'] = dataframe['low'] / dataframe['low'].rolling(180).mean() # dataframe['high'] = dataframe['high'] / dataframe['high'].rolling(180).mean() heikinashi = qtpylib.heikinashi(dataframe) dataframe['haopen'] = heikinashi['open'] dataframe['haclose'] = heikinashi['close'] dataframe['hapercent'] = (dataframe['haclose'] - dataframe['haopen']) / dataframe['haclose'] dataframe['mid'] = dataframe['open'] + (dataframe['close'] - dataframe['open']) / 2 dataframe['sma5'] = dataframe['mid'].ewm(span=5, adjust=False).mean() # dataframe["mid"].rolling(window=5).mean() dataframe['sma5_deriv1'] = 1000 * (dataframe['sma5'] - dataframe['sma5'].shift(1)) / dataframe['sma5'].shift(1) dataframe['sma12'] = dataframe['mid'].ewm(span=12, adjust=False).mean() dataframe['sma12_deriv1'] = 1000 * (dataframe['sma12'] - dataframe['sma12'].shift(1)) / dataframe[ 'sma12'].shift(1) dataframe['sma24'] = dataframe['mid'].ewm(span=24, adjust=False).mean() dataframe['sma24_deriv1'] = 1000 * (dataframe['sma24'] - dataframe['sma24'].shift(1)) / dataframe[ 'sma24'].shift(1) dataframe['sma60'] = dataframe['mid'].ewm(span=60, adjust=False).mean() dataframe['sma60_deriv1'] = 1000 * (dataframe['sma60'] - dataframe['sma60'].shift(1)) / dataframe[ 'sma60'].shift(1) # dataframe[f"sma5_inv"] = (dataframe[f"sma5"].shift(2) >= dataframe[f"sma5"].shift(1)) \ # & (dataframe[f"sma5"].shift(1) <= dataframe[f"sma5"]) dataframe["sma5_sqrt"] = ( 
np.sqrt(np.abs(dataframe["sma5"] - dataframe["sma5"].shift(1))) + np.sqrt(np.abs(dataframe["sma5"].shift(3) - dataframe["sma5"].shift(1))) ) dataframe["sma5_inv"] = ( (dataframe["sma5"].shift(2) >= dataframe["sma5"].shift(1)) & (dataframe["sma5"].shift(1) <= dataframe["sma5"]) & (dataframe["sma5_sqrt"] > 5) ) dataframe["sma12_sqrt"] = ( np.sqrt(np.abs(dataframe["sma12"] - dataframe["sma12"].shift(1))) + np.sqrt(np.abs(dataframe["sma12"].shift(3) - dataframe["sma12"].shift(1))) ) dataframe["sma12_inv"] = ( (dataframe["sma12"].shift(2) >= dataframe["sma12"].shift(1)) & (dataframe["sma12"].shift(1) <= dataframe["sma12"]) & (dataframe["sma12_sqrt"] > 5) ) dataframe["percent"] = dataframe['mid'].pct_change() dataframe["percent3"] = dataframe['mid'].pct_change(3).rolling(3).mean() dataframe["percent12"] = dataframe['mid'].pct_change(12).rolling(12).mean() dataframe["percent24"] = dataframe['mid'].pct_change(24).rolling(24).mean() dataframe['rsi'] = talib.RSI(dataframe['mid'], timeperiod=14) self.calculeDerivees(dataframe, 'rsi', ema_period=12) dataframe['max_rsi_12'] = talib.MAX(dataframe['rsi'], timeperiod=12) dataframe['max_rsi_24'] = talib.MAX(dataframe['rsi'], timeperiod=24) dataframe['max5'] = talib.MAX(dataframe['mid'], timeperiod=5) dataframe['min180'] = talib.MIN(dataframe['mid'], timeperiod=180) dataframe['max180'] = talib.MAX(dataframe['mid'], timeperiod=180) # dataframe['pct180'] = ((dataframe["mid"] - dataframe['min180']) / (dataframe['max180'] - dataframe['min180'])) dataframe = self.rsi_trend_probability(dataframe, short=60, long=360) # ################### INFORMATIVE 1h informative = self.dp.get_pair_dataframe(pair=metadata['pair'], timeframe='1h') informative['mid'] = informative['open'] + (informative['close'] - informative['open']) / 2 # Calcul MACD macd, macdsignal, macdhist = talib.MACD( informative['close'], fastperiod=12, slowperiod=26, signalperiod=9 ) informative['macd'] = macd informative['macdsignal'] = macdsignal informative['macdhist'] = 
macdhist informative['rsi'] = talib.RSI(informative['mid'], timeperiod=14) for timeperiod in [5, 12, 24, 60]: informative[f'sma{timeperiod}'] = informative['mid'].ewm(span=timeperiod, adjust=False).mean() informative['rsi'] = talib.RSI(informative['mid'], timeperiod=14) self.calculeDerivees(informative, 'rsi', ema_period=12) self.calculateScores(informative, 6) # informative = self.rsi_trend_probability(informative) # self.calculateConfiance(informative) # informative = self.populate1hIndicators(df=informative, metadata=metadata) # informative = self.calculateRegression(informative, 'mid', lookback=15) ########################################################### # Bollinger Bands bollinger = qtpylib.bollinger_bands(qtpylib.typical_price(informative), window=20, stds=2) informative['bb_lowerband'] = bollinger['lower'] informative['bb_middleband'] = bollinger['mid'] informative['bb_upperband'] = bollinger['upper'] informative["bb_percent"] = ( (informative["close"] - informative["bb_lowerband"]) / (informative["bb_upperband"] - informative["bb_lowerband"]) ) informative["bb_width"] = (informative["bb_upperband"] - informative["bb_lowerband"]) / informative["bb_middleband"] # Calcul MACD macd, macdsignal, macdhist = talib.MACD(informative['close'], fastperiod=12, slowperiod=26, signalperiod=9) # | Nom | Formule / définition | Signification | # | ---------------------------- | ------------------------------------------- | ----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- | # | **MACD** (`macd`) | `EMA_fast - EMA_slow` (ex : 12-26 périodes) | Montre l’écart entre la moyenne courte et la moyenne longue.
- Positive → tendance haussière
- Négative → tendance baissière | # | **Signal** (`macdsignal`) | `EMA_9(MACD)` | Sert de ligne de **signal de déclenchement**.
- Croisement du MACD au-dessus → signal d’achat
- Croisement du MACD en dessous → signal de vente | # | **Histogramme** (`macdhist`) | `MACD - Signal` | Montre la **force et l’accélération** de la tendance.
- Positif et croissant → tendance haussière qui s’accélère
- Positif mais décroissant → ralentissement de la hausse
- Négatif et décroissant → baisse qui s’accélère
- Négatif mais croissant → ralentissement de la baisse | # Ajouter dans le informative informative['macd'] = macd informative['macdsignal'] = macdsignal informative['macdhist'] = macdhist informative["volume_mean"] = informative["volume"].rolling(20).mean() informative["volume_ratio"] = informative["volume"] / informative["volume_mean"] informative['volume2'] = informative['volume'] informative.loc[informative['close'].pct_change() < 0, 'volume2'] *= -1 informative['volume_spike'] = (abs(informative['volume2']) > abs(informative['volume2'].rolling(window=20).mean() * 5)) \ & (informative['volume'].rolling(window=5).max() > 1000) # --- Volatilité normalisée --- informative['atr'] = ta.volatility.AverageTrueRange(high=informative['high'], low=informative['low'], close=informative['close'], window=14).average_true_range() informative['atr_norm'] = informative['atr'] / informative['close'] # --- Force de tendance --- informative['adx'] = ta.trend.ADXIndicator(high=informative['high'], low=informative['low'], close=informative['close'], window=14).adx() # --- Volume directionnel (On Balance Volume) --- informative['obv'] = ta.volume.OnBalanceVolumeIndicator(close=informative['close'], volume=informative['volume']).on_balance_volume() self.calculeDerivees(informative, 'obv', ema_period=1) informative['obv12'] = ta.volume.OnBalanceVolumeIndicator(close=informative['sma12'], volume=informative['volume'].rolling(12).sum()).on_balance_volume() informative['obv24'] = ta.volume.OnBalanceVolumeIndicator(close=informative['sma24'], volume=informative['volume'].rolling(24).sum()).on_balance_volume() informative['rsi_slope'] = informative['rsi'].diff(3) / 3 # vitesse moyenne du RSI informative['adx_change'] = informative['adx'] - informative['adx'].shift(12) # évolution de la tendance informative['volatility_ratio'] = informative['atr_norm'] / informative['bb_width'] # informative["slope_ratio"] = informative["sma5_deriv1"] / (informative["sma60_deriv1"] + 1e-9) # 
informative["divergence"] = (informative["rsi_deriv1"] * informative["sma5_deriv1"]) < 0 dataframe = merge_informative_pair(dataframe, informative, '1m', '1h', ffill=True) # ################### INFORMATIVE 1d informative = self.dp.get_pair_dataframe(pair=metadata['pair'], timeframe='1d') informative['mid'] = informative['open'] + (informative['close'] - informative['open']) / 2 informative['rsi'] = talib.RSI(informative['mid'], timeperiod=5) informative['min30'] = talib.MIN(informative['mid'], timeperiod=30) informative['max30'] = talib.MAX(informative['mid'], timeperiod=30) # informative = self.rsi_trend_probability(informative) # informative = self.calculateRegression(informative, 'mid', lookback=15) # self.calculateConfiance(informative) ########################################################### # Bollinger Bands bollinger = qtpylib.bollinger_bands(qtpylib.typical_price(informative), window=20, stds=2) informative['bb_lowerband'] = bollinger['lower'] informative['bb_middleband'] = bollinger['mid'] informative['bb_upperband'] = bollinger['upper'] informative["bb_percent"] = ( (informative["close"] - informative["bb_lowerband"]) / (informative["bb_upperband"] - informative["bb_lowerband"]) ) # informative["bb_width"] = (informative["bb_upperband"] - informative["bb_lowerband"]) / informative["bb_middleband"] # # Calcul MACD # macd, macdsignal, macdhist = talib.MACD( # informative['close'], # fastperiod=12, # slowperiod=26, # signalperiod=9 # ) # # # | Nom | Formule / définition | Signification | # # | ---------------------------- | ------------------------------------------- | ----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- | # # | **MACD** (`macd`) | `EMA_fast - EMA_slow` (ex : 12-26 périodes) | Montre l’écart entre la 
moyenne courte et la moyenne longue.
- Positive → tendance haussière
- Négative → tendance baissière | # # | **Signal** (`macdsignal`) | `EMA_9(MACD)` | Sert de ligne de **signal de déclenchement**.
- Croisement du MACD au-dessus → signal d’achat
- Croisement du MACD en dessous → signal de vente | # # | **Histogramme** (`macdhist`) | `MACD - Signal` | Montre la **force et l’accélération** de la tendance.
- Positif et croissant → tendance haussière qui s’accélère
- Positif mais décroissant → ralentissement de la hausse
- Négatif et décroissant → baisse qui s’accélère
- Négatif mais croissant → ralentissement de la baisse | # # # Ajouter dans le informative # informative['macd'] = macd # informative['macdsignal'] = macdsignal # informative['macdhist'] = macdhist informative["volume_mean"] = informative["volume"].rolling(20).mean() informative["volume_ratio"] = informative["volume"] / informative["volume_mean"] informative['volume2'] = informative['volume'] informative.loc[informative['close'].pct_change() < 0, 'volume2'] *= -1 informative['volume_spike'] = (abs(informative['volume2']) > abs(informative['volume2'].rolling(window=20).mean() * 5)) \ & (informative['volume'].rolling(window=5).max() > 1000) for timeperiod in [3, 5, 8, 12]: informative[f'sma{timeperiod}'] = informative['mid'].ewm(span=timeperiod, adjust=False).mean() informative['rsi'] = talib.RSI(informative['mid'], timeperiod=14) self.calculeDerivees(informative, 'rsi', ema_period=12) self.calculateScores(informative, 6) dataframe = merge_informative_pair(dataframe, informative, '1m', '1d', ffill=True) dataframe["pct30"] = dataframe["close"].pct_change(30) dataframe["pct60"] = dataframe["close"].pct_change(60) dataframe["pct120"] = dataframe["close"].pct_change(120) dataframe["pct180"] = dataframe["close"].pct_change(180) dataframe["pct300"] = dataframe["close"].pct_change(300) dataframe["pct600"] = dataframe["close"].pct_change(600) dataframe["pct1200"] = dataframe["close"].pct_change(1200) dataframe["sma_ratio"] = dataframe["sma5_1h"] / dataframe["sma60"] dataframe['last_price'] = dataframe['close'] dataframe['first_price'] = dataframe['close'] if self.dp: if self.dp.runmode.value in ('live', 'dry_run'): self.getOpenTrades() for trade in self.trades: if trade.pair != pair: continue filled_buys = trade.select_filled_orders('buy') count = 0 amount = 0 min_price = 111111111111110 max_price = 0 for buy in filled_buys: if count == 0: min_price = min(min_price, buy.price) max_price = max(max_price, buy.price) dataframe['first_price'] = buy.price 
self.pairs[pair]['first_price'] = buy.price self.pairs[pair]['first_amount'] = buy.price * buy.filled # dataframe['close01'] = buy.price * 1.01 # Order(id=2396, trade=1019, order_id=29870026652, side=buy, filled=0.00078, price=63921.01, # status=closed, date=2024-08-26 02:20:11) dataframe['last_price'] = buy.price self.pairs[pair]['last_price'] = buy.price self.pairs[pair]['min_buy_price'] = min(buy.price, self.pairs[pair]['min_buy_price']) count = count + 1 amount += buy.price * buy.filled self.pairs[pair]['count_of_buys'] = count self.pairs[pair]['total_amount'] = amount dataframe['absolute_min'] = dataframe['mid'].rolling(1440, min_periods=1).min() dataframe['absolute_max'] = dataframe['mid'].rolling(1440, min_periods=1).max() # steps = (dataframe['absolute_max'] - dataframe['absolute_min']) / (dataframe['absolute_min'] * 0.01) # levels = [dataframe['absolute_min'] * (1 + i / 100) for i in range(1, steps + 1)] # # print(levels) for timeperiod in [5, 12, 24, 60]: dataframe[f'sma{timeperiod}_1h'] = dataframe[f'sma{timeperiod}_1h'].rolling(window=60).mean() self.calculeDerivees(dataframe, f'sma{timeperiod}_1h', ema_period=12) ########################################################### # Bollinger Bands bollinger = qtpylib.bollinger_bands(qtpylib.typical_price(dataframe), window=20, stds=2) dataframe['bb_lowerband'] = bollinger['lower'] dataframe['bb_middleband'] = bollinger['mid'] dataframe['bb_upperband'] = bollinger['upper'] dataframe["bb_percent"] = ( (dataframe["close"] - dataframe["bb_lowerband"]) / (dataframe["bb_upperband"] - dataframe["bb_lowerband"]) ) dataframe["bb_width"] = (dataframe["bb_upperband"] - dataframe["bb_lowerband"]) / dataframe["bb_middleband"] # Calcul MACD macd, macdsignal, macdhist = talib.MACD( dataframe['close'], fastperiod=12, slowperiod=26, signalperiod=9 ) # | Nom | Formule / définition | Signification | # | ---------------------------- | ------------------------------------------- | 
----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- | # | **MACD** (`macd`) | `EMA_fast - EMA_slow` (ex : 12-26 périodes) | Montre l’écart entre la moyenne courte et la moyenne longue.
- Positive → tendance haussière
- Négative → tendance baissière | # | **Signal** (`macdsignal`) | `EMA_9(MACD)` | Sert de ligne de **signal de déclenchement**.
- Croisement du MACD au-dessus → signal d’achat
- Croisement du MACD en dessous → signal de vente | # | **Histogramme** (`macdhist`) | `MACD - Signal` | Montre la **force et l’accélération** de la tendance.
- Positif et croissant → tendance haussière qui s’accélère
- Positif mais décroissant → ralentissement de la hausse
- Négatif et décroissant → baisse qui s’accélère
- Négatif mais croissant → ralentissement de la baisse | # Ajouter dans le dataframe dataframe['macd'] = macd dataframe['macdsignal'] = macdsignal dataframe['macdhist'] = macdhist # Regarde dans le futur # # --- Rendre relatif sur chaque série (-1 → 1) --- # for col in ['macd', 'macdsignal', 'macdhist']: # series = dataframe[col] # valid = series[~np.isnan(series)] # ignorer NaN # min_val = valid.min() # max_val = valid.max() # span = max_val - min_val if max_val != min_val else 1 # dataframe[f'{col}_rel'] = 2 * ((series - min_val) / span) - 1 # # dataframe['tdc_macd'] = self.macd_tendance_int( # dataframe, # macd_col='macd_rel', # signal_col='macdsignal_rel', # hist_col='macdhist_rel' # ) # ------------------------------------------------------------------------------------ # rolling SMA indicators (used for trend detection too) s_short = self.DEFAULT_PARAMS['sma_short'] s_long = self.DEFAULT_PARAMS['sma_long'] dataframe[f'sma_{s_short}'] = dataframe['close'].rolling(window=s_short).mean() dataframe[f'sma_{s_long}'] = dataframe['close'].rolling(window=s_long).mean() # # --- pente brute --- # dataframe['slope'] = dataframe['sma24'].diff() # # # --- lissage EMA --- # dataframe['slope_smooth'] = dataframe['slope'].ewm(span=10, adjust=False).mean() # # RSI # window = 14 # delta = dataframe['close'].diff() # up = delta.clip(lower=0) # down = -1 * delta.clip(upper=0) # ma_up = up.rolling(window=window).mean() # ma_down = down.rolling(window=window).mean() # rs = ma_up / ma_down.replace(0, 1e-9) # dataframe['rsi'] = 100 - (100 / (1 + rs)) # # # EMA example # dataframe['ema'] = dataframe['close'].ewm(span=self.DEFAULT_PARAMS['ema_period'], adjust=False).mean() # # # ATR (simple implementation) # high_low = dataframe['high'] - dataframe['low'] # high_close = (dataframe['high'] - dataframe['close'].shift()).abs() # low_close = (dataframe['low'] - dataframe['close'].shift()).abs() # tr = DataFrame({'hl': high_low, 'hc': high_close, 'lc': low_close}).max(axis=1) # 
dataframe['atr'] = tr.rolling(window=self.DEFAULT_PARAMS['atr_period']).mean() ########################### # df = ton DataFrame OHLCV avec colonnes: open, high, low, close, volume # Assure-toi qu'il est trié par date croissante timeframe = self.timeframe # --- Volatilité normalisée --- dataframe['atr'] = ta.volatility.AverageTrueRange(high=dataframe['high'], low=dataframe['low'], close=dataframe['close'], window=14).average_true_range() dataframe['atr_norm'] = dataframe['atr'] / dataframe['close'] # --- Force de tendance --- dataframe['adx'] = ta.trend.ADXIndicator(high=dataframe['high'], low=dataframe['low'], close=dataframe['close'], window=14).adx() # --- Volume directionnel (On Balance Volume) --- dataframe['obv'] = ta.volume.OnBalanceVolumeIndicator(close=dataframe['close'], volume=dataframe['volume']).on_balance_volume() self.calculeDerivees(dataframe, 'obv', ema_period=1) dataframe['obv12'] = ta.volume.OnBalanceVolumeIndicator(close=dataframe['sma12'], volume=dataframe['volume'].rolling(12).sum()).on_balance_volume() dataframe['obv24'] = ta.volume.OnBalanceVolumeIndicator(close=dataframe['sma24'], volume=dataframe['volume'].rolling(24).sum()).on_balance_volume() dataframe['rsi_slope'] = dataframe['rsi'].diff(3) / 3 # vitesse moyenne du RSI dataframe['adx_change'] = dataframe['adx'] - dataframe['adx'].shift(12) # évolution de la tendance dataframe['volatility_ratio'] = dataframe['atr_norm'] / dataframe['bb_width'] dataframe["slope_ratio"] = dataframe["sma5_deriv1"] / (dataframe["sma60_deriv1"] + 1e-9) dataframe["divergence"] = (dataframe["rsi_deriv1"] * dataframe["sma5_deriv1"]) < 0 ########################################################### # print(f"min={dataframe['absolute_min'].min()} max={dataframe['absolute_max'].max()}") for i in [0, 1, 2, 3]: dataframe[f"lvl_{i}_pct"] = dataframe['absolute_min'] * (1 + 0.01 * i) self.model_indicators = self.listUsableColumns(dataframe) print("INDICATORS : ", self.model_indicators ) if False and self.dp.runmode.value 
in ('backtest'): self.trainModel3(dataframe, metadata) short_pair = self.getShortName(pair) path=f"user_data/strategies/plots/{short_pair}/" data = joblib.load(f"{self.path}/{short_pair}_rf_model.pkl") self.model = data["model"] self.model_indicators = data["features"] # Préparer les features pour la prédiction X_Valid = dataframe[self.model_indicators].fillna(0) # Prédiction : probabilité que le prix monte # # Affichage des colonnes intérressantes dans le model # features_pruned, kept_features = self.prune_features( # model=self.model, # dataframe=dataframe, # feature_columns=self.model_indicators, # importance_threshold=0.005 # enlever features < % importance # ) # probs = self.model.predict_proba(features)[:, 1] probs_all_classes = self.model.predict(X_Valid) # shape = (n_samples, n_classes) print(probs_all_classes.shape) # doit être (n_samples, 3) # Ajouter probabilité de chaque classe au dataframe pour analyse for i in range(3): dataframe[f'prob_class_{i}'] = probs_all_classes[:, i] # Pour la probabilité de la classe 2 : probs = probs_all_classes[:, 2] # Sauvegarder la probabilité pour l’analyse dataframe['ml_prob'] = probs if False and self.dp.runmode.value in ('backtest'): self.inspect_model(self.model) # # absolute_min = dataframe['absolute_min'].min() # absolute_max = dataframe['absolute_max'].max() # # # Écart total # diff = absolute_max - absolute_min # # # Nombre de lignes intermédiaires (1% steps) # steps = int((absolute_max - absolute_min) / (absolute_min * 0.01)) # # # Niveaux de prix à 1%, 2%, ..., steps% # levels = [absolute_min * (1 + i / 100) for i in range(1, steps + 1)] # levels = [lvl for lvl in levels if lvl < absolute_max] # évite le dernier niveau exact # # # ajout dans le DataFrame # for i, lvl in enumerate(levels, start=1): # dataframe[f"lvl_{i}_pct"] = lvl # # Indices correspondants # indices = [(dataframe['mid'] - lvl).abs().idxmin() for lvl in levels] # Non utilisé dans le modèle dataframe['min60'] = talib.MIN(dataframe['mid'], 
timeperiod=60) self.calculeDerivees(dataframe, 'sma12', ema_period=6) self.calculeDerivees(dataframe, 'sma5', ema_period=3) dataframe['sma60'] = dataframe['mid'].ewm(span=60, adjust=False).mean() self.calculeDerivees(dataframe, 'sma60', ema_period=20) dataframe['sma180'] = dataframe['mid'].ewm(span=180, adjust=False).mean() self.calculeDerivees(dataframe, 'sma180', ema_period=60) horizon = 180 self.calculateScores(dataframe, horizon) dataframe['cross_sma60'] = qtpylib.crossed_below(dataframe["sma12"], dataframe['sma60']) # val = 90000 # steps = 12 # [0.018, 0.022, 0.025, 0.028, 0.032, 0.035, 0.038, 0.042, 0.045, 0.048, 0.052, 0.055] # val = 100000 # steps = 20 # [0.012, 0.014, 0.015, 0.016, 0.018, 0.019, 0.02, 0.022, 0.023, 0.024, 0.025, 0.027, 0.028, 0.029, 0.031, 0.032, # 0.033, 0.035, 0.036, 0.037] # val = 110000 # steps = 28 # [0.01, 0.01, 0.011, 0.012, 0.013, 0.013, 0.014, 0.015, 0.015, 0.016, 0.017, 0.018, 0.018, 0.019, 0.02, 0.02, # 0.021, 0.022, 0.023, 0.023, 0.024, 0.025, 0.025, 0.026, 0.027, 0.028, 0.028, 0.029] # val = 120000 # steps = 35 # [0.008, 0.009, 0.009, 0.01, 0.01, 0.011, 0.011, 0.012, 0.012, 0.013, 0.013, 0.014, 0.014, 0.015, 0.015, 0.016, # 0.016, 0.017, 0.017, 0.018, 0.018, 0.019, 0.019, 0.019, 0.02, 0.02, 0.021, 0.021, 0.022, 0.022, 0.023, 0.023, # 0.024, 0.024, 0.025] # def split_ratio_one_third(n, p): # a = n / (2 * p) # première valeur # d = n / (p * (p - 1)) # incrément # return [round(a + i * d, 3) for i in range(p)] # allow_decrease_rate = 0.3 # for val in range(70000, 140000, 10000): # ath = 126000 # # steps = self.calculateNumberOfSteps(val, ath, max_steps=40) # self.printLog(f"allow_decrease_rate={self.allow_decrease_rate.value} val={val} steps={steps} pct={round((val - (ath * (1 - allow_decrease_rate))) / val, 4)}") # # dca = split_ratio_one_third((val - (ath * (1 - self.allow_decrease_rate.value))) / ath, steps) # # self.printLog(dca) # dca_thresholds = self.progressive_parts( # (val - (ath * (1 - self.allow_decrease_rate.value))) 
/ val, # steps, self.first_adjust_param.value) # print(f"val={val} lim={ath * (1 - self.allow_decrease_rate.value)}" # f"steps={steps} " # f"pct={(round(val - (ath * (1 - self.allow_decrease_rate.value))) / val, 4)}") # print(dca_thresholds) ath = 126000 last_candle = dataframe.iloc[-1].squeeze() val = last_candle['first_price'] # steps = self.calculateNumberOfSteps(val, ath, max_steps=40) # self.printLog( # f"allow_decrease_rate={self.allow_decrease_rate.value} val={val} steps={steps} pct={round((val - (ath * (1 - allow_decrease_rate))) / val, 4)}") # dca_thresholds = self.progressive_parts((val - (ath * (1 - self.allow_decrease_rate.value))) / val, steps, self.first_adjust_param.value) # print(f"val={val} lim={ath * (1 - self.allow_decrease_rate.value)}" # f"steps={steps} " # f"pct={(round(val - (ath * (1 - self.allow_decrease_rate.value))) / val, 4)}") # print(dca_thresholds) if self.pairs[pair]['last_ath'] == 0: ath = max(val, self.get_last_ath_before_candle(last_candle)) self.pairs[pair]['last_ath'] = ath if len(self.pairs[pair]['dca_thresholds']) == 0: self.calculateStepsDcaThresholds(last_candle, pair) if self.pairs[pair]['count_of_buys']: dca_threshold = self.pairs[pair]['dca_thresholds'][min(self.pairs[pair]['count_of_buys'] - 1, len(self.pairs[pair]['dca_thresholds']) - 1)] dataframe[f"next_dca"] = val * (1 - dca_threshold) print(f"count_of_buys={self.pairs[pair]['count_of_buys']} dca_threshold={dca_threshold} {self.pairs[pair]['dca_thresholds']}") print(f"val={val} dca={self.pairs[pair]['dca_thresholds']} ath={self.pairs[pair]['last_ath']} first_price={self.pairs[pair]['first_price']}") if self.dp and val > 0: if self.dp.runmode.value in ('live', 'dry_run'): if len(self.pairs[pair]['mises']) == 0: full, mises, steps = self.calculateMises(pair, self.pairs[pair]['last_ath'], val) else: mises = self.pairs[pair]['mises'] steps = len(self.pairs[pair]['mises']) # stake = min(self.wallets.get_available_stake_amount(), self.adjust_stake_amount(pair, 
last_candle)) if val and len(self.pairs[pair]['dca_thresholds']) > 0 and len(mises) > 0 : print(self.pairs[pair]['dca_thresholds']) count = 0 pct = 0 dataframe = dataframe.copy() total_stake = 1 loss_amount = 0 dca_previous = 0 for dca in self.pairs[pair]['dca_thresholds']: stake = mises[count] total_stake += stake pct += dca loss_amount += total_stake * dca_previous offset = self.dynamic_trailing_offset(pair, total_stake, last_candle, price=val, ath=ath, count_of_buys=count) if count == self.pairs[pair]['count_of_buys'] - self.pairs[pair]['has_gain'] - 1: print(f"next_buy={round(val * (1 - pct),1)} count={count} pct={round(pct, 4)}") dataframe[f"next_buy"] = val * (1 - pct) count += 1 print( f"stake={round(stake, 1)} total_stake={round(total_stake, 1)} count={count} " f"pct={round(pct, 4)} offset={round(offset, 1)} next_buy={round(val * (1 - pct), 2)} " f"loss_amount={round(loss_amount, 2)} pct_average={round(loss_amount / total_stake, 3)}") dca_previous = dca return dataframe def calculateScores(self, dataframe, horizon): dataframe['price_change'] = (dataframe['close'] - dataframe['close'].shift(horizon)) / dataframe['close'].shift(horizon) # dataframe['rsi_delta'] = dataframe['rsi'] - dataframe['rsi'].shift(horizon) dataframe['price_score'] = (dataframe['price_change'] / 0.05).clip(0, 2) # dataframe['rsi_score'] = (dataframe['rsi_delta'] / 15).clip(0, 2) dataframe['heat_score'] = talib.MAX(dataframe['price_score'], timeperiod=horizon) # + dataframe['rsi_score'] def getOpenTrades(self): # if len(self.trades) == 0: self.trades = Trade.get_open_trades() return self.trades # def populate_buy_trend(self, dataframe: DataFrame, metadata: dict) -> DataFrame: # dataframe.loc[ # ( # # (dataframe['sma5_inv'] == 1) # ( # (dataframe['pct180'] < 0.5) | # ( # (dataframe['close'] < dataframe['sma60'] ) # & (dataframe['sma24_deriv1'] > 0) # ) # ) # # & (dataframe['hapercent'] > 0) # # & (dataframe['sma24_deriv1'] > - 0.03) # & (dataframe['ml_prob'] > 0.1) # # & ( # # 
# --- tail of a commented-out populate_buy_trend variant (kept for reference) ---
#                 (dataframe['percent3'] <= -0.003)
#                 | (dataframe['percent12'] <= -0.003)
#                 | (dataframe['percent24'] <= -0.003)
#             )
#         ), ['enter_long', 'enter_tag']] = (1, f"future")
#
#     dataframe['test'] = np.where(dataframe['enter_long'] == 1, dataframe['close'] * 1.003, np.nan)
#
#     return dataframe

def populate_entry_trend(self, dataframe: DataFrame, metadata: dict) -> DataFrame:
    """
    Generate entry signals from hand-crafted weighted momentum scores.

    The original ML-probability based entry logic is disabled (kept below as
    comments); only the two score-based rules are active:

    * Long: recent RSI spike (max_rsi_12 > 70) plus short-term weakness
      (negative pct30 / percent12 / rsi_dist) — weighted score must reach 5.
    * Short: sustained upside momentum (pct30, percent12, rsi > 60) with a
      weakening RSI — weighted score must reach 5.

    :param dataframe: analyzed OHLCV dataframe; assumes the columns
        max_rsi_12, pct30, percent12, rsi_dist and rsi already exist
        (computed in populate_indicators) — TODO confirm against indicators.
    :param metadata: freqtrade pair metadata (unused here).
    :return: dataframe with enter_long / enter_short / enter_tag set, plus a
        debug column 'test' holding close * 1.01 on long-entry candles.
    """
    # # Ensure prediction column exists
    # if "ml_prediction" not in dataframe.columns:
    #     # Generate predictions on the fly
    #     # (your model must already be loaded in self.model)
    #     features = self.ml_features  # list of feature column names
    #     dataframe["ml_prediction"] = self.model.predict(dataframe[features].fillna(0))

    # Choose threshold automatically based on training statistics
    # or a fixed value discovered by SHAP / PDP
    # threshold = 0.4  # self.buy_threshold  # ex: 0.80 or 1.10 depending on your model
    # top 20% strongest signals:
    # threshold = np.percentile(dataframe["ml_prob"], 80)

    # Buy = prediction > threshold
    dataframe["buy"] = 0  # legacy column; signals are set via enter_long/enter_short below
    # dataframe.loc[
    #     # (dataframe["ml_prob"].shift(1) < dataframe["ml_prob"])
    #     (dataframe['sma60_deriv1'] > -0.0000)
    #     & (dataframe['sma12_deriv1'] > 0)
    #     & (dataframe['sma12'] < dataframe['sma60'])
    #     # & (dataframe['rsi'] < 77)
    #     # & (dataframe['heat_score_1h'] < 0.5)
    #     # & (dataframe['sma180_deriv1'] > 0)
    #     # & (dataframe['open'] < dataframe['max180'] * 0.997)
    #     # & (dataframe['min180'].shift(3) == dataframe['min180'])
    #     , ['enter_long', 'enter_tag']
    # ] = (1, f"future")

    # Weighted long score: overbought spike followed by short-term weakness.
    score = (
        (dataframe['max_rsi_12'] > 70).astype(int) * 3 +
        (dataframe['pct30'] < 0).astype(int) * 2 +
        (dataframe['percent12'] < 0).astype(int) * 2 +
        (dataframe['rsi_dist'] < 0).astype(int) * 1
    )
    dataframe.loc[score >= 5, ['enter_long', 'enter_tag']] = (1, "long")

    # dataframe.loc[
    #     # (dataframe["ml_prob"].shift(1) < dataframe["ml_prob"])
    #     (
    #         # 🔥 recently elevated RSI (overbought)
    #         (dataframe['max_rsi_12'] > 70) &
    #         # 📉 reversal under way
    #         (dataframe['rsi'] < dataframe['max_rsi_12'] - 10) &
    #         # 📉 short-term momentum loss
    #         (dataframe['pct30'] < 0) &
    #         # 📉 confirmation
    #         (dataframe['percent12'] < 0)
    #     )
    #     & (dataframe['hapercent'] > 0)
    #     , ['enter_long', 'enter_tag']
    # ] = (1, f"long")
    # dataframe.loc[
    #     # (dataframe["ml_prob"].shift(1) < dataframe["ml_prob"])
    #     (
    #         dataframe['prob_class_0'] > 0.45
    #     )
    #     & (dataframe['hapercent'] < 0)
    #     , ['enter_short', 'enter_tag']
    # ] = (1, f"short")

    # Weighted short score: sustained upside momentum with a weakening RSI.
    score = (
        (dataframe['pct30'] > 0.01).astype(int) * 3 +
        (dataframe['percent12'] > 0.005).astype(int) * 3 +
        (dataframe['rsi'] > 60).astype(int) * 2 +
        (dataframe['rsi'] < dataframe['rsi'].shift(1)).astype(int) * 1
    )
    dataframe.loc[score >= 5, ['enter_short', 'enter_tag']] = (1, "short")

    # Debug helper: marks long-entry candles at close * 1.01 for plotting.
    # NOTE(review): assumes at least one long signal exists so that the
    # 'enter_long' column was created above — otherwise this raises KeyError.
    dataframe['test'] = np.where(dataframe['enter_long'] == 1, dataframe['close'] * 1.01, np.nan)
    return dataframe

# --- commented-out populate_buy_trend variant based on SHAP/PDP insights ---
# def populate_buy_trend(self, dataframe: DataFrame, metadata: dict) -> DataFrame:
#     """
#     Populate buy signals based on SHAP/PDP insights:
#       - strong momentum: macdhist high and macd > macdsignal
#       - rsi elevated (but not extreme)
#       - positive sma24 derivative above threshold
#       - price above sma60 (trend context)
#       - price in upper region of Bollinger (bb_percent high)
#       - volume/obv filter and volatility guard (obv_dist, atr)
#     Returns dataframe with column 'buy' (1 = buy signal).
# """ # # # Ensure column existence (fallback to zeros if missing) # cols = [ # "macdhist", "macd", "macdsignal", "rsi", "rsi_short", # "sma24_deriv1", "sma60", "bb_percent", # "obv_dist", "atr", "percent", "open_1h", "absolute_min" # ] # for c in cols: # if c not in dataframe.columns: # dataframe[c] = 0.0 # # # Thresholds (tune these) # TH_MACDHIST = 8.0 # macdhist considered "strong" (example) # TH_MACD_POS = 0.0 # macd must be > 0 (positive momentum) # TH_SMA24_DERIV = 0.05 # sma24 derivative threshold where effect appears # TH_RSI_LOW = 52.0 # lower bound to consider bullish RSI # TH_RSI_HIGH = 85.0 # upper bound to avoid extreme overbought (optional) # TH_BB_PERCENT = 0.7 # in upper band (0..1) # TH_OBV_DIST = -40.0 # accept small negative OBV distance, reject very negative # MAX_ATR = None # optional: maximum ATR to avoid extreme volatility (None = off) # MIN_PRICE_ABOVE_SMA60 = 0.0 # require price > sma60 (price - sma60 > 0) # # price = dataframe["close"] # # # Momentum conditions # cond_macdhist = dataframe["macdhist"] >= TH_MACDHIST # cond_macd_pos = dataframe["macd"] > TH_MACD_POS # cond_macd_vs_signal = dataframe["macd"] > dataframe["macdsignal"] # # # RSI condition (accept moderate-high RSI) # cond_rsi = (dataframe["rsi"] >= TH_RSI_LOW) & (dataframe["rsi"] <= TH_RSI_HIGH) # # # SMA24 derivative: require momentum above threshold # cond_sma24 = dataframe["sma24_deriv1"] >= TH_SMA24_DERIV # # # Price above SMA60 (trend filter) # cond_above_sma60 = (price - dataframe["sma60"]) > MIN_PRICE_ABOVE_SMA60 # # # Bollinger band percent (price in upper region) # cond_bb = dataframe["bb_percent"] >= TH_BB_PERCENT # # # Volume/OBV prudence filter # cond_obv = dataframe["obv_dist"] >= TH_OBV_DIST # # # Optional ATR guard # if MAX_ATR is not None: # cond_atr = dataframe["atr"] <= MAX_ATR # else: # cond_atr = np.ones_like(dataframe["atr"], dtype=bool) # # # Optional additional guards (avoid tiny percent moves or weird opens) # cond_percent = np.abs(dataframe["percent"]) 
# --- tail of a commented-out populate_buy_trend variant (kept for reference) ---
# > 0.0005  # ignore almost-no-move bars
#     cond_open = True  # keep as placeholder; you can add open_1h relative checks
#
#     # Combine into a buy signal
#     buy_condition = (
#         cond_macdhist &
#         cond_macd_pos &
#         cond_macd_vs_signal &
#         cond_rsi &
#         cond_sma24 &
#         cond_above_sma60 &
#         cond_bb &
#         cond_obv &
#         cond_atr &
#         cond_percent
#     )
#
#     # Finalize: set buy column (0/1)
#     dataframe.loc[buy_condition, ['enter_long', 'enter_tag']] = (1, f"future")
#     # dataframe.loc[~buy_condition, "buy"] = 0
#     # dataframe['test'] = np.where(dataframe['enter_long'] == 1, dataframe['close'] * 1.003, np.nan)
#
#     return dataframe

def populate_exit_trend(self, dataframe: DataFrame, metadata: dict) -> DataFrame:
    """No signal-based exits here: exits are handled by custom_exit()/trailing logic."""
    return dataframe

# Legacy stake-sizing variant, kept commented out for reference:
# def adjust_stake_amount(self, pair: str, last_candle: DataFrame):
#     # Compute the minimum of the last 14 days
#     nb_pairs = len(self.dp.current_whitelist())
#     # base_stake_amount = self.config.get('stake_amount')
#
#     if True:  # self.pairs[pair]['count_of_buys'] == 0:
#         factor = 1  # 65 / min(65, last_candle['rsi_1d'])
#         # if last_candle['min_max_60'] > 0.04:
#         #     factor = 2
#         adjusted_stake_amount = base_stake_amount  # max(base_stake_amount / 5, base_stake_amount * factor)
#     else:
#         adjusted_stake_amount = self.pairs[pair]['first_amount']
#
#     if self.pairs[pair]['count_of_buys'] == 0:
#         self.pairs[pair]['first_amount'] = adjusted_stake_amount
#
#     return adjusted_stake_amount

def calculateNumberOfSteps(self, current, ath, max_steps=0):
    """
    Map the current price onto a DCA step count via linear interpolation.

    The price band runs from X_min = ath * (1 - allow_decrease_rate), the
    lowest tolerated price (-> 1 step), up to the ATH (-> max_steps steps).

    :param current: current reference price.
    :param ath: all-time-high reference price.
    :param max_steps: upper bound on steps; 0 means "use the hyperopt value".
    :return: integer number of steps, clamped to at least 1.
    """
    if (max_steps == 0):
        max_steps = self.max_steps.value
    X_min = ath * (1 - self.allow_decrease_rate.value)  # e.g. 126198 * 0.4 = 75718.8
    Y_min = 1
    Y_max = max_steps
    a = (Y_max - Y_min) / (ath - X_min)  # e.g. 39 / (126198 - 126198*0.6) = 0.000772595
    b = Y_min - a * X_min  # e.g. 1 - (0.000772595 * 75718.8) = -38
    y = a * current + b  # e.g. 0.000772595 * 115000 - 38
    return max(round(y), 1)  # avoid non-positive step counts

def adjust_stake_amount(self, pair: str, last_candle: DataFrame):
    """Return the stake for the next (re)entry: currently all available stake."""
    # if (self.pairs[pair]['first_amount'] > 0):
    #     amount = min(self.wallets.get_available_stake_amount(), self.pairs[pair]['first_amount'])
    # else:
    #     if last_candle['enter_tag'] in ['fall', 'bear', 'Force', 'Range-']:
    #         amount = self.wallets.get_available_stake_amount() / 5
    #     else:
    #         amount = self.wallets.get_available_stake_amount() / 3  # / (2 * self.pairs[pair]['count_of_lost'] + 1)
    return self.wallets.get_available_stake_amount()

def calculateMises(self, pair, ath, val):
    """
    Compute and cache the progressive DCA stake amounts ("mises") for a pair.

    :param pair: pair name used as key into self.pairs.
    :param ath: all-time-high reference price (cached as 'last_ath').
    :param val: reference entry price used to size the step count.
    :return: (total stake available, list of per-step stakes, step count).
    """
    # ath = max(self.pairs[pair]['last_max'], self.get_last_ath_before_candle(last_candle))
    self.pairs[pair]['last_ath'] = ath
    full = self.wallets.get_total_stake_amount()
    steps = self.calculateNumberOfSteps(val, ath, max_steps=self.max_steps.value)
    mises = self.progressive_parts(full, steps, full / (steps * 2))
    print(f"ath={ath} full={full} steps={steps} mises={mises} ")
    self.pairs[pair]['mises'] = mises
    return full, mises, steps

def adjust_trade_position(self, trade: "Trade", current_time: datetime, current_rate: float,
                          current_profit: float, min_stake: float, max_stake: float, **kwargs):
    """
    DCA / pyramiding position adjustment.

    Two active branches:
    * "Loss" DCA: when the price declined from the last fill by at least the
      current per-step threshold (and the short/long-term SMA-derivative
      conditions hold), buy another step.
    * "Gain" pyramiding: when in profit and the price increased by at least
      0.003 with positive sma5 derivatives, add to the position.

    :return: stake amount to add, 0 when funds are exhausted, or None.
    """
    # do nothing if an order is already pending
    if trade.has_open_orders:
        # self.printLog("skip open orders")
        return None
    dataframe, _ = self.dp.get_analyzed_dataframe(trade.pair, self.timeframe)
    if (len(dataframe) < 1):
        # self.printLog("skip dataframe")
        return None
    last_candle = dataframe.iloc[-1].squeeze()
    # before_last_candle = dataframe.iloc[-2].squeeze()

    # prepare the data
    current_time = current_time.astimezone(timezone.utc)
    # open_date = trade.open_date.astimezone(timezone.utc)
    dispo = round(self.wallets.get_available_stake_amount())
    # hours_since_first_price = (current_time - trade.open_date_utc).seconds / 3600.0
    # days_since_first_price = (current_time - trade.open_date_utc).days
    hours = (current_time - trade.date_last_filled_utc).total_seconds() / 3600.0
    minutes = (current_time - trade.date_last_filled_utc).total_seconds() / 60.0
    count_of_buys = trade.nr_of_successful_entries
    # current_time_utc = current_time.astimezone(timezone.utc)
    # open_date = trade.open_date.astimezone(timezone.utc)
    # days_since_open = (current_time_utc - open_date).days
    pair = trade.pair
    profit = trade.calc_profit(current_rate)  # round(current_profit * trade.stake_amount, 1)
    # last_lost = self.getLastLost(last_candle, pair)
    pct_first = 0
    # total_counts = sum(
    #     pair_data['count_of_buys'] for pair_data in self.pairs.values() if not self.getShortName(pair) == 'BTC')
    #
    # if self.pairs[pair]['first_price']:
    #     pct_first = self.getPctFirstBuy(pair, last_candle)
    # if profit > - self.pairs[pair]['first_amount'] \
    #         and self.wallets.get_available_stake_amount() < self.pairs[pair]['first_amount'] \
    #         and last_candle['sma24_deriv1_1h'] < 0:
    #     stake_amount = trade.stake_amount
    #     self.pairs[pair]['previous_profit'] = profit
    #     trade_type = "Sell " + (last_candle['enter_tag'] if last_candle['enter_long'] == 1 else '')
    #     self.pairs[trade.pair]['count_of_buys'] += 1
    #     self.pairs[pair]['total_amount'] = stake_amount
    #     self.log_trade(
    #         last_candle=last_candle,
    #         date=current_time,
    #         action="🟥 Stoploss",
    #         dispo=dispo,
    #         pair=trade.pair,
    #         rate=current_rate,
    #         trade_type=trade_type,
    #         profit=round(profit, 1),
    #         buys=trade.nr_of_successful_entries + 1,
    #         stake=round(stake_amount, 2)
    #     )
    #
    #     self.pairs[trade.pair]['last_price'] = current_rate
    #     self.pairs[trade.pair]['max_touch'] = last_candle['close']
    #     self.pairs[trade.pair]['last_candle'] = last_candle
    #
    #     return -stake_amount

    if (self.wallets.get_available_stake_amount() < 10):  # or trade.stake_amount >= max_stake:
        return 0
    lim = 0.3
    if (len(dataframe) < 1):
        # self.printLog("skip dataframe")
        return None
    # dca_thresholds = split_ratio_one_third((last_candle['mid'] - (ath * self.allow_decrease_rate.value)) / last_candle['mid'], steps)  # ((last_candle['mid'] - (ath * self.allow_decrease_rate.value)) / steps) / last_candle['mid']  # 0.0025 + 0.0005 * count_of_buys
    if len(self.pairs[pair]['dca_thresholds']) == 0:
        self.calculateStepsDcaThresholds(last_candle, pair)
    dca_threshold = self.pairs[pair]['dca_thresholds'][min(count_of_buys - 1, len(self.pairs[pair]['dca_thresholds']) - 1)]

    # last actual fill price (not the average entry price)
    last_fill_price = self.pairs[trade.pair]['last_price']
    decline = (last_fill_price - current_rate) / last_fill_price
    increase = - decline

    # FIN ########################## ALGO ATH
    force = False  # self.hours_force.value and last_candle[self.indic_1h_force_buy.value] > 0
    condition = minutes > 5 and last_candle['percent'] > 0 \
        and ((count_of_buys <= 4 and last_candle['sma24_deriv1'] > 0) or (count_of_buys > 4 and last_candle['sma60_deriv1'] > 0)) \
        and last_candle['close'] < self.pairs[pair]['first_price']
    if ((force or decline >= dca_threshold) and condition):
        try:
            print(f"decline={decline} last_fill_price={last_fill_price} current_rate={current_rate}")
            if self.pairs[pair]['has_gain'] and profit > 0:
                self.pairs[pair]['force_sell'] = True
                self.pairs[pair]['previous_profit'] = profit
                return None
            stake_amount = min(self.wallets.get_available_stake_amount(), self.adjust_stake_amount(pair, last_candle))
            # if force:
            #     stake_amount = stake_amount / 2
            # self.printLog(f"profit={profit} previous={self.pairs[pair]['previous_profit']} count_of_buys={trade.nr_of_successful_entries}")
            if stake_amount > 0:
                self.pairs[pair]['previous_profit'] = profit
                trade_type = "Loss " + (last_candle['enter_tag'] if last_candle['enter_long'] == 1 else '')
                self.pairs[trade.pair]['count_of_buys'] += 1
                self.pairs[pair]['total_amount'] += stake_amount
                self.log_trade(
                    last_candle=last_candle,
                    date=current_time,
                    action="🟧 " + ("Force" if force else 'Loss -'),
                    dispo=dispo,
                    pair=trade.pair,
                    rate=current_rate,
                    trade_type=trade_type,
                    profit=round(profit, 1),
                    buys=trade.nr_of_successful_entries + 1,
                    stake=round(stake_amount, 2)
                )
                self.pairs[trade.pair]['last_price'] = current_rate
                self.pairs[trade.pair]['max_touch'] = last_candle['close']
                self.pairs[trade.pair]['last_candle'] = last_candle
                self.pairs[trade.pair]['min_buy_price'] = min(current_rate, self.pairs[trade.pair]['min_buy_price'])
                # df = pd.DataFrame.from_dict(self.pairs, orient='index')
                # colonnes_a_exclure = ['last_candle', 'stop',
                #                       'trade_info', 'last_date', 'expected_profit', 'last_count_of_buys', 'base_stake_amount', 'stop_buy']
                # df_filtered = df[df['count_of_buys'] > 0].drop(columns=colonnes_a_exclure)
                # # df_filtered = df_filtered["first_price", "last_max", "max_touch", "last_sell", "last_price", 'count_of_buys', 'current_profit']
                # # self.printLog(df_filtered)
                return stake_amount
            return None
        except Exception as exception:
            self.printLog(exception)
            return None

    increase_dca_threshold = 0.003
    if current_profit > increase_dca_threshold \
            and (increase >= increase_dca_threshold and self.wallets.get_available_stake_amount() > 0) \
            and last_candle['sma5_deriv1'] > 0 and last_candle['sma5_deriv2'] > 0 and last_candle['max_rsi_12'] < 80:
        try:
            print(f"decline={decline} last_fill_price={last_fill_price} current_rate={current_rate}")
            self.pairs[pair]['previous_profit'] = profit
            stake_amount = max(10, min(self.wallets.get_available_stake_amount(), self.adjust_stake_amount(pair, last_candle)))
            if stake_amount > 0:
                self.pairs[pair]['has_gain'] += 1
                trade_type = 'Gain +' + (last_candle['enter_tag'] if last_candle['enter_long'] == 1 else '')
                self.pairs[trade.pair]['count_of_buys'] += 1
                self.pairs[pair]['total_amount'] += stake_amount
                self.log_trade(
                    last_candle=last_candle,
                    date=current_time,
                    action="🟡 Gain +",
                    dispo=dispo,
                    pair=trade.pair,
                    rate=current_rate,
                    trade_type='Gain ' + str(round(increase, 4)),
                    profit=round(profit, 1),
                    buys=trade.nr_of_successful_entries + 1,
                    stake=round(stake_amount, 2)
                )
                self.pairs[trade.pair]['last_price'] = current_rate
                self.pairs[trade.pair]['max_touch'] = last_candle['close']
                self.pairs[trade.pair]['last_candle'] = last_candle
                self.pairs[trade.pair]['min_buy_price'] = min(current_rate, self.pairs[trade.pair]['min_buy_price'])
                return stake_amount
            return None
        except Exception as exception:
            self.printLog(exception)
            return None
return None def custom_exit(self, pair, trade, current_time, current_rate, current_profit, **kwargs): dataframe, _ = self.dp.get_analyzed_dataframe(pair, self.timeframe) last_candle = dataframe.iloc[-1].squeeze() # last_candle_1h = dataframe.iloc[-13].squeeze() # before_last_candle = dataframe.iloc[-2].squeeze() # before_last_candle_2 = dataframe.iloc[-3].squeeze() # before_last_candle_12 = dataframe.iloc[-13].squeeze() # # expected_profit = self.expectedProfit(pair, last_candle) # # self.printLog(f"current_time={current_time} current_profit={current_profit} expected_profit={expected_profit}") # # # ----- 1) Charger les variables de trailing pour ce trade ----- # max_price = self.pairs[pair]['max_touch'] self.pairs[pair]['last_max'] = max(last_candle['close'], self.pairs[pair]['last_max']) self.pairs[pair]['last_min'] = min(last_candle['close'], self.pairs[pair]['last_min']) self.pairs[pair]['current_trade'] = trade count_of_buys = trade.nr_of_successful_entries profit = trade.calc_profit(current_rate) # round(current_profit * trade.stake_amount, 1) if current_profit > 0: self.pairs[pair]['max_profit'] = max(self.pairs[pair]['max_profit'], profit) # else: # self.pairs[pair]['max_profit'] = 0 max_profit = self.pairs[pair]['max_profit'] # if current_profit > 0: # self.printLog(f"profit={profit} max_profit={max_profit} current_profit={current_profit}") # baisse = 0 # if profit > 0: # baisse = 1 - (profit / max_profit) # mx = max_profit / 5 self.pairs[pair]['count_of_buys'] = count_of_buys self.pairs[pair]['current_profit'] = profit dispo = round(self.wallets.get_available_stake_amount()) # hours_since_first_price = (current_time - trade.open_date_utc).seconds / 3600.0 # days_since_first_price = (current_time - trade.open_date_utc).days # hours = (current_time - trade.date_last_filled_utc).total_seconds() / 3600.0 minutes = (current_time - trade.date_last_filled_utc).total_seconds() / 60.0 # ----- 2) Mise à jour du max_price ----- self.pairs[pair]['max_touch'] = 
max(last_candle['close'], self.pairs[pair]['max_touch']) # ----- 3) Calcul du profit max atteint ----- # profit_max = (max_price - trade.open_rate) / trade.open_rate current_trailing_stop_positive = self.trailing_stop_positive current_trailing_only_offset_is_reached = self.trailing_only_offset_is_reached current_trailing_stop_positive_offset = self.trailing_stop_positive_offset current_trailing_stop_positive_offset = self.dynamic_trailing_offset( pair, self.pairs[pair]['total_amount'], last_candle, price=current_rate, ath=self.pairs[pair]['last_ath'], count_of_buys=count_of_buys) # max_ = last_candle['max180'] # min_ = last_candle['min180'] # mid = last_candle['mid'] # éviter division par zéro # position = (mid - min_) / (max_ - min_) # zone = int(position * 3) # 0 à 2 # if zone == 0: # current_trailing_stop_positive = self.trailing_stop_positive # current_trailing_stop_positive_offset = self.trailing_stop_positive_offset * 2 # if minutes > 1440: # current_trailing_only_offset_is_reached = False # current_trailing_stop_positive_offset = self.trailing_stop_positive_offset # if zone == 1: # ----- 5) Calcul du trailing stop dynamique ----- # Exemple : offset=0.321 => stop à +24.8% trailing_stop = max_profit * (1.0 - current_trailing_stop_positive) baisse = 0 if max_profit: baisse = (max_profit - profit) / max_profit # print(f"baisse={baisse}") # if minutes % 1 == 0: # self.log_trade( # last_candle=last_candle, # date=current_time, # action="🟢 CURRENT", #🔴 CURRENT" if self.pairs[pair]['stop'] or last_candle['stop_buying'] else " # dispo=dispo, # pair=pair, # rate=last_candle['close'], # trade_type=f"{round(profit, 2)} {round(max_profit, 2)} {round(trailing_stop,2)} {minutes}", # profit=round(profit, 2), # buys=count_of_buys, # stake=0 # ) if trade.is_short: if current_profit > 0.005 and \ (baisse > 0.25 and last_candle[f"close"] <= last_candle['sma24']) \ and last_candle['hapercent'] > 0 : self.pairs[pair]['force_sell'] = True return 'B30sht' else: # if current_profit 
< - 0.02 and last_candle[f"close"] <= last_candle['sma60']: # self.pairs[pair]['force_sell'] = True # return 'sma60' if current_profit > 0.005 and \ (baisse > 0.25 and last_candle[f"close"] <= last_candle['sma24']) \ and last_candle['hapercent'] <0 : self.pairs[pair]['force_sell'] = True return 'B30Lng' # if profit > 0 and last_candle['cross_sma60']: #5 or last_candle['rsi_1d'] < 30: # return 'Cross' # # if last_candle['max_rsi_24'] > 88 and last_candle['hapercent'] < 0\ # and last_candle['sma5_deriv2'] < -0.1: # return f"rsi_{count_of_buys}_{self.pairs[pair]['has_gain']}" limit = max_profit * (1 - current_trailing_stop_positive) # if profit < limit and baisse > 0.2: # return f"lim_{count_of_buys}_{self.pairs[pair]['has_gain']}" # if last_candle['ml_prob'] > 0.5: # if last_candle['sma12_deriv1'] > 0: # and last_candle['rsi'] < 85: # return None # if last_candle['sma24_deriv1'] > 0 : #and minutes < 180 and baisse < 30: # and last_candle['sma5_deriv1'] > -0.15: # if (minutes < 180): # return None # if (minutes > 1440 and last_candle['sma60_deriv1'] > 0) : # return None # # ----- 4) OFFSET : faut-il attendre de dépasser trailing_stop_positive_offset ? 
----- # if current_trailing_only_offset_is_reached and max_profit > current_trailing_stop_positive_offset: # # Max profit pas atteint ET perte < 2 * current_trailing_stop_positive # if profit > limit: # 2 * current_trailing_stop_positive: # print( # f"{current_time} trailing non atteint trailing_stop={round(trailing_stop, 4)} profit={round(profit, 4)} " # f"max={round(max_profit, 4)} offset={round(current_trailing_stop_positive_offset, 4)} baisse={round(baisse,2)}") # return None # ne pas activer le trailing encore # else: # print( # f"{current_time} trailing atteint trailing_stop={round(trailing_stop, 4)} profit={round(profit, 4)} " # f"max={round(max_profit, 4)} offset={round(current_trailing_stop_positive_offset, 4)} baisse={round(baisse,2)}") # else: # # print( # # f"1 - {current_time} trailing_stop={round(trailing_stop, 4)} profit={round(profit, 4)} max={round(max_profit, 4)} " # # f"limit={round(limit, 4)} offset={round(current_trailing_stop_positive_offset, 4)}" # # f" baisse={round(baisse,2)} {round(last_candle['sma180_deriv1'], 4)} {round(last_candle['sma60_deriv1'], 4)} {round(last_candle['sma24_deriv1'], 4)}") # # return None # # Sinon : trailing actif dès le début # # # ----- 6) Condition de vente ----- # if 0 < profit <= trailing_stop: # and last_candle['mid'] < last_candle['sma5']: # and profit > current_trailing_stop_positive_offset: # self.pairs[pair]['force_buy'] = True # print( # f"{current_time} Condition de vente trailing_stop={round(trailing_stop, 4)} profit={round(profit, 4)} max={round(max_profit, 4)} " # f"{round(limit, 4)} offset={round(current_trailing_stop_positive_offset, 4)} " # f"baisse={round(baisse,2)}") # # return f"stop_{count_of_buys}_{self.pairs[pair]['has_gain']}" # print( # f"2 - {current_time} trailing_stop={round(trailing_stop, 4)} profit={round(profit, 4)} max={round(max_profit, 4)} " # f"{round(limit, 4)} offset={round(current_trailing_stop_positive_offset, 4)} " # f"baisse={round(baisse,2)} 
{round(last_candle['sma180_deriv1'], 4)} {round(last_candle['sma60_deriv1'], 4)} {round(last_candle['sma24_deriv1'], 4)}") return None def informative_pairs(self): # get access to all pairs available in whitelist. pairs = self.dp.current_whitelist() informative_pairs = [(pair, '1h') for pair in pairs] informative_pairs += [(pair, '1d') for pair in pairs] return informative_pairs def populate1hIndicators(self, df: pd.DataFrame, metadata: dict) -> pd.DataFrame: # --- WEEKLY LEVELS --- # semaine précédente = semaine ISO différente df["week"] = df.index.isocalendar().week df["year"] = df.index.year df["weekly_low"] = ( df.groupby(["year", "week"])["low"] .transform("min") .shift(1) # décalé -> pas regarder la semaine en cours ) df["weekly_high"] = ( df.groupby(["year", "week"])["high"] .transform("max") .shift(1) ) # Définition simple d'une zone de demande hebdo : # bas + 25% de la bougie => modifiable df["weekly_demand_zone_low"] = df["weekly_low"] df["weekly_demand_zone_high"] = df["weekly_low"] * 1.025 # --- MONTHLY LEVELS --- df["month"] = df.index.month df["monthly_low"] = ( df.groupby(["year", "month"])["low"] .transform("min") .shift(1) # mois précédent uniquement ) df["monthly_high"] = ( df.groupby(["year", "month"])["high"] .transform("max") .shift(1) ) df["monthly_demand_zone_low"] = df["monthly_low"] df["monthly_demand_zone_high"] = df["monthly_low"] * 1.03 return df # ----- SIGNALS SIMPLES POUR EXEMPLE ----- # def populate_buy_trend(self, df: pd.DataFrame, metadata: dict) -> pd.DataFrame: # df["buy"] = 0 # # # Exemple : acheter si le prix tape la zone de demande hebdomadaire # df.loc[ # (df["close"] <= df["weekly_demand_zone_high"]) & # (df["close"] >= df["weekly_demand_zone_low"]), # "buy" # ] = 1 # # return df # # def populate_sell_trend(self, df: pd.DataFrame, metadata: dict) -> pd.DataFrame: # df["sell"] = 0 # # # Exemple : vendre sur retour au weekly_high précédent # df.loc[df["close"] >= df["weekly_high"], "sell"] = 1 # # return df def 
rsi_trend_probability(self, dataframe, short=6, long=12): dataframe = dataframe.copy() dataframe['rsi_short'] = talib.RSI(dataframe['mid'], short) dataframe['rsi_long'] = talib.RSI(dataframe['mid'], long) dataframe['cross_soft'] = np.tanh((dataframe['rsi_short'] - dataframe['rsi_long']) / 7) dataframe['gap'] = (dataframe['rsi_short'] - dataframe['rsi_long']) / 100 dataframe['trend'] = (dataframe['rsi_long'] - 50) / 50 dataframe['rtp'] = ( 0.6 * dataframe['cross_soft'] + 0.25 * dataframe['gap'] + 0.15 * dataframe['trend'] ).clip(-1, 1) return dataframe def to_utc_ts(self, x): return pd.to_datetime(x, utc=True) # suppose self.btc_ath_history exists (liste de dict) def get_last_ath_before_candle(self, last_candle): # return last_candle['max30_1d'] candle_date = self.to_utc_ts(last_candle['date']) # ou to_utc_ts(last_candle.name) best = None for a in self.btc_ath_history: # getattr(self, "btc_ath_history", []): ath_date = self.to_utc_ts(a["date"]) if ath_date <= candle_date: if best is None or ath_date > best[0]: best = (ath_date, a["price_usd"]) return best[1] if best is not None else None def trainModel(self, dataframe: DataFrame, metadata: dict): pair = self.getShortName(metadata['pair']) pd.set_option('display.max_rows', None) pd.set_option('display.max_columns', None) pd.set_option("display.width", 200) path = self.path # f"user_data/plots/{pair}/" os.makedirs(path, exist_ok=True) # # Étape 1 : sélectionner numériques # numeric_cols = dataframe.select_dtypes(include=['int64', 'float64']).columns # # # Étape 2 : enlever constantes # usable_cols = [c for c in numeric_cols if dataframe[c].nunique() > 1 # and (not c.endswith("_state") and not c.endswith("_1h") and not c.endswith("_1d") # and not c.endswith("_class") and not c.endswith("_price") # and not c.startswith('stop_buying'))] # # # Étape 3 : remplacer inf et NaN par 0 # dataframe[usable_cols] = dataframe[usable_cols].replace([np.inf, -np.inf], 0).fillna(0) # # print("Colonnes utilisables pour le modèle :") # 
print(usable_cols) # # self.model_indicators = usable_cols # df = dataframe[self.model_indicators].copy() # Corrélations des colonnes corr = df.corr(numeric_only=True) # print("Corrélation des colonnes") # print(corr) # 3️⃣ Créer la cible : 1 si le prix monte dans les prochaines bougies os.makedirs(path, exist_ok=True) horizon = 120 # en 1min indicator = 'sma60' df['future_max'] = df[indicator].shift(-1).rolling(horizon).max() df['future_min'] = df[indicator].shift(-1).rolling(horizon).min() tp = 0.0025 # +% sl = 0.0025 # -% (important !) df['target'] = 0 # 🎯 cas gagnant df.loc[df['future_max'] > df[indicator] * (1 + tp), 'target'] = 1 # 💀 cas perdant df.loc[df['future_min'] < df[indicator] * (1 - sl), 'target'] = -1 # Filtre # df = df[df['atr_norm'] > 0.002] print("===== 🚀 TRAIN MODEL START =====") df = df.dropna().copy() features = self.listUsableColumns(df) target_col = "target" # 3️⃣ Créer la cible : 1 si le prix monte dans les prochaines bougies df['target'] = 0 for i in range(len(df) - horizon): window = df.iloc[i + 1:i + 1 + horizon] entry = df.iloc[i][indicator] tp_price = entry * (1 + tp) sl_price = entry * (1 - sl) hit_tp = window[window[indicator] >= tp_price] hit_sl = window[window[indicator] <= sl_price] if not hit_tp.empty and not hit_sl.empty: if hit_tp.index[0] < hit_sl.index[0]: df.iloc[i, df.columns.get_loc('target')] = 1 else: df.iloc[i, df.columns.get_loc('target')] = -1 elif not hit_tp.empty: df.iloc[i, df.columns.get_loc('target')] = 1 elif not hit_sl.empty: df.iloc[i, df.columns.get_loc('target')] = -1 working_columns = self.select_features_pipeline(df) features=working_columns X = df[features] y = (df['target'] == 1).astype(int) # df[target_col] # df['target'].value_counts(normalize=True) counts = df['target'].value_counts() n_neg = counts.get(0, 0) # nombre de 0 n_pos = counts.get(1, 0) # nombre de 1 scale_pos_weight = n_neg / n_pos print("Samples:", len(df)) print("Target ratio:", df['target'].mean()) print("Working features:", 
len(working_columns)) print("Used features:", len(X.columns)) print("Poids pour la classe 1 :", scale_pos_weight) print("==== VARIANCE ====") print(X.var().sort_values().head(10)) print("==== DESCRIBE ====") print(X.describe().T[['mean', 'std']].head(20)) print("Samples before:", len(df)) df = df.dropna() print("Samples after:", len(df)) print(df['target'].value_counts()) # time.sleep(5.5) # Pause 5.5 seconds # Corrélations triées par importance avec une colonne cible target_corr = df.corr(numeric_only=True)["target"].sort_values(ascending=False) print("Corrélations triées par importance avec une colonne cible") print(target_corr) # Corrélations triées par importance avec une colonne cible corr = df.corr(numeric_only=True) corr_unstacked = ( corr.unstack() .reset_index() .rename(columns={"level_0": "col1", "level_1": "col2", 0: "corr"}) ) # Supprimer les doublons col1/col2 inversés et soi-même corr_unstacked = corr_unstacked[corr_unstacked["col1"] < corr_unstacked["col2"]] # Trier par valeur absolue de corrélation corr_sorted = corr_unstacked.reindex(corr_unstacked["corr"].abs().sort_values(ascending=False).index) print("Trier par valeur absolue de corrélation") print(corr_sorted.head(20)) # --- Calcul de la corrélation --- corr = df.corr(numeric_only=True) # évite les colonnes non numériques corr = corr * 100 # passage en pourcentage # --- Masque pour n’afficher que le triangle supérieur (optionnel) --- mask = np.triu(np.ones_like(corr, dtype=bool)) # --- Création de la figure --- fig, ax = plt.subplots(figsize=(96, 36)) # --- Heatmap avec un effet “température” --- sns.heatmap( corr, mask=mask, cmap="coolwarm", # palette bleu → rouge center=0, # 0 au centre annot=True, # affiche les valeurs dans chaque case fmt=".0f", # format entier (pas de décimale) cbar_kws={"label": "Corrélation (%)"}, # légende à droite linewidths=0.5, # petites lignes entre les cases ax=ax ) # --- Personnalisation --- ax.set_title("Matrice de corrélation (en %)", fontsize=20, pad=20) 
plt.xticks(rotation=45, ha="right") plt.yticks(rotation=0) # --- Sauvegarde --- output_path = f"{self.path}/Matrice_de_correlation_temperature.png" plt.savefig(output_path, bbox_inches="tight", dpi=150) plt.close(fig) print(f"✅ Matrice enregistrée : {output_path}") # Exemple d'utilisation : # selected_corr = self.select_uncorrelated_features(df, target="target", top_n=30, corr_threshold=0.98) # print("===== 🎯 FEATURES SÉLECTIONNÉES =====") # print(selected_corr) # # # 🔥 EXTRACTION CORRECTE # working_columns = selected_corr["feature"].tolist() # Nettoyage df = df[working_columns + ['target', indicator]].dropna() X = df[working_columns] y = df['target'] self.model_indicators = working_columns # Nettoyage df = df.dropna() X = df[self.model_indicators] y = df['target'] # ta colonne cible binaire ou numérique print("===== 🎯 FEATURES SCORES =====") print(self.feature_auc_scores(X, y)) # 4️⃣ Split train/test X = df[self.model_indicators] y = df['target'] # Séparation temporelle (train = 80 %, valid = 20 %) # X_train, X_valid, y_train, y_valid = train_test_split(X, y, test_size=0.2, shuffle=False) split_idx = int(len(df) * 0.8) df_train = df.iloc[:split_idx].copy() df_valid = df.iloc[split_idx:].copy() X_train = df_train[self.model_indicators] y_train = df_train['target'] X_valid = df_valid[self.model_indicators] y_valid = df_valid['target'] self.df_valid = df_valid # Nettoyage des valeurs invalides selector = VarianceThreshold(threshold=0.0001) selector.fit(X_train) selected = X_train.columns[selector.get_support()] print("Colonnes conservées :", list(selected)) # 5️⃣ Entraînement du modèle # self.train_model = RandomForestClassifier(n_estimators=200, random_state=42) # def objective(trial): # self.train_model = XGBClassifier( # n_estimators=trial.suggest_int("n_estimators", 200, 300), # max_depth=trial.suggest_int("max_depth", 3, 6), # learning_rate=trial.suggest_float("learning_rate", 0.01, 0.3), # subsample=trial.suggest_float("subsample", 0.7, 1.0), # 
colsample_bytree=trial.suggest_float("colsample_bytree", 0.7, 1.0), # scale_pos_weight=1, # tu mettras balance_ratio ici si tu veux # objective="binary:logistic", # eval_metric="logloss", # n_jobs=-1 # ) # # self.train_model.fit(X_train, y_train) # # y_pred = self.train_model.predict(X_valid) # <-- validation = test split # return f1_score(y_valid, y_pred) # # study = optuna.create_study(direction="maximize") # study.optimize(objective, n_trials=50) # def objective(trial): # # local_model = XGBClassifier( # # n_estimators=300, # nombre d'arbres plus raisonnable # # learning_rate=0.01, # un peu plus rapide que 0.006, mais stable # # max_depth=4, # capture plus de patterns que 3, sans overfitting excessif # # subsample=0.7, # utilise 70% des lignes pour chaque arbre → réduit overfitting # # colsample_bytree=0.8, # 80% des features par arbre # # gamma=0.01, # gain minimal pour un split → régularisation # # reg_alpha=0.01, # L1 régularisation des feuilles # # reg_lambda=1, # L2 régularisation des feuilles # # n_jobs=-1, # utilise tous les cœurs CPU pour accélérer # # random_state=42, # reproductibilité # # missing=float('nan'), # valeur manquante reconnue # # eval_metric='logloss' # métrique pour classification binaire # # ) # # local_model = XGBClassifier( # n_estimators=trial.suggest_int("n_estimators", 300, 500), # max_depth=trial.suggest_int("max_depth", 1, 6), # learning_rate=trial.suggest_float("learning_rate", 0.005, 0.3, log=True), # subsample=trial.suggest_float("subsample", 0.6, 1.0), # colsample_bytree=trial.suggest_float("colsample_bytree", 0.6, 1.0), # scale_pos_weight=1, # objective="binary:logistic", # eval_metric="logloss", # n_jobs=-1 # ) # # local_model.fit( # X_train, # y_train, # eval_set=[(X_valid, y_valid)], # # early_stopping_rounds=50, # verbose=False # ) # # proba = local_model.predict_proba(X_valid)[:, 1] # thresholds = np.linspace(0.1, 0.9, 50) # best_f1 = max(f1_score(y_valid, (proba > t)) for t in thresholds) # # return best_f1 # def 
objective(trial): # # scale_pos_weight = (y_train == 0).sum() / max((y_train == 1).sum(), 1) # # local_model = XGBClassifier( # n_estimators=trial.suggest_int("n_estimators", 300, 500), # max_depth=trial.suggest_int("max_depth", 2, 6), # learning_rate=trial.suggest_float("learning_rate", 0.005, 0.2, log=True), # subsample=trial.suggest_float("subsample", 0.6, 1.0), # colsample_bytree=trial.suggest_float("colsample_bytree", 0.6, 1.0), # gamma=trial.suggest_float("gamma", 0, 0.1), # reg_alpha=trial.suggest_float("reg_alpha", 0, 0.1), # reg_lambda=trial.suggest_float("reg_lambda", 0.5, 2), # scale_pos_weight=scale_pos_weight, # objective="binary:logistic", # eval_metric="logloss", # n_jobs=-1, # random_state=42 # ) # # local_model.fit( # X_train, # y_train, # eval_set=[(X_valid, y_valid)], # verbose=False # ) # # proba = local_model.predict_proba(X_valid)[:, 1] # # # 🔥 seuil optimisé # threshold = trial.suggest_float("threshold", 0.3, 0.7) # prices = self.df_valid["close"].values # profit = 0 # wins = 0 # losses = 0 # # horizon = trial.suggest_int("horizon", 2, 6) # # min_move = trial.suggest_float("min_move", 0.002, 0.01) # # for i in range(len(proba) - horizon): # if proba[i] > threshold: # entry = prices[i] # exit = prices[i + horizon] # pct = (exit - entry) / entry # # # 🔥 filtre anti bruit # if abs(pct) < min_move: # continue # # pct -= 0.001 # fees # profit += pct # if pct > 0: # wins += 1 # else: # losses += 1 # # if wins + losses == 0: # return -1 # # winrate = wins / (wins + losses) # # # 🔥 score final # return profit * winrate # 4️⃣ Fonction objectif Optuna # def objective(trial): # model = XGBClassifier( # n_estimators=trial.suggest_int("n_estimators", 300, 500), # max_depth=trial.suggest_int("max_depth", 3, 7), # learning_rate=trial.suggest_float("learning_rate", 0.005, 0.1, log=True), # subsample=trial.suggest_float("subsample", 0.6, 1.0), # colsample_bytree=trial.suggest_float("colsample_bytree", 0.6, 1.0), # gamma=trial.suggest_float("gamma", 0, 0.1), # 
reg_alpha=trial.suggest_float("reg_alpha", 0, 0.1), # reg_lambda=trial.suggest_float("reg_lambda", 1, 2), # scale_pos_weight=scale_pos_weight, # objective="binary:logistic", # eval_metric="logloss", # n_jobs=-1, # random_state=42 # ) # # model.fit( # X_train, # y_train, # eval_set=[(X_valid, y_valid)], # verbose=False # ) # # best_threshold = 0 # proba = model.predict_proba(X_valid)[:, 1] # best_score = -1 # for t in np.linspace(0.2, 0.8, 30): # preds = (proba > t).astype(int) # precision = precision_score(y_valid, preds, zero_division=0) # if precision < 0.6: # score = 0 # else: # recall = recall_score(y_valid, preds, zero_division=0) # score = (0.7 * recall) + (0.3 * precision) # # if score > best_score: # best_threshold = t # best_score = score # print("Best threshold:", best_threshold) # # return best_score # # # proba = model.predict_proba(X_valid)[:, 1] # # # # thresholds = np.linspace(0.1, 0.9, 50) # # best_f1 = max(f1_score(y_valid, (proba > t)) for t in thresholds) # # # # return best_f1 def objective(trial): model = LGBMClassifier( n_estimators=trial.suggest_int("n_estimators", 300, 700), learning_rate=trial.suggest_float("learning_rate", 0.02, 0.08), max_depth=trial.suggest_int("max_depth", 3, 6), num_leaves=trial.suggest_int("num_leaves", 20, 80), # 🔥 FIX CRITIQUE min_child_samples=trial.suggest_int("min_child_samples", 10, 50), subsample=trial.suggest_float("subsample", 0.7, 1.0), colsample_bytree=trial.suggest_float("colsample_bytree", 0.7, 1.0), # 🔥 FIX CRITIQUE reg_alpha=trial.suggest_float("reg_alpha", 0.0, 0.1), reg_lambda=trial.suggest_float("reg_lambda", 0.5, 1.5), scale_pos_weight=scale_pos_weight, random_state=42, n_jobs=-1 ) model.fit(X_train, y_train) proba = model.predict_proba(X_valid)[:, 1] best_score = 0 for t in np.linspace(0.2, 0.8, 30): preds = (proba > t).astype(int) precision = precision_score(y_valid, preds) recall = recall_score(y_valid, preds) # 🎯 ton objectif réel if precision < 0.6: score = 0 else: score = (0.7 * recall) + (0.3 
* precision) if score > best_score: best_score = score return best_score # 3️⃣ Lancer l'optimisation study = optuna.create_study(direction="maximize") study.optimize(objective, n_trials=200) # 4️⃣ Afficher les meilleurs hyperparamètres print("✅ Best trial:") trial = study.best_trial print(trial.params) # 5️⃣ Entraîner le modèle final avec les meilleurs params best_model = XGBClassifier( **trial.params, scale_pos_weight=scale_pos_weight, objective="binary:logistic", eval_metric="logloss", n_jobs=-1, random_state=42 ) best_model.fit(X_train, y_train) self.train_model = best_model # 6️⃣ Calcul du meilleur seuil F1 proba = best_model.predict_proba(X_valid)[:, 1] thresholds = np.linspace(0.1, 0.9, 50) f1_scores = [f1_score(y_valid, proba > t) for t in thresholds] best_threshold = thresholds[np.argmax(f1_scores)] print("✅ Meilleur seuil F1:", best_threshold) # SHAP # Reconstruction du modèle final avec les meilleurs hyperparamètres # Récupération des meilleurs paramètres trouvés best_params = study.best_params # === SHAP plots === # Calcul SHAP explainer = shap.TreeExplainer(self.train_model) shap_values = explainer(X_train) # On choisit une observation pour le graphique waterfall # Explication du modèle de prédiction pour la première ligne de X_valid.” i = 0 # Extraction des valeurs shap_val = shap_values[i].values feature_names = X_train.columns feature_values = X_train.iloc[i] # Tri par importance absolue # order = np.argsort(np.abs(shap_val))[::-1] k = 10 order = np.argsort(np.abs(shap_val))[::-1][:k] # ---- Création figure sans l'afficher ---- plt.ioff() # Désactive l'affichage interactif shap.plots.waterfall( shap.Explanation( values=shap_val[order], base_values=shap_values.base_values[i], data=feature_values.values[order], feature_names=feature_names[order] ), show=False # IMPORTANT : n'affiche pas dans Jupyter / console ) # Sauvegarde du graphique sur disque output_path = f"{self.path}/shap_waterfall.png" plt.savefig(output_path, dpi=200, bbox_inches='tight') 
plt.close() # ferme la figure proprement print(f"Graphique SHAP enregistré : {output_path}") # FIN SHAP # ---- après avoir exécuté la study ------ print("Best value (F1):", study.best_value) print("Best params:", study.best_params) best_trial = study.best_trial print("\n=== BEST TRIAL ===") print("Number:", best_trial.number) print("Value:", best_trial.value) print("Params:") for k, v in best_trial.params.items(): print(f" - {k}: {v}") # # All trials summary # print("\n=== ALL TRIALS ===") # for t in study.trials: # print(f"Trial {t.number}: f1 = {t.value}, params = {t.params}") # DataFrame of trials df = study.trials_dataframe() print(df.head()) # Graphs fig = plot_optimization_history(study) fig.write_html(f"{self.path}/optimization_history.html") fig = plot_param_importances(study) fig.write_html(f"{self.path}/param_importances.html") fig = plot_slice(study) fig.write_html(f"{self.path}/slice.html") fig = plot_parallel_coordinate(study) fig.write_html(f"{self.path}/parallel_coordinates.html") # 2️⃣ Sélection des features AVANT calibration sfm = SelectFromModel(self.train_model, threshold="median", prefit=True) selected_features = X_train.columns[sfm.get_support()] print(selected_features) # 3️⃣ Calibration ensuite (facultative) calibrated = CalibratedClassifierCV(self.train_model, method='sigmoid', cv=5) calibrated.fit(X_train[selected_features], y_train) print(calibrated) # # # calibration # self.train_model = CalibratedClassifierCV(self.train_model, method='sigmoid', cv=5) # # Sélection # sfm = SelectFromModel(self.train_model, threshold="median") # sfm.fit(X_train, y_train) # selected_features = X_train.columns[sfm.get_support()] # print(selected_features) # self.train_model.fit(X_train, y_train) y_pred = self.train_model.predict(X_valid) y_proba = self.train_model.predict_proba(X_valid)[:, 1] # print(classification_report(y_valid, y_pred)) # print(confusion_matrix(y_valid, y_pred)) print("\nRapport de classification :\n", classification_report(y_valid, 
y_pred)) print("\nMatrice de confusion :\n", confusion_matrix(y_valid, y_pred)) # # Importances # importances = pd.DataFrame({ # "feature": self.train_model.feature_name_, # "importance": self.train_model.feature_importances_ # }).sort_values("importance", ascending=False) # print("\n===== 🔍 IMPORTANCE DES FEATURES =====") # print(importances) # Feature importance importances = self.train_model.feature_importances_ feat_imp = pd.Series(importances, index=X_train.columns).sort_values(ascending=False) # Affichage feat_imp.plot(kind='bar', figsize=(18, 6)) plt.title("Feature importances") # plt.show() plt.savefig(f"{self.path}/Feature importances.png", bbox_inches='tight') result = permutation_importance(self.train_model, X_valid, y_valid, scoring='f1', n_repeats=10, random_state=42) perm_imp = pd.Series(result.importances_mean, index=X_valid.columns).sort_values(ascending=False) perm_imp.plot(kind='bar', figsize=(18, 6)) plt.title("Permutation feature importance") # plt.show() plt.savefig(f"{self.path}/Permutation feature importance.png", bbox_inches='tight') # Shap explainer = shap.TreeExplainer(self.train_model) shap_values = explainer.shap_values(X_valid) # Résumé global shap.summary_plot(shap_values, X_valid) # Force plot pour une observation force_plot = shap.force_plot(explainer.expected_value, shap_values[0, :], X_valid.iloc[0, :]) shap.save_html(f"{self.path}/shap_force_plot.html", force_plot) print("\nGénération des dépendances :\n") fig, ax = plt.subplots(figsize=(24, 48)) PartialDependenceDisplay.from_estimator( self.train_model, X_valid, selected_features, kind="average", ax=ax ) fig.savefig(f"{self.path}/PartialDependenceDisplay.png", bbox_inches="tight") plt.close(fig) best_f1 = 0 best_t = 0.5 for t in [0.3, 0.4, 0.5, 0.6, 0.7]: y_pred_thresh = (y_proba > t).astype(int) score = f1_score(y_valid, y_pred_thresh) print(f"Seuil {t:.1f} → F1: {score:.3f}") if score > best_f1: best_f1 = score best_t = t print(f"✅ Meilleur seuil trouvé: {best_t} avec 
F1={best_f1:.3f}") # 6️⃣ Évaluer la précision (facultatif) preds = self.train_model.predict(X_valid) acc = accuracy_score(y_valid, preds) print(f"Accuracy: {acc:.3f}") # 7️⃣ Sauvegarde du modèle joblib.dump( {"model": self.train_model, "threshold": best_threshold, "features": self.model_indicators}, f"{self.path}/{pair}_rf_model.pkl" ) print(f"✅ Modèle sauvegardé sous {pair}_rf_model.pkl") # X = dataframe des features (après shift/rolling/indicators) # y = target binaire ou décimale # model = ton modèle entraîné (RandomForestClassifier ou Regressor) # # --- 1️⃣ Mutual Information (MI) --- # mi_scores = mutual_info_classif(X.fillna(0), y) # mi_series = pd.Series(mi_scores, index=X.columns, name='MI') # # # --- 2️⃣ Permutation Importance (PI) --- # pi_result = permutation_importance(self.train_model, X, y, n_repeats=10, random_state=42, n_jobs=-1) # pi_series = pd.Series(pi_result.importances_mean, index=X.columns, name='PI') # # # --- 3️⃣ Combinaison dans un seul dataframe --- # importance_df = pd.concat([mi_series, pi_series], axis=1) # importance_df = importance_df.sort_values(by='PI', ascending=False) # tri par importance modèle # print(importance_df) # # importance_df.plot(kind='bar', figsize=(10, 5)) # plt.title("Mutual Info vs Permutation Importance") # plt.ylabel("Score") # plt.show() self.analyze_model(pair, self.train_model, X_train, X_valid, y_train, y_valid) def trading_score(self, y_true, y_pred_proba, prices, threshold=0.5): trades = (y_pred_proba > threshold).astype(int) profit = 0 trade_count = 0 for i in range(len(trades) - 1): if trades[i] == 1: entry = prices[i] exit = prices[i + 1] pct = (exit - entry) / entry profit += pct trade_count += 1 if trade_count == 0: return -1 # pénalité si aucun trade return profit def inspect_model(self, model): """ Affiche les informations d'un modèle ML déjà entraîné. Compatible avec scikit-learn, xgboost, lightgbm, catboost... 
""" print("===== 🔍 INFORMATIONS DU MODÈLE =====") # Type de modèle print(f"Type : {type(model).__name__}") print(f"Module : {model.__class__.__module__}") # Hyperparamètres if hasattr(model, "get_params"): params = model.get_params() print(f"\n===== ⚙️ HYPERPARAMÈTRES ({len(params)}) =====") for k, v in params.items(): print(f"{k}: {v}") # Nombre d’estimateurs if hasattr(model, "n_estimators"): print(f"\nNombre d’estimateurs : {model.n_estimators}") # Importance des features if hasattr(model, "feature_importances_"): print("\n===== 📊 IMPORTANCE DES FEATURES =====") # Correction ici : feature_names = getattr(model, "feature_names_in_", None) if isinstance(feature_names, np.ndarray): feature_names = feature_names.tolist() elif feature_names is None: feature_names = [f"feature_{i}" for i in range(len(model.feature_importances_))] fi = pd.DataFrame({ "feature": feature_names, "importance": model.feature_importances_ }).sort_values(by="importance", ascending=False) print(fi) # Coefficients (modèles linéaires) if hasattr(model, "coef_"): print("\n===== ➗ COEFFICIENTS =====") coef = np.array(model.coef_) if coef.ndim == 1: for i, c in enumerate(coef): print(f"Feature {i}: {c:.6f}") else: print(coef) # Intercept if hasattr(model, "intercept_"): print("\nIntercept :", model.intercept_) # Classes connues if hasattr(model, "classes_"): print("\n===== 🎯 CLASSES =====") print(model.classes_) # Scores internes for attr in ["best_score_", "best_iteration_", "best_ntree_limit", "score_"]: if hasattr(model, attr): print(f"\n{attr} = {getattr(model, attr)}") # Méthodes disponibles print("\n===== 🧩 MÉTHODES DISPONIBLES =====") methods = [m for m, _ in inspect.getmembers(model, predicate=inspect.ismethod)] print(", ".join(methods[:15]) + ("..." if len(methods) > 15 else "")) print("\n===== ✅ FIN DE L’INSPECTION =====") def analyze_model(self, pair, model, X_train, X_valid, y_train, y_valid): """ Analyse complète d'un modèle ML supervisé (classification binaire). 
Affiche performances, importance des features, matrices, seuils, etc. """ os.makedirs(self.path, exist_ok=True) # ---- Prédictions ---- preds = model.predict(X_valid) probs = model.predict_proba(X_valid)[:, 1] if hasattr(model, "predict_proba") else preds # ---- Performances globales ---- print("===== 📊 ÉVALUATION DU MODÈLE =====") print("Colonnes du modèle :", model.feature_names_in_) print("Colonnes X_valid :", list(X_valid.columns)) print(f"Accuracy: {accuracy_score(y_valid, preds):.3f}") print(f"ROC AUC : {roc_auc_score(y_valid, probs):.3f}") print("TN (True Negative) / FP (False Positive)") print("FN (False Negative) / TP (True Positive)") print("\nRapport de classification :\n", classification_report(y_valid, preds)) # | Élément | Valeur | Signification | # | ------------------- | ------ | ----------------------------------------------------------- | # | TN (True Negative) | 983 | Modèle a correctement prédit 0 (pas d’achat) | # | FP (False Positive) | 43 | Modèle a prédit 1 alors que c’était 0 (faux signal d’achat) | # | FN (False Negative) | 108 | Modèle a prédit 0 alors que c’était 1 (manqué un achat) | # | TP (True Positive) | 19 | Modèle a correctement prédit 1 (bon signal d’achat) | # ---- Matrice de confusion ---- cm = confusion_matrix(y_valid, preds) print("Matrice de confusion :\n", cm) plt.figure(figsize=(4, 4)) plt.imshow(cm, cmap="Blues") plt.title("Matrice de confusion") plt.xlabel("Prédit") plt.ylabel("Réel") for i in range(2): for j in range(2): plt.text(j, i, cm[i, j], ha="center", va="center", color="black") # plt.show() plt.savefig(os.path.join(self.path, "Matrice de confusion.png"), bbox_inches="tight") plt.close() # ---- Importance des features ---- if hasattr(model, "feature_importances_"): print("\n===== 🔍 IMPORTANCE DES FEATURES =====") importance = pd.DataFrame({ "feature": X_train.columns, "importance": model.feature_importances_ }).sort_values(by="importance", ascending=False) print(importance) # Crée une figure plus grande fig, ax = 
plt.subplots(figsize=(24, 8)) # largeur=24 pouces, hauteur=8 pouces # Trace le bar plot sur cet axe importance.plot.bar(x="feature", y="importance", legend=False, ax=ax) # Tourner les labels pour plus de lisibilité ax.set_xticklabels(ax.get_xticklabels(), rotation=45, ha='right') plt.title("Importance des features") # plt.show() plt.savefig(os.path.join(self.path, "Importance des features.png"), bbox_inches="tight") plt.close() # ---- Arbre de décision (extrait) ---- if hasattr(model, "estimators_"): print("\n===== 🌳 EXTRAIT D’UN ARBRE =====") print(export_text(model.estimators_[0], feature_names=list(X_train.columns))[:800]) # ---- Précision selon le seuil ---- thresholds = np.linspace(0.1, 0.9, 9) print("\n===== ⚙️ PERFORMANCE SELON SEUIL =====") for t in thresholds: preds_t = (probs > t).astype(int) acc = accuracy_score(y_valid, preds_t) print(f"Seuil {t:.1f} → précision {acc:.3f}") # ---- ROC Curve ---- fpr, tpr, _ = roc_curve(y_valid, probs) plt.figure(figsize=(5, 4)) plt.plot(fpr, tpr, label="ROC curve") plt.plot([0, 1], [0, 1], linestyle="--", color="gray") plt.xlabel("Taux de faux positifs") plt.ylabel("Taux de vrais positifs") plt.title("Courbe ROC") plt.legend() # plt.show() plt.savefig(os.path.join(self.path, "Courbe ROC.png"), bbox_inches="tight") plt.close() # # ---- Interprétation SHAP (optionnelle) ---- # try: # import shap # # print("\n===== 💡 ANALYSE SHAP =====") # explainer = shap.TreeExplainer(model) # shap_values = explainer.shap_values(X_valid) # # shap.summary_plot(shap_values[1], X_valid) # # Vérifie le type de sortie de shap_values # if isinstance(shap_values, list): # # Cas des modèles de classification (plusieurs classes) # shap_values_to_plot = shap_values[0] if len(shap_values) == 1 else shap_values[1] # else: # shap_values_to_plot = shap_values # # # Ajustement des dimensions au besoin # if shap_values_to_plot.shape[1] != X_valid.shape[1]: # print(f"⚠️ Mismatch dimensions SHAP ({shap_values_to_plot.shape[1]}) vs X_valid 
({X_valid.shape[1]})") # min_dim = min(shap_values_to_plot.shape[1], X_valid.shape[1]) # shap_values_to_plot = shap_values_to_plot[:, :min_dim] # X_to_plot = X_valid.iloc[:, :min_dim] # else: # X_to_plot = X_valid # # plt.figure(figsize=(12, 4)) # shap.summary_plot(shap_values_to_plot, X_to_plot, show=False) # plt.savefig(os.path.join(self.path, "shap_summary.png"), bbox_inches="tight") # plt.close() # except ImportError: # print("\n(SHAP non installé — `pip install shap` pour activer l’analyse SHAP.)") y_proba = model.predict_proba(X_valid)[:, 1] # Trace ou enregistre le graphique self.plot_threshold_analysis(y_valid, y_proba, step=0.05, save_path=f"{self.path}/threshold_analysis.png") # y_valid : vraies classes (0 / 1) # y_proba : probabilités de la classe 1 prédites par ton modèle # Exemple : y_proba = model.predict_proba(X_valid)[:, 1] seuils = np.arange(0.0, 1.01, 0.05) precisions, recalls, f1s = [], [], [] for seuil in seuils: y_pred = (y_proba >= seuil).astype(int) precisions.append(precision_score(y_valid, y_pred)) recalls.append(recall_score(y_valid, y_pred)) f1s.append(f1_score(y_valid, y_pred)) plt.figure(figsize=(10, 6)) plt.plot(seuils, precisions, label='Précision', marker='o') plt.plot(seuils, recalls, label='Rappel', marker='o') plt.plot(seuils, f1s, label='F1-score', marker='o') # Ajoute un point pour le meilleur F1 best_idx = np.argmax(f1s) plt.scatter(seuils[best_idx], f1s[best_idx], color='red', s=80, label=f'Max F1 ({seuils[best_idx]:.2f})') plt.title("Performance du modèle selon le seuil de probabilité") plt.xlabel("Seuil de probabilité (classe 1)") plt.ylabel("Score") plt.grid(True, alpha=0.3) plt.legend() plt.savefig(f"{self.path}/seuil_de_probabilite.png", bbox_inches='tight') # plt.show() print(f"✅ Meilleur F1 : {f1s[best_idx]:.3f} au seuil {seuils[best_idx]:.2f}") print("\n===== ✅ FIN DE L’ANALYSE =====") def plot_threshold_analysis(self, y_true, y_proba, step=0.05, save_path=None): """ Affiche la précision, le rappel et le F1-score selon 
        le seuil de décision.
        y_true : labels réels (0 ou 1)
        y_proba : probabilités prédites (P(hausse))
        step : pas entre les seuils testés
        save_path : si renseigné, enregistre l'image au lieu d'afficher
        """
        # The generated figure shows three curves:
        # 🔵 Precision — reliability of the bullish signals.
        # 🟢 Recall — share of the actual rises the model detects.
        # 🟣 F1-score — the optimal trade-off between the two.
        thresholds = np.arange(0, 1.01, step)
        precisions, recalls, f1s = [], [], []
        for thr in thresholds:
            preds = (y_proba >= thr).astype(int)
            # NOTE(review): at thresholds where no positive is predicted,
            # precision/recall emit warnings — consider zero_division=0.
            precisions.append(precision_score(y_true, preds))
            recalls.append(recall_score(y_true, preds))
            f1s.append(f1_score(y_true, preds))
        plt.figure(figsize=(10, 6))
        plt.plot(thresholds, precisions, label="Precision", linewidth=2)
        plt.plot(thresholds, recalls, label="Recall", linewidth=2)
        plt.plot(thresholds, f1s, label="F1-score", linewidth=2, linestyle="--")
        plt.axvline(0.5, color='gray', linestyle=':', label="Seuil 0.5")
        plt.title("📊 Performance selon le seuil de probabilité", fontsize=14)
        plt.xlabel("Seuil de décision (threshold)")
        plt.ylabel("Score")
        plt.legend()
        plt.grid(True, alpha=0.3)
        if save_path:
            plt.savefig(save_path, bbox_inches='tight')
            print(f"✅ Graphique enregistré : {save_path}")
        else:
            plt.show()

    def feature_auc_scores(self, X, y):
        """Univariate ROC AUC of each feature column against the target.

        Returns a pd.Series sorted descending; NaN where the AUC could not
        be computed (e.g. constant column or single-class target).
        """
        aucs = {}
        for col in X.columns:
            try:
                aucs[col] = roc_auc_score(y, X[col].ffill().fillna(0))
            except Exception:
                aucs[col] = np.nan
        return pd.Series(aucs).sort_values(ascending=False)

    def listUsableColumns(self, dataframe):
        """Return the numeric, non-constant columns usable as model features.

        Side effect: replaces +/-inf and NaN by 0 in those columns,
        in place, on the caller's dataframe.
        """
        # Step 1: keep numeric columns only
        numeric_cols = dataframe.select_dtypes(include=['int64', 'float64']).columns
        # Step 2: drop constant columns and excluded name patterns
        # (leakage / raw-price / target-like columns)
        usable_cols = [c for c in numeric_cols
                       if dataframe[c].nunique() > 1
                       # and not c.endswith("_state")
                       # and not c.endswith("_1h")
                       and not c.startswith("open")
                       # and not c.startswith("close")
                       # and not c.startswith("low")
                       and not c.startswith("high")
                       and not c.startswith("haopen")
                       and not c.startswith("haclose")
                       # and not c.startswith("bb_lower")
                       and not c.startswith("bb_upper")
                       # and not c.startswith("bb_middle")
                       and not c.endswith("_count")
                       and not c.endswith("_class")
                       and not c.endswith("_price")
                       and not c.startswith('stop_buying')
                       and not c.startswith('target')
                       and not c.startswith('lvl')
                       # and not c.startswith('sma5_deriv1_1h')
                       # and not c.startswith('sma5_1h')
                       # and not c.startswith('sma12_deriv1_1h')
                       # and not c.startswith('sma12_1h')
                       # and not c.startswith('confidence_index')
                       # and not c.startswith('price_change')
                       # and not c.startswith('price_score')
                       # and not c.startswith('heat_score')
                       # and not c.startswith('min30_1d')
                       # and not c.startswith('max30_1d')
                       ]
        # Step 3: replace inf and NaN by 0 (mutates the caller's dataframe)
        dataframe[usable_cols] = dataframe[usable_cols].replace([np.inf, -np.inf], 0).fillna(0)
        # print("Colonnes utilisables pour le modèle :")
        # print(usable_cols)
        # self.model_indicators = usable_cols
        return usable_cols

    def select_uncorrelated_features(self, df, target, top_n=20, corr_threshold=0.7):
        """
        Sélectionne les features les plus corrélées avec target,
        tout en supprimant celles trop corrélées entre elles.
""" # 1️⃣ Calcul des corrélations absolues avec la cible corr = df.corr(numeric_only=True) corr_target = corr[target].abs().sort_values(ascending=False) # 2️⃣ Prend les N features les plus corrélées avec la cible (hors target) features = corr_target.drop(target).head(top_n).index.tolist() # 3️⃣ Évite les features trop corrélées entre elles selected = [] for feat in features: too_correlated = False for sel in selected: if abs(corr.loc[feat, sel]) > corr_threshold: too_correlated = True break if not too_correlated: selected.append(feat) # 4️⃣ Retourne un DataFrame propre avec les valeurs de corrélation selected_corr = pd.DataFrame({ "feature": selected, "corr_with_target": [corr.loc[f, target] for f in selected] }).sort_values(by="corr_with_target", key=np.abs, ascending=False) return selected_corr def calculeDerivees( self, dataframe: pd.DataFrame, name: str, suffixe: str = '', window: int = 100, coef: float = 0.15, ema_period: int = 10, verbose: bool = True, ) -> pd.DataFrame: """ Calcule deriv1/deriv2 (relative simple), applique EMA, calcule tendency avec epsilon adaptatif basé sur rolling percentiles. 
""" d1_col = f"{name}{suffixe}_deriv1" d2_col = f"{name}{suffixe}_deriv2" # d1s_col = f"{name}{suffixe}_deriv1_smooth" # d2s_col = f"{name}{suffixe}_deriv2_smooth" tendency_col = f"{name}{suffixe}_state" d1_col = f"{name}{suffixe}_deriv1" d2_col = f"{name}{suffixe}_deriv2" tendency_col = f"{name}{suffixe}_state" series = dataframe[f"{name}{suffixe}"] d1 = series.diff() d2 = d1.diff() pmin = int(ema_period / 3) cond_bas = (d1.rolling(pmin).mean() > d1.rolling(ema_period).mean()) cond_haut = (d1.rolling(pmin).mean() < d1.rolling(ema_period).mean()) dataframe[d1_col] = (dataframe[name] - dataframe[name].shift(3)) / dataframe[name].shift(3) dataframe[d2_col] = (dataframe[d1_col] - dataframe[d1_col].shift(1)) factor1 = 100 * (ema_period / 5) factor2 = 10 * (ema_period / 5) dataframe[f"{name}{suffixe}_inv"] = (dataframe[f"{name}{suffixe}"].shift(2) >= dataframe[ f"{name}{suffixe}"].shift(1)) \ & (dataframe[f"{name}{suffixe}"].shift(1) <= dataframe[f"{name}{suffixe}"]) # --- Distance à la moyenne mobile --- dataframe[f"{name}{suffixe}_dist"] = (dataframe['close'] - dataframe[f"{name}{suffixe}"]) / dataframe[ f"{name}{suffixe}"] # # dérivée relative simple # dataframe[d1_col] = (dataframe[name] - dataframe[name].shift(1)) / dataframe[name].shift(1) # # lissage EMA # dataframe[d1_col] = factor1 * dataframe[d1_col].ewm(span=ema_period, adjust=False).mean() # # # dataframe[d1_col] = dataframe[d1_col].rolling(window=ema_period, center=True).median() # # dataframe[d2_col] = dataframe[d1_col] - dataframe[d1_col].shift(1) # dataframe[d2_col] = factor2 * dataframe[d2_col].ewm(span=ema_period, adjust=False).mean() # epsilon adaptatif via rolling percentile p_low_d1 = dataframe[d1_col].rolling(window=window, min_periods=1).quantile(0.05) p_high_d1 = dataframe[d1_col].rolling(window=window, min_periods=1).quantile(0.95) p_low_d2 = dataframe[d2_col].rolling(window=window, min_periods=1).quantile(0.05) p_high_d2 = dataframe[d2_col].rolling(window=window, min_periods=1).quantile(0.95) 
eps_d1_series = ((p_low_d1.abs() + p_high_d1.abs()) / 2) * coef eps_d2_series = ((p_low_d2.abs() + p_high_d2.abs()) / 2) * coef # fallback global eps global_eps_d1 = (abs(dataframe[d1_col].quantile(0.05)) + abs(dataframe[d1_col].quantile(0.95))) / 2 * coef global_eps_d2 = (abs(dataframe[d2_col].quantile(0.05)) + abs(dataframe[d2_col].quantile(0.95))) / 2 * coef eps_d1_series = eps_d1_series.fillna(global_eps_d1).replace(0, global_eps_d1) eps_d2_series = eps_d2_series.fillna(global_eps_d2).replace(0, global_eps_d2) # if verbose and self.dp.runmode.value in ('backtest'): # stats = dataframe[[d1_col, d2_col]].agg(['min', 'max']).T # stats['abs_max'] = dataframe[[d1_col, d2_col]].abs().max(axis=0) # print(f"---- Derivatives stats {timeframe}----") # print(stats) # print(f"rolling window = {window}, coef = {coef}, ema_period = {ema_period}") # print("---------------------------") # mapping tendency def tag_by_derivatives(row): idx = int(row.name) d1v = float(row[d1_col]) d2v = float(row[d2_col]) eps1 = float(eps_d1_series.iloc[idx]) eps2 = float(eps_d2_series.iloc[idx]) # # mapping état → codes 3 lettres explicites # # | Ancien état | Nouveau code 3 lettres | Interprétation | # # | ----------- | ---------------------- | --------------------- | # # | 4 | HAU | Hausse Accélérée | # # | 3 | HSR | Hausse Ralentissement | # # | 2 | HST | Hausse Stable | # # | 1 | DHB | Départ Hausse | # # | 0 | PAL | Palier / neutre | # # | -1 | DBD | Départ Baisse | # # | -2 | BSR | Baisse Ralentissement | # # | -3 | BST | Baisse Stable | # # | -4 | BAS | Baisse Accélérée | # Palier strict if abs(d1v) <= eps1 and abs(d2v) <= eps2: return 0 # Départ si d1 ~ 0 mais d2 signale direction if abs(d1v) <= eps1: return 1 if d2v > eps2 else -1 if d2v < -eps2 else 0 # Hausse if d1v > eps1: return 4 if d2v > eps2 else 3 # Baisse if d1v < -eps1: return -4 if d2v < -eps2 else -2 return 0 dataframe[tendency_col] = dataframe.apply(tag_by_derivatives, axis=1) # if timeframe == '1h' and verbose and 
self.dp.runmode.value in ('backtest'): # print("##################") # print(f"# STAT {timeframe} {name}{suffixe}") # print("##################") # self.calculateProbabilite2Index(dataframe, futur_cols=['futur_percent'], indic_1=f"{name}{suffixe}_deriv1", indic_2=f"{name}{suffixe}_deriv2") return dataframe def calculateConfiance(self, informative): df = informative.copy() # ATR normalisé df['atr_norm'] = talib.ATR(df['high'], df['low'], df['close'], length=14) / df['close'] # SMA200 & pente df['sma200'] = talib.SMA(df['close'], 200) df['sma200_slope'] = df['sma200'].diff() # drawdown df['rolling_ath'] = df['close'].cummax() df['drawdown'] = (df['close'] - df['rolling_ath']) / df['rolling_ath'] # volume spike df['vol_spike'] = df['volume'] / df['volume'].rolling(20).mean() # RSI courts/longs df['rsi14'] = talib.RSI(df['close'], 14) df['rsi60'] = talib.RSI(df['close'], 60) # Scores normalisés df['vol_score'] = 1 - np.clip(df['atr_norm'] / 0.05, 0, 1) df['trend_score'] = 1 / (1 + np.exp(-df['sma200_slope'] * 150)) df['dd_score'] = 1 - np.clip(abs(df['drawdown']) / 0.3, 0, 1) df['volpanic_score'] = 1 - np.clip(df['vol_spike'] / 3, 0, 1) df['rsi_score'] = 1 / (1 + np.exp(-(df['rsi14'] - df['rsi60']) / 10)) # Indice final informative['confidence_index'] = ( 0.25 * df['vol_score'] + 0.25 * df['trend_score'] + 0.20 * df['dd_score'] + 0.15 * df['volpanic_score'] + 0.15 * df['rsi_score'] ) return informative def prune_features(self, model, dataframe, feature_columns, importance_threshold=0.01): """ Supprime les features dont l'importance est inférieure au seuil. 
        Args:
            model: XGBClassifier déjà entraîné
            dataframe: DataFrame contenant toutes les features
            feature_columns: liste des colonnes/features utilisées pour la prédiction
            importance_threshold: seuil minimal pour conserver une feature
                (en proportion de l'importance totale)

        Returns:
            dataframe_pruned: dataframe avec uniquement les features conservées
            kept_features: liste des features conservées
        """
        booster = model.get_booster()
        # Feature importances by 'gain'
        importance = booster.get_score(importance_type='gain')
        # Normalise so the importances sum to 1
        total_gain = sum(importance.values())
        normalized_importance = {k: v / total_gain for k, v in importance.items()}
        # Keep features above the (normalised) importance threshold
        kept_features = [f for f in feature_columns if normalized_importance.get(f, 0) >= importance_threshold]
        dataframe_pruned = dataframe[kept_features].fillna(0)
        # print(f"⚡ Features conservées ({len(kept_features)} / {len(feature_columns)}): {kept_features}")
        return dataframe_pruned, kept_features

    def trainModel2(self, df, metadata):
        """Train an Optuna-tuned multiclass LightGBM model on `df`.

        Stores the fitted model in self.train_model, dumps it with joblib
        under self.path, then runs the diagnostics / SHAP helpers.
        """
        pair = self.getShortName(metadata['pair'])
        pd.set_option('display.max_rows', None)
        pd.set_option('display.max_columns', None)
        pd.set_option("display.width", 200)
        path = self.path  # f"user_data/plots/{pair}/"
        os.makedirs(path, exist_ok=True)
        horizon = 300  # 5h on 1-minute candles
        df['future_max'] = df['close'].shift(-1).rolling(horizon).max()
        df['future_min'] = df['close'].shift(-1).rolling(horizon).min()
        tp = 0.005  # +0.5%
        sl = 0.003  # -0.3% (important !)
        df['target'] = 0
        # 🎯 winning case
        df.loc[df['future_max'] > df['close'] * (1 + tp), 'target'] = 1
        # 💀 losing case
        df.loc[df['future_min'] < df['close'] * (1 - sl), 'target'] = -1
        # Filter out dead-volatility rows
        df = df[df['atr_norm'] > 0.002]
        print("===== 🚀 TRAIN MODEL START =====")
        df = df.dropna().copy()
        features = self.listUsableColumns(df)
        target_col = "target"
        # 3️⃣ Build the target: 1 if the price rises over the next candles.
        # NOTE(review): this pd.cut target OVERWRITES the tp/sl target built
        # above, making the future_max/future_min labelling dead code here.
        df['target'] = 0
        # Example: 3 classes
        #   Class 0: percent30 < -0.01
        #   Class 1: -0.01 <= percent30 <= 0.01
        #   Class 2: percent30 > 0.01
        df['target'] = pd.cut(
            df['percent24'].shift(-12),
            bins=[-np.inf, -0.005, 0.005, np.inf],
            labels=[0, 1, 2]
        )
        df = df.dropna(subset=['target'])  # drop rows whose target is NaN
        df['target'] = df['target'].astype(int)
        # df = df.drop(columns=['percent24'])
        # features.remove('percent24')
        # features.remove('open')
        # features.remove('close')
        # features.remove('high')
        # features.remove('low')
        # NOTE(review): a commented-out event-based tp/sl labelling loop
        # (scanning each forward window for TP/SL hits) was removed here —
        # recover it from version control if needed.
        # NOTE(review): this re-selection overrides the listUsableColumns()
        # result computed above.
        features = self.select_features_pipeline(df)
        X = df[features]
        y = df['target']  # (df['target'] == 1).astype(int)  # df[target_col]
        # df = df[features]
        print("DF shape:", df.shape)
        print("Columns:", features)
        # if "target" in features:
        #     print("Target raw: ", df["target"].value_counts(dropna=False))
        # else:
        #     print("❌ target column missing")
        print("Target distribution:")
        print(y.value_counts(normalize=True))
        # ⚠️ temporal split (CRUCIAL in trading — no shuffling)
        split = int(len(df) * 0.8)
        X_train, X_valid = X.iloc[:split], X.iloc[split:]
        y_train, y_valid = y.iloc[:split], y.iloc[split:]
        # ⚠️ SMOTE on TRAIN only (never on the validation fold)
        smote = SMOTE(random_state=42)
        X_train_res, y_train_res = smote.fit_resample(X_train, y_train)
        print("After SMOTE:")
        print(pd.Series(y_train_res).value_counts(normalize=True))
        num_classes = len(np.unique(y_train))  # number of target classes

        # =========================
        # 🎯 OPTUNA OBJECTIVE
        # =========================
        def objective(trial):
            # LightGBM multiclass hyper-parameter search space;
            # maximises macro F1 on the (untouched) validation fold.
            params = {
                "objective": "multiclass",
                "metric": "multi_logloss",
                "num_class": num_classes,
                "boosting_type": "gbdt",
                "num_leaves": trial.suggest_int("num_leaves", 16, 128),
                "max_depth": trial.suggest_int("max_depth", 3, 10),
                "learning_rate": trial.suggest_float("learning_rate", 0.005, 0.1, log=True),
                "feature_fraction": trial.suggest_float("feature_fraction", 0.6, 1.0),
                "bagging_fraction": trial.suggest_float("bagging_fraction", 0.6, 1.0),
                "bagging_freq": trial.suggest_int("bagging_freq", 1, 10),
                "min_child_samples": trial.suggest_int("min_child_samples", 5, 100),
                "lambda_l1": trial.suggest_float("lambda_l1", 1e-4, 10, log=True),
                "lambda_l2": trial.suggest_float("lambda_l2", 1e-4, 10, log=True),
                "verbose": -1,
                "seed": 42,
            }
            train_data = lgb.Dataset(X_train_res, y_train_res)
            valid_data = lgb.Dataset(X_valid, y_valid)
            model = lgb.train(
                params,
                train_data,
                num_boost_round=1000,
                valid_sets=[valid_data],
                callbacks=[lgb.early_stopping(50), lgb.log_evaluation(0)]
            )
            proba = model.predict(X_valid)
            preds = np.argmax(proba, axis=1)  # multiclass argmax
            f1 = f1_score(y_valid, preds, average='macro')  # multiclass F1
            return f1

        # =========================
        # 🚀 RUN OPTUNA
        # =========================
        study = optuna.create_study(direction="maximize")
        study.optimize(objective, n_trials=200)
        print("===== 🏆 BEST PARAMS =====")
        print(study.best_params)
        best_params = study.best_params.copy()
        # best_threshold = best_params.pop("threshold")
        # =========================
        # 🔥 TRAIN FINAL MODEL
        # =========================
        final_params = {
            **best_params,
            "objective": "multiclass",
            "metric": "multi_logloss",
            "num_class": num_classes,
            "boosting_type": "gbdt",
            "verbose": -1,
            "seed": 42
        }
        # Final fit on the resampled training set with the best params
        train_data = lgb.Dataset(X_train_res, y_train_res)
        model = lgb.train(final_params, train_data, num_boost_round=1000)
        # =========================
        # 📊 MULTICLASS EVALUATION
        # =========================
        proba = model.predict(X_valid)  # shape = (n_samples, n_classes)
        preds = np.argmax(proba, axis=1)  # predicted class
        print("===== 📊 RESULTS =====")
        print("F1:", f1_score(y_valid, preds, average='macro'))
        print("Precision:", precision_score(y_valid, preds, average='macro'))
        print("Recall:", recall_score(y_valid, preds, average='macro'))
        # Multiclass ROC AUC (one-vs-rest)
        try:
            roc = roc_auc_score(y_valid, proba, multi_class='ovr', average='macro')
            print("ROC AUC:", roc)
        except ValueError:
            print("ROC AUC cannot be computed (check y_valid and number of classes)")
        # model_path = f"user_data/{metadata['pair'].replace('/', '_')}_lgbm.pkl"
        # joblib.dump({
        #     "model": model,
        #     "threshold": best_threshold,
        #     "features": features
        # }, model_path)
        self.train_model = model
        # self.model_threshold = best_threshold
        joblib.dump(
            {"model": self.train_model,
             # "threshold": best_threshold,
             "features": features},
            f"{self.path}/{pair}_rf_model.pkl"
        )
        print(f"✅ Modèle sauvegardé sous {pair}_rf_model.pkl")
        # Multiclass diagnostics
        proba = self.train_model.predict(X_valid)  # shape = (n_samples, n_classes)
        preds = np.argmax(proba, axis=1)  # predicted labels
        self.generate_diagnostics(
            model=self.train_model,
            X_valid=X_valid,
            y_valid=y_valid,
            df=df,
            metadata=metadata
        )
        print(f"Detected multiclass SHAP with {num_classes} classes")
        # NOTE(review): only generate_shap_analysis_class is visible in this
        # file — confirm a generate_shap_analysis helper exists elsewhere.
        self.generate_shap_analysis(model=self.train_model, X_valid=X_valid, metadata=metadata)

    def generate_diagnostics(self, model, X_valid, y_valid, df, metadata):
        """Save multiclass diagnostic plots for `model` under self.path:
        per-class probability histograms, macro metrics, a simplified equity
        curve on class-2 signals, feature importance, SHAP summaries and a
        win/loss return histogram."""
        os.makedirs(self.path, exist_ok=True)
        pair = metadata["pair"].replace("/", "_")
        # ts = datetime.now().strftime("%Y%m%d_%H%M%S")

        def save_fig(name):
            # Persist the current matplotlib figure and close it.
            filepath = f"{self.path}/{pair}_{name}.png"
            plt.savefig(filepath)
            plt.close()
            print(f"📊 Saved: {filepath}")

        # =========================
        # 🔥 PROBA & PREDICTIONS MULTICLASS
        # =========================
        proba = model.predict(X_valid)  # shape = (n_samples, n_classes)
        preds = np.argmax(proba, axis=1)
        # =========================
        # 📊 PROBA DISTRIBUTION PER CLASS
        # =========================
        plt.figure(figsize=(10, 5))
        num_classes = proba.shape[1]
        for c in range(num_classes):
            # Probability assigned to class c, restricted to true-class-c rows
            plt.hist(proba[:, c][y_valid == c], bins=50, alpha=0.5, label=f"Class {c}")
        plt.title("Probability Distribution per Class")
        plt.legend()
        save_fig("proba_distribution")
        # =========================
        # 📈 MULTICLASS METRICS
        # =========================
        f1 = f1_score(y_valid, preds, average='macro')
        precision = precision_score(y_valid, preds, average='macro', zero_division=0)
        recall = recall_score(y_valid, preds, average='macro', zero_division=0)
        try:
            roc = roc_auc_score(y_valid, proba, multi_class='ovr', average='macro')
        except ValueError:
            roc = None
        print("===== 📊 RESULTS =====")
        print("F1:", f1)
        print("Precision:", precision)
        print("Recall:", recall)
        if roc is not None:
            print("ROC AUC:", roc)
        # =========================
        # 💰 SIMPLIFIED EQUITY CURVE
        # =========================
        prices = df.loc[X_valid.index]["close"].values
        returns = []
        for i in range(len(preds) - 1):
            # Only class-2 (target up) predictions are traded long
            if preds[i] == 2:
                r = (prices[i + 1] - prices[i]) / prices[i]
                returns.append(r)
        equity = np.cumsum(returns)
        plt.figure(figsize=(10, 5))
        plt.plot(equity)
        plt.title("Equity Curve (Class 2 signals)")
        save_fig("equity_curve")
        # =========================
        # 📊 FEATURE IMPORTANCE
        # =========================
        importance = model.feature_importance()
        feat_names = X_valid.columns
        imp_df = pd.DataFrame({
            "feature": feat_names,
            "importance": importance
        }).sort_values(by="importance", ascending=False)
        plt.figure(figsize=(10, 8))
        plt.barh(imp_df["feature"][:20], imp_df["importance"][:20])
        plt.gca().invert_yaxis()
        plt.title("Feature Importance")
        save_fig("feature_importance")
        # =========================
        # 🔍 SHAP (sampled for performance)
        # =========================
        try:
            sample_size = min(1000, len(X_valid))
            X_sample = X_valid.sample(sample_size, random_state=42)
            explainer = shap.TreeExplainer(model)
            shap_values = explainer.shap_values(X_sample)
            # Multiclass SHAP comes back as a list of matrices (one per class)
            if isinstance(shap_values, list):
                for c, sv in enumerate(shap_values):
                    shap.summary_plot(sv, X_sample, show=False)
                    save_fig(f"shap_summary_class{c}")
            else:
                shap.summary_plot(shap_values, X_sample, show=False)
                save_fig("shap_summary")
        except Exception as e:
            print(f"⚠️ SHAP failed: {e}")
        # =========================
        # 📉 WIN / LOSS DISTRIBUTION
        # =========================
        wins, losses = [], []
        for i in range(len(preds) - 1):
            if preds[i] == 2:
                r = (prices[i + 1] - prices[i]) / prices[i]
                if r > 0:
                    wins.append(r)
                else:
                    losses.append(r)
        plt.figure(figsize=(10, 5))
        plt.hist(wins, bins=50, alpha=0.5, label="Wins")
        plt.hist(losses, bins=50, alpha=0.5, label="Losses")
        plt.legend()
        plt.title("Wins / Losses Distribution (Class 2)")
        save_fig("wins_losses_distribution")

    # NOTE(review): a commented-out binary (threshold-based) variant of
    # generate_diagnostics previously lived here; it duplicated the method
    # above and was removed — recover it from version control if needed.

    def select_features_pipeline(self, df):
        """Chain variance / correlation / importance / stability filters on
        self.model_indicators and return the surviving column names."""
        df = df.dropna()
        y = df['target']
        X = df[self.model_indicators]
        print("===== INITIAL FEATURES:", len(X.columns))
        # 1. variance
        selected = self.remove_low_variance(X)
        X = X[selected]
        print("After variance:", len(X.columns))
        # 2. correlation
        selected = self.remove_correlated_features(X)
        X = X[selected]
        print("After correlation:", len(X.columns))
        # 3. importance
        selected = self.select_by_importance(X, y, top_n=40)
        X = X[selected]
        print("After importance:", len(X.columns))
        # 4. stability
        selected = self.stability_filter(X, y)[:25]
        X = X[selected]
        # # 5.
        # Sharp (SHAP) filtering — disabled
        # explainer = shap.TreeExplainer(model)
        # shap_values = explainer.shap_values(X)
        # shap_importance = np.abs(shap_values).mean(axis=0)
        # selected = X.columns[np.argsort(shap_importance)[-20:]]
        # X = X[selected]
        # print("After sharp:", len(X.columns))
        print("Final features:", len(X.columns))
        return X.columns.tolist()

    def remove_correlated_features(self, df, threshold=0.95):
        # Drop one column of every pair whose absolute pairwise correlation
        # exceeds `threshold`; returns survivors in their original order.
        corr = df.corr().abs()
        upper = corr.where(np.triu(np.ones(corr.shape), k=1).astype(bool))
        to_drop = [column for column in upper.columns if any(upper[column] > threshold)]
        return [col for col in df.columns if col not in to_drop]

    def remove_low_variance(self, X, threshold=1e-6):
        # Drop (near-)constant columns via sklearn's VarianceThreshold.
        selector = VarianceThreshold(threshold)
        selector.fit(X)
        return X.columns[selector.get_support()].tolist()

    def select_by_importance(self, X, y, top_n=30):
        # Keep the `top_n` features ranked by RandomForest importance.
        model = RandomForestClassifier(
            n_estimators=200,
            max_depth=6,
            n_jobs=-1,
            random_state=42
        )
        model.fit(X, y)
        importances = pd.Series(model.feature_importances_, index=X.columns)
        importances = importances.sort_values(ascending=False)
        return importances.head(top_n).index.tolist()

    def stability_filter(self, X, y, splits=3):
        # Rank features by their MEAN RandomForest importance across
        # TimeSeriesSplit folds — i.e. by stability over time.
        from sklearn.model_selection import TimeSeriesSplit
        tscv = TimeSeriesSplit(n_splits=splits)
        feature_scores = {col: [] for col in X.columns}
        for train_idx, val_idx in tscv.split(X):
            X_train, X_val = X.iloc[train_idx], X.iloc[val_idx]
            y_train, y_val = y.iloc[train_idx], y.iloc[val_idx]
            model = RandomForestClassifier(n_estimators=100, max_depth=5, n_jobs=-1)
            model.fit(X_train, y_train)
            for i, col in enumerate(X.columns):
                feature_scores[col].append(model.feature_importances_[i])
        # Mean importance per feature
        stability = {
            col: np.mean(vals) for col, vals in feature_scores.items()
        }
        return sorted(stability, key=stability.get, reverse=True)

    # NOTE(review): a commented-out transformData / transform_new_data pair
    # (automatic feature-scaling helpers using StandardScaler) previously
    # lived here and was removed — recover it from version control if needed.

    def generate_shap_analysis_class(self, model, X_valid, metadata):
        """Compute SHAP values on a sample of X_valid and save one summary
        plot plus one importance CSV per class under self.path."""
        os.makedirs(self.path, exist_ok=True)
        pair = metadata["pair"].replace("/", "_")
        # ts = datetime.now().strftime("%Y%m%d_%H%M%S")

        def save_fig(name):
            # Persist the current matplotlib figure and close it.
            filepath = f"{self.path}/{pair}_{name}.png"
            plt.savefig(filepath)
            plt.close()
            print(f"📊 Saved: {filepath}")

        # =========================
        # 🔹 SAMPLE (perf)
        # =========================
        sample_size = min(1000, len(X_valid))
        X_sample = X_valid.sample(sample_size, random_state=42)
        # =========================
        # 🔥 SHAP CALCULATION
        # =========================
        explainer = shap.TreeExplainer(model)
        shap_values = explainer.shap_values(X_sample)
        print("SHAP type:", type(shap_values))
        # =========================
        # 🧠 MULTICLASS HANDLING
        # =========================
        if isinstance(shap_values, list):
            # legacy SHAP format: list of (samples, features), one per class
            shap_list = shap_values
        elif len(shap_values.shape) == 3:
            # new SHAP format: single array (samples, features, classes)
            shap_list = [shap_values[:, :, i] for i in range(shap_values.shape[2])]
        else:
            # classic binary case
            shap_list = [shap_values]
        print("SHAP shape:", getattr(shap_values, "shape", None))
        print("SHAP type:", type(shap_values))
        # =========================
        # 📊 SHAP PER CLASS
        # =========================
        for i, sv in enumerate(shap_list):
            shap.summary_plot(sv, X_sample, max_display=20, show=False)
            save_fig(f"shap_summary_class_{i}")
        for i, sv in enumerate(shap_list):
            # Mean |SHAP| per feature → per-class importance CSV
            feat_importance = np.mean(np.abs(sv), axis=0)  # (n_features,)
            imp_df = pd.DataFrame({
                "feature": X_sample.columns,
                "importance": feat_importance
            }).sort_values(by="importance", ascending=False)
            imp_df.to_csv(f"{self.path}/{pair}_shap_importance_class_{i}.csv", index=False)
        # NOTE(review): a commented-out "global SHAP + CSV export" draft
        # previously lived here and was removed — see version control.

    def trainModel3(self, df, metadata):
        pair = self.getShortName(metadata['pair'])
        pd.set_option('display.max_rows', None)
        pd.set_option('display.max_columns', None)
        pd.set_option("display.width", 200)
        path = self.path  #
f"user_data/plots/{pair}/" os.makedirs(path, exist_ok=True) # 1️⃣ Colonnes utilisables features = self.listUsableColumns(df) target_col = "target" # 2️⃣ Créer la cible multiclass # Classe 0 : percent24 < -0.005 # Classe 1 : -0.005 <= percent24 <= 0.005 # Classe 2 : percent24 > 0.005 df['target'] = pd.cut( df['percent24'].shift(-12), bins=[-np.inf, -0.0025, 0.0025, np.inf], labels=[0, 1, 2] ) # Supprimer NaN générés par shift df = df.dropna(subset=['target']) features = self.select_features_pipeline_for_class(df) df['target'] = df['target'].astype(int) # Supprimer percent24 des features if 'percent24' in features: features.remove('percent24') # 3️⃣ Séparer X et y X = df[features] y = df['target'] print("DF shape:", df.shape) print("Columns:", features) print("Target distribution:") print(y.value_counts(normalize=True)) # 4️⃣ Split temporel train / valid split = int(len(df) * 0.8) X_train, X_valid = X.iloc[:split], X.iloc[split:] y_train, y_valid = y.iloc[:split], y.iloc[split:] # 5️⃣ SMOTE multiclass uniquement sur train smote = SMOTE(random_state=42) X_train_res, y_train_res = smote.fit_resample(X_train, y_train) # Nombre de classes num_classes = len(np.unique(y_train_res)) # ========================= # 🎯 OPTUNA OBJECTIVE # ========================= def objective(trial): params = { "objective": "multiclass", "metric": "multi_logloss", "num_class": num_classes, "boosting_type": "gbdt", "num_leaves": trial.suggest_int("num_leaves", 16, 128), "max_depth": trial.suggest_int("max_depth", 3, 10), "learning_rate": trial.suggest_float("learning_rate", 0.005, 0.1, log=True), "feature_fraction": trial.suggest_float("feature_fraction", 0.6, 1.0), "bagging_fraction": trial.suggest_float("bagging_fraction", 0.6, 1.0), "bagging_freq": trial.suggest_int("bagging_freq", 1, 10), "min_child_samples": trial.suggest_int("min_child_samples", 5, 100), "lambda_l1": trial.suggest_float("lambda_l1", 1e-4, 10, log=True), "lambda_l2": trial.suggest_float("lambda_l2", 1e-4, 10, log=True), 
"verbose": -1, "seed": 42 } train_data = lgb.Dataset(X_train_res, y_train_res) valid_data = lgb.Dataset(X_valid, y_valid) model = lgb.train( params, train_data, num_boost_round=1000, valid_sets=[valid_data], callbacks=[lgb.early_stopping(50), lgb.log_evaluation(0)] ) # Probabilités proba = model.predict(X_valid) # shape = (n_samples, n_classes) preds = np.argmax(proba, axis=1) f1 = f1_score(y_valid, preds, average='macro') # multiclass return f1 # ========================= # 🚀 RUN OPTUNA # ========================= study = optuna.create_study(direction="maximize") study.optimize(objective, n_trials=10) best_params = study.best_params.copy() # ========================= # 🔥 TRAIN FINAL MODEL # ========================= final_params = { **best_params, "objective": "multiclass", "metric": "multi_logloss", "num_class": num_classes, "boosting_type": "gbdt", "verbose": -1, "seed": 42 } train_data = lgb.Dataset(X_train_res, y_train_res) self.train_model = lgb.train( final_params, train_data, num_boost_round=1000 ) # Probabilités pour chaque classe probs_all_classes = self.train_model.predict(X) # shape = (n_samples, n_classes) # Ajouter probabilité de chaque classe au dataframe pour analyse # for i in range(num_classes): # df[f'prob_class_{i}'] = probs_all_classes[:, i] self.features = features self.df = df # ========================= # 📊 EVALUATION MULTICLASS # ========================= proba = self.train_model.predict(X_valid) # shape = (n_samples, n_classes) preds = np.argmax(proba, axis=1) # Classe prédite print("===== 📊 RESULTS =====") print("F1:", f1_score(y_valid, preds, average='macro')) print("Precision:", precision_score(y_valid, preds, average='macro')) print("Recall:", recall_score(y_valid, preds, average='macro')) # ROC AUC multiclass try: roc = roc_auc_score(y_valid, proba, multi_class='ovr', average='macro') print("ROC AUC:", roc) except ValueError: print("ROC AUC cannot be computed (check y_valid and number of classes)") joblib.dump( {"model": 
self.train_model, # "threshold": best_threshold, "features": features}, f"{self.path}/{pair}_rf_model.pkl" ) print(f"✅ Modèle sauvegardé sous {pair}_rf_model.pkl") # Génération de diagnostics pour multiclass proba = self.train_model.predict(X_valid) # shape = (n_samples, n_classes) preds = np.argmax(proba, axis=1) # labels prédits self.generate_diagnostics( model=self.train_model, X_valid=X_valid, y_valid=y_valid, df=df, # preds=preds, # passer les labels prédits # proba=proba, # passer les probabilités si besoin metadata=metadata ) self.generate_shap_analysis_class(model=self.train_model, X_valid=X_valid, metadata=metadata) self.extract_buy_rules_class(self.train_model, X_valid, y_valid) def select_features_pipeline_for_class(self, df): features = self.listUsableColumns(df) X = df[features] y = df['target'] print(f"Initial features: {len(features)}") # ========================= # 1️⃣ VARIANCE # ========================= var = X.var() X = X.loc[:, var > 1e-6] print(f"After variance: {X.shape[1]}") # ========================= # 2️⃣ CORRELATION # ========================= corr = X.corr().abs() upper = corr.where(np.triu(np.ones(corr.shape), k=1).astype(bool)) to_drop = [col for col in upper.columns if any(upper[col] > 0.90)] X = X.drop(columns=to_drop) print(f"After correlation: {X.shape[1]}") # ========================= # 3️⃣ LIGHTGBM IMPORTANCE # ========================= model = lgb.LGBMClassifier( objective='multiclass', num_class=len(y.unique()), n_estimators=200, random_state=42 ) model.fit(X, y) importance = pd.Series( model.feature_importances_, index=X.columns ).sort_values(ascending=False) print("Top 10 features:") print(importance.head(10)) # ⚠️ seuil dynamique (IMPORTANT) threshold = importance.mean() selected = importance[importance > threshold].index.tolist() print(f"After importance: {len(selected)}") return selected def extract_buy_rules_class(self, model, X_valid, y_valid): # ========================= # SAMPLE # ========================= X_sample = 
X_valid.copy() explainer = shap.TreeExplainer(model) shap_values = explainer.shap_values(X_sample) # ========================= # FORMAT SHAP # ========================= if isinstance(shap_values, list): shap_class = shap_values[2] # classe BUY elif len(shap_values.shape) == 3: shap_class = shap_values[:, :, 2] else: raise Exception("SHAP format inconnu") # ========================= # FOCUS SUR PREDICTIONS BUY # ========================= preds = model.predict(X_sample) buy_idx = np.where(preds == 2)[0] X_buy = X_sample.iloc[buy_idx] shap_buy = shap_class[buy_idx] print(f"BUY samples: {len(buy_idx)}") # ========================= # TOP FEATURES # ========================= mean_shap = np.mean(np.abs(shap_buy), axis=0) importance = pd.Series(mean_shap, index=X_sample.columns) importance = importance.sort_values(ascending=False) top_features = importance.head(10).index.tolist() print("Top BUY features:") print(top_features) # ========================= # EXTRACTION DE RÈGLES # ========================= rules = [] for feat in top_features: values = X_buy[feat] q_low = values.quantile(0.25) q_high = values.quantile(0.75) mean_val = values.mean() rules.append({ "feature": feat, "mean": mean_val, "q25": q_low, "q75": q_high }) rules_df = pd.DataFrame(rules) print("\n===== BUY RULES =====") print(rules_df) return rules_df