diff --git a/Zeus_LGBMRegressor.json b/Zeus_LGBMRegressor.json index a39bf8a..078e461 100644 --- a/Zeus_LGBMRegressor.json +++ b/Zeus_LGBMRegressor.json @@ -2,10 +2,10 @@ "strategy_name": "Zeus_LGBMRegressor", "params": { "roi": { - "0": 0.564, - "567": 0.273, - "2814": 0.12, - "7675": 0 + "0": 10 + }, + "stoploss": { + "stoploss": -1 }, "trailing": { "trailing_stop": true, @@ -17,20 +17,19 @@ "max_open_trades": 80 }, "buy": { - "mises": 5, - "mise_factor_buy": 0.01, + "mise_factor_buy": 0.02, + "mises": 2, "ml_prob_buy": -0.25, - "pct": 0.005, - "pct_inc": 0.0001 + "pct": 0.012, + "pct_inc": 0.001 }, "sell": { "ml_prob_sell": 0.07 }, - "protection": {}, - "stoploss": { - "stoploss": -0.333 + "protection": { + } }, "ft_stratparam_v": 1, - "export_time": "2025-11-11 19:39:29.538204+00:00" + "export_time": "2025-11-14 21:34:28.681343+00:00" } \ No newline at end of file diff --git a/Zeus_LGBMRegressor.py b/Zeus_LGBMRegressor.py index 266bdbc..7e97968 100644 --- a/Zeus_LGBMRegressor.py +++ b/Zeus_LGBMRegressor.py @@ -55,6 +55,16 @@ import optuna from lightgbm import LGBMRegressor from sklearn.metrics import mean_squared_error from sklearn.model_selection import train_test_split +from sklearn.linear_model import LinearRegression, Ridge, HuberRegressor +from sklearn.preprocessing import StandardScaler, PolynomialFeatures +from sklearn.pipeline import make_pipeline +from sklearn.svm import SVR +from sklearn.ensemble import RandomForestRegressor +from sklearn.ensemble import GradientBoostingRegressor +from sklearn.preprocessing import StandardScaler +from sklearn.ensemble import HistGradientBoostingRegressor +from sklearn.impute import SimpleImputer +from sklearn.pipeline import Pipeline # Couleurs ANSI de base RED = "\033[31m" @@ -66,7 +76,10 @@ CYAN = "\033[36m" RESET = "\033[0m" import warnings -warnings.filterwarnings("ignore", message="No further splits with positive gain") +warnings.filterwarnings( + "ignore", + message=r".*No further splits with positive gain.*" +) def pprint_df(dframe): print(tabulate(dframe, headers='keys', tablefmt='psql', showindex=False)) @@ -81,29 +94,45 @@ class Zeus_LGBMRegressor(IStrategy): startup_candle_count = 24 # Machine Learning - model_indicators = [ - "ms-10", "ms-5", "ms-4", "ms-3", "ms-2", "ms-1", "ms-0", - 'rsi', 'rsi_deriv1', 'rsi_deriv2', "max_rsi_12", - "bb_percent", - 'vol_24', - 'percent3', - 'sma5_dist', 'sma5_deriv1', 'sma5_deriv2', - 'sma24_dist', 'sma24_deriv1', 'sma24_deriv2', - 'sma60_dist', 'sma60_deriv1', 'sma60_deriv2', - 'down_pct', 'slope_norm', - 'min_max_60', - 'rsi_slope', 'adx_change', 'volatility_ratio', - 'slope_ratio', 'bb_width', - 'rsi_1h', 'rsi_deriv1_1h', 'rsi_deriv2_1h', "max_rsi_12_1h", - ] + model_indicators = ['volume', 'open_count', 'high_count', 'low_count', + 'close_count', 'max_count', 'hapercent', 'mid', 'percent', 'percent3', + 'percent12', 'percent24', 'sma5', 'sma5_dist', 'sma5_deriv1', 'sma5_deriv2', 'sma12', + 'sma12_dist', 'sma12_deriv1', 'sma12_deriv2', 'sma24', 'sma24_dist', 'sma24_deriv1', + 'sma24_deriv2', 'sma48', 'sma48_dist', 'sma48_deriv1', 'sma48_deriv2', 'sma60', 'sma60_dist', + 'sma60_deriv1', 'sma60_deriv2', 'mid_smooth_3', 'mid_smooth_3_dist', 'mid_smooth_3_deriv1', + 'mid_smooth_3_deriv2', 'mid_smooth_5', 'mid_smooth_5_dist', 'mid_smooth_5_deriv1', + 'mid_smooth_5_deriv2', 'mid_smooth_12', 'mid_smooth_12_dist', 'mid_smooth_12_deriv1', + 'mid_smooth_12_deriv2', 'mid_smooth_24', 'mid_smooth_24_dist', 'mid_smooth_24_deriv1', + 'mid_smooth_24_deriv2', 'rsi', 'max_rsi_12', 'max_rsi_24', 'rsi_dist', 'rsi_deriv1', + 'rsi_deriv2', 'max12', 'min12', 'max60', 'min60', 'min_max_60', 'bb_lowerband', 'bb_middleband', + 'bb_upperband', 'bb_percent', 'bb_width', 'macd', 'macdsignal', 'macdhist', 'slope', + 'slope_smooth', 'atr', 'atr_norm', 'adx', 'obv', 'vol_24', 'down_count', 'up_count', 'down_pct', + 'up_pct', 'rsi_slope', 'adx_change', 'volatility_ratio', 'rsi_diff', 'slope_ratio', + 'volume_sma_deriv', 'volume_dist', 'volume_deriv1', 'volume_deriv2', 'slope_norm', + 'hapercent_1h', + 'mid_1h', 'percent_1h', 'percent3_1h', 'percent12_1h', 'percent24_1h', 'sma5_1h', + 'sma5_dist_1h', 'sma5_deriv1_1h', 'sma5_deriv2_1h', 'sma12_1h', 'sma12_dist_1h', 'sma12_deriv1_1h', + 'sma12_deriv2_1h', 'sma24_1h', 'sma24_dist_1h', 'sma24_deriv1_1h', 'sma24_deriv2_1h', + 'sma48_1h', 'sma48_dist_1h', 'sma48_deriv1_1h', 'sma48_deriv2_1h', 'sma60_1h', 'sma60_dist_1h', + 'sma60_deriv1_1h', 'sma60_deriv2_1h', 'mid_smooth_3_1h', 'mid_smooth_3_dist_1h', + 'mid_smooth_3_deriv1_1h', 'mid_smooth_3_deriv2_1h', 'mid_smooth_5_1h', 'mid_smooth_5_dist_1h', + 'mid_smooth_5_deriv1_1h', 'mid_smooth_5_deriv2_1h', 'mid_smooth_12_1h', 'mid_smooth_12_dist_1h', + 'mid_smooth_12_deriv1_1h', 'mid_smooth_12_deriv2_1h', 'mid_smooth_24_1h', + 'mid_smooth_24_dist_1h', 'mid_smooth_24_deriv1_1h', 'mid_smooth_24_deriv2_1h', 'rsi_1h', + 'max_rsi_12_1h', 'max_rsi_24_1h', 'rsi_dist_1h', 'rsi_deriv1_1h', 'rsi_deriv2_1h', 'max12_1h', + 'min12_1h', 'max60_1h', 'min60_1h', 'min_max_60_1h', 'bb_lowerband_1h', 'bb_middleband_1h', + 'bb_upperband_1h', 'bb_percent_1h', 'bb_width_1h', 'macd_1h', 'macdsignal_1h', 'macdhist_1h', + 'slope_1h', 'slope_smooth_1h', 'atr_1h', 'atr_norm_1h', 'adx_1h', 'obv_1h', 'vol_24_1h', + 'down_count_1h', 'up_count_1h', 'down_pct_1h', 'up_pct_1h', 'rsi_slope_1h', 'adx_change_1h', + 'volatility_ratio_1h', 'rsi_diff_1h', 'slope_ratio_1h', 'volume_sma_deriv_1h', 'volume_dist_1h', + 'volume_deriv1_1h', 'volume_deriv2_1h', 'slope_norm_1h', 'mid_future_pred_cons_1h', + 'mid_smooth_1h', 'mid_smooth_1h_deriv1', 'mid_smooth_1h_deriv2', 'mid_smooth_5h', + 'mid_smooth_5h_deriv1', 'mid_smooth_5h_deriv2', 'mid_future_pred_cons', + 'sma24_future_pred_cons'] + indicator_target = 'sma24_deriv1' model = None - # model_indicators = ["ms-10", "ms-5", "ms-4", "ms-3", "ms-2", "ms-1", "ms-0"] - # model_indicators = ['open', 'high', 'close', 'haclose', 'percent', 'sma5', 'sma12', 'sma24', 'sma24_deriv1', 'sma24_deriv2', 'sma48', 'sma48_deriv1', 'sma48_deriv2', 'sma60', 'sma60_dist', 'sma60_deriv1', - # 'sma60_deriv2', 'mid_smooth_3_deriv2', 'mid_smooth_12_deriv1', 'mid_smooth_12_deriv2', 'mid_smooth_24', 'mid_smooth_24_deriv1', 'mid_smooth_24_deriv2', 'max_rsi_12', 'max_rsi_24', 'max12', - # 'max60', 'min60', 'min_max_60', 'bb_lowerband', 'bb_upperband', 'bb_width', 'macd', 'macdsignal', 'macdhist', 'sma_20', 'sma_100', 'atr', 'atr_norm', 'adx', 'obv', 'vol_24', 'adx_change', - # 'volatility_ratio', 'slope_ratio', 'mid_smooth_1h_deriv2', 'mid_smooth_5h', 'mid_smooth_5h_deriv1', 'mid_smooth_5h_deriv2'] - + path = f"user_data/plots/" levels = [1, 2, 2, 4, 6, 8, 10, 12, 14, 16, 18, 20] # startup_candle_count = 12 * 24 * 5 @@ -205,23 +234,6 @@ class Zeus_LGBMRegressor(IStrategy): "color": "green" } }, - "States": { - "tdc_macd_1h": { - "color": "cyan" - }, - "sma24_state_1h": { - "color": "pink" - }, - "sma24_state": { - "color": "yellow" - }, - "sma5_state_1d": { - "color": "blue" - }, - "sma60_state": { - "color": "green" - } - }, 'Macd': { "macd_rel_1d": { "color": "cyan" @@ -298,35 +310,14 @@ class Zeus_LGBMRegressor(IStrategy): pct = DecimalParameter(0.005, 0.05, default=0.012, decimals=3, space='buy', optimize=True, load=True) pct_inc = DecimalParameter(0.0001, 0.003, default=0.0022, decimals=4, space='buy', optimize=True, load=True) - # deriv1_buy_protect = DecimalParameter(-0.3, 0.1, default=-0.1, decimals=2, space='protection', optimize=True, load=True) - # rsi_buy_protect = IntParameter(50, 90, default=70, space='protection', optimize=True, load=True) - # indic_5m_slope_sup = CategoricalParameter(indicators, default="sma60", space='protection') - # indic_1h_slope_sup = CategoricalParameter(indicators, default="sma5", space='protection') + rsi_deb_protect = IntParameter(50, 90, default=70, space='protection', optimize=True, load=True) + rsi_end_protect = IntParameter(20, 60, default=55, space='protection', optimize=True, load=True) - labels = ['B3', 'B2', 'B1', 'N0', 'H1', 'H2', 'H3'] - index_labels = ['B3', 'B2', 'B1', 'N0', 'H1', 'H2', 'H3'] - ordered_labels = ['B3', 'B2', 'B1', 'N0', 'H1', 'H2', 'H3'] - - label_to_index = {label: i for i, label in enumerate(ordered_labels)} + sma24_deriv1_deb_protect = DecimalParameter(-4, 4, default=-2, decimals=1, space='protection', optimize=True, load=True) + sma24_deriv1_end_protect = DecimalParameter(-4, 4, default=0, decimals=1, space='protection', optimize=True, load=True) # ========================================================================= - # paliers dérivées jour sma5 - sma5_deriv1 = [-1.1726, -0.2131, -0.1012, -0.0330, 0.0169, 0.0815, 0.2000, 4.0335] - sma5_deriv2 = [-1.9190, -0.1388, -0.0644, -0.0202, 0.0209, 0.0646, 0.1377, 4.2987] - sma5_derive1_2_matrice = { - 'B3': [8.6, 10.8, 34.6, 35.0, 58.8, 61.9, 91.2], - 'B2': [0.0, 12.5, 9.1, 57.1, 63.3, 79.3, 89.5], - 'B1': [6.1, 12.5, 22.0, 46.8, 61.5, 70.0, 100.0], - 'N0': [0.0, 10.7, 37.0, 43.5, 75.0, 75.9, 100.0], - 'H1': [0.0, 18.5, 32.4, 35.9, 76.8, 82.9, 92.0], - 'H2': [0.0, 21.9, 16.0, 39.5, 69.7, 83.3, 100.0], - 'H3': [9.5, 29.2, 41.2, 57.9, 53.8, 86.8, 92.3], - } - sma5_derive1_2_matrice_df = pd.DataFrame(sma5_derive1_2_matrice, index=index_labels) - # Extraction de la matrice numérique - sma5_derive1_2_numeric_matrice = sma5_derive1_2_matrice_df.reindex(index=ordered_labels, - columns=ordered_labels).values should_enter_trade_count = 0 def confirm_trade_entry(self, pair: str, order_type: str, amount: float, rate: float, time_in_force: str, @@ -340,37 +331,10 @@ class Zeus_LGBMRegressor(IStrategy): last_candle = dataframe.iloc[-1].squeeze() last_candle_2 = dataframe.iloc[-2].squeeze() last_candle_3 = dataframe.iloc[-3].squeeze() - # val = self.getProbaHausse144(last_candle) - - # trend = last_candle['trend_class'] - # params = self.loadParamsFor(pair, trend) - - # indic_5m = self.getParamValue(pair, trend, 'buy', 'indic_5m') - # indic_deriv1_5m = self.getParamValue( pair, trend, 'buy', 'indic_deriv1_5m') - # indic_deriv2_5m = self.getParamValue( pair, trend, 'buy', 'indic_deriv2_5m') condition = True #(last_candle[f"{indic_5m}_deriv1"] >= indic_deriv1_5m) and (last_candle[f"{indic_5m}_deriv2"] >= indic_deriv2_5m) - # allow_to_buy = True #(not self.stop_all) #& (not self.all_down) - # and val > self.buy_val.value #not last_candle['tendency'] in (-1, -2) # (rate <= float(limit)) | (entry_tag == 'force_entry') - allow_to_buy = (condition and not self.pairs[pair]['stop']) | (entry_tag == 'force_entry') - - # if allow_to_buy: - # poly_func, x_future, y_future, count = self.polynomial_forecast( - # dataframe['mid_smooth_12'], - # window=self.buy_horizon_predict_1h.value * 12, - # degree=4, - # n_future=3) - # - # if count < 3: - # allow_to_buy = False - force = self.pairs[pair]['force_buy'] - if self.pairs[pair]['force_buy']: - self.pairs[pair]['force_buy'] = False - allow_to_buy = True - else: - if not self.should_enter_trade(pair, last_candle, current_time): - allow_to_buy = False + allow_to_buy = True #(condition and not self.pairs[pair]['stop']) | (entry_tag == 'force_entry') if allow_to_buy: self.trades = list() @@ -413,11 +377,10 @@ class Zeus_LGBMRegressor(IStrategy): dataframe, _ = self.dp.get_analyzed_dataframe(pair, self.timeframe) last_candle = dataframe.iloc[-1].squeeze() + minutes = int(round((current_time - trade.open_date_utc).seconds / 60, 0)) profit =trade.calc_profit(rate) force = self.pairs[pair]['force_sell'] - allow_to_sell = (last_candle['hapercent'] < 0 and profit > 0) or force or (exit_reason == 'force_exit') or (exit_reason == 'stop_loss') - - minutes = int(round((current_time - trade.date_last_filled_utc).total_seconds() / 60, 0)) + allow_to_sell = minutes > 30 and (last_candle['hapercent'] < 0 ) or force or (exit_reason == 'force_exit') or (exit_reason == 'stop_loss') if allow_to_sell: self.trades = list() @@ -449,7 +412,7 @@ class Zeus_LGBMRegressor(IStrategy): self.pairs[pair]['last_date'] = current_time self.pairs[pair]['current_trade'] = None # else: - # print(f"STOP triggered for {pair} ({exit_reason}) but condition blocked", "warning") + # self.printLog(f"{current_time} SELL triggered for {pair} ({exit_reason} profit={profit} minutes={minutes} percent={last_candle['hapercent']}) but condition blocked") return (allow_to_sell) | (exit_reason == 'force_exit') | (exit_reason == 'stop_loss') def custom_stake_amount(self, pair: str, current_time: datetime, current_rate: float, @@ -499,17 +462,11 @@ class Zeus_LGBMRegressor(IStrategy): days_since_first_buy = (current_time - trade.open_date_utc).days hours = (current_time - trade.date_last_filled_utc).total_seconds() / 3600.0 - # trend = last_candle['trend_class'] - # - # indic_5m_sell = self.getParamValue( pair, trend, 'sell', 'indic_5m_sell') - # indic_deriv1_5m_sell = self.getParamValue( pair, trend, 'sell', 'indic_deriv1_5m_sell') - # indic_deriv2_5m_sell = self.getParamValue( pair, trend, 'sell', 'indic_deriv2_5m_sell') - if hours % 4 == 0: self.log_trade( last_candle=last_candle, date=current_time, - action="🔴 CURRENT" if self.pairs[pair]['stop'] else "🟢 CURRENT", + action="🔴 CURRENT" if self.pairs[pair]['stop'] or last_candle['stop_buying_1h'] else "🟢 CURRENT", dispo=dispo, pair=pair, rate=last_candle['close'], @@ -519,41 +476,7 @@ class Zeus_LGBMRegressor(IStrategy): stake=0 ) - # if (last_candle['mid_smooth_deriv1'] >= 0): - # return None - # if (last_candle['tendency'] in (2, 1)) and (last_candle['rsi'] < 80): - # return None - # - # if (last_candle['sma24_deriv1'] < 0 and before_last_candle['sma24_deriv1'] >= 0) and (current_profit > expected_profit): - # return 'Drv_' + str(count_of_buys) pair_name = self.getShortName(pair) - # if (current_profit > expected_profit) and last_candle['can_sell']: - # return 'Can_' + pair_name + '_' + str(count_of_buys) - # trend = last_candle['trend_class_1d'] - # if (trend == "B-" or trend == "B--") and self.pairs[pair]['has_gain'] == 0: # and (last_candle[f"{indic_5m_sell}_deriv1"] <= indic_deriv1_5m_sell and last_candle[f"{indic_5m_sell}_deriv2"] <= indic_deriv2_5m_sell): - # - # if (last_candle['max_rsi_12_1h'] > 75) and last_candle['trend_class_1h'] == 1 and profit > max(5, expected_profit) and (last_candle['hapercent'] < 0): - # self.pairs[pair]['stop'] = True - # self.log_trade( - # last_candle=last_candle, - # date=current_time, - # action="🔴STOP", - # dispo=dispo, - # pair=pair, - # rate=last_candle['close'], - # trade_type='', - # profit=self.pairs[pair]['current_profit'], - # buys=self.pairs[pair]['count_of_buys'], - # stake=0 - # ) - # return "MAX_RSI" - # - # return None - - # if (trend == "B-" or trend == "B--") and last_candle[f"{self.indic_5m_sell.value}_deriv1"] <= self.indic_deriv1_5m_sell.value \ - # and last_candle[f"{self.indic_5m_sell.value}_deriv2"] <= self.indic_deriv2_5m_sell.value: - # return None - if last_candle['max_rsi_24'] > 85 and profit > max(5, expected_profit) and (last_candle['hapercent'] < 0) and last_candle['sma60_deriv1'] < 0.05: self.pairs[pair]['force_sell'] = False self.pairs[pair]['force_buy'] = False #(self.pairs[pair]['count_of_buys'] - self.pairs[pair]['has_gain'] > 3) @@ -564,15 +487,15 @@ class Zeus_LGBMRegressor(IStrategy): self.pairs[pair]['force_buy'] = (self.pairs[pair]['count_of_buys'] - self.pairs[pair]['has_gain'] > 3) return str(count_of_buys) + '_' + 'Frc_' + pair_name + '_' + str(self.pairs[pair]['has_gain']) - if profit > max(5, expected_profit) and baisse > 0.30: + if profit > 0 and baisse > 0.30: self.pairs[pair]['force_sell'] = False self.pairs[pair]['force_buy'] = (self.pairs[pair]['count_of_buys'] - self.pairs[pair]['has_gain'] > 3) return str(count_of_buys) + '_' + 'B30_' + pair_name + '_' + str(self.pairs[pair]['has_gain']) - if max_profit > 0.5 * count_of_buys and baisse > 0.15 and last_candle['sma12_state'] <= 0 and last_candle['sma60_state'] <= - 1: - self.pairs[pair]['force_sell'] = False - self.pairs[pair]['force_buy'] = (self.pairs[pair]['count_of_buys'] - self.pairs[pair]['has_gain'] > 3) - return str(count_of_buys) + '_' + 'B15_' + pair_name + '_' + str(self.pairs[pair]['has_gain']) + # if max_profit > 0.5 * count_of_buys and baisse > 0.15: + # self.pairs[pair]['force_sell'] = False + # self.pairs[pair]['force_buy'] = (self.pairs[pair]['count_of_buys'] - self.pairs[pair]['has_gain'] > 3) + # return str(count_of_buys) + '_' + 'B15_' + pair_name + '_' + str(self.pairs[pair]['has_gain']) if (last_candle['sma5_1h'] - before_last_candle_12['sma5_1h']) / last_candle['sma5_1h'] > 0.0002: return None @@ -599,7 +522,7 @@ class Zeus_LGBMRegressor(IStrategy): def informative_pairs(self): # get access to all pairs available in whitelist. pairs = self.dp.current_whitelist() - informative_pairs = [(pair, '1d') for pair in pairs] + # informative_pairs = [(pair, '1d') for pair in pairs] informative_pairs += [(pair, '1h') for pair in pairs] return informative_pairs @@ -678,34 +601,18 @@ class Zeus_LGBMRegressor(IStrategy): if buys is None: buys = '' - max_touch = '' # round(last_candle['max12_1d'], 1) #round(self.pairs[pair]['max_touch'], 1) + max_touch = '' pct_max = self.getPctFirstBuy(pair, last_candle) total_counts = str(buys) + '/' + str(sum(pair_data['count_of_buys'] for pair_data in self.pairs.values())) - dist_max = self.getDistMax(last_candle, pair) - - # if trade_type is not None: - # if np.isnan(last_candle['rsi_1d']): - # string = ' ' - # else: - # string = (str(int(last_candle['rsi_1d']))) + " " + str(int(last_candle['rsi_deriv1_1d'])) - # trade_type = trade_type \ - # + " " + string \ - # + " " + str(int(last_candle['rsi_1h'])) \ - # + " " + str(int(last_candle['rsi_deriv1_1h'])) - - # val144 = self.getProbaHausse144(last_candle) - # val1h = self.getProbaHausse1h(last_candle) - val = self.getProbaHausseSma5d(last_candle) - - pct60 = round(100 * self.getPct60D(pair, last_candle), 2) + dist_max = '' color = GREEN if profit > 0 else RED - color_sma24 = GREEN if last_candle['sma24_deriv1_1d'] > 0 else RED - color_sma24_2 = GREEN if last_candle['sma24_deriv2_1d'] > 0 else RED - color_sma5 = GREEN if last_candle['mid_smooth_5_deriv1_1d'] > 0 else RED - color_sma5_2 = GREEN if last_candle['mid_smooth_5_deriv2_1d'] > 0 else RED + color_sma24 = GREEN if last_candle['sma24_deriv1_1h'] > 0 else RED + color_sma24_2 = GREEN if last_candle['sma24_deriv2_1h'] > 0 else RED + color_sma5 = GREEN if last_candle['mid_smooth_5_deriv1_1h'] > 0 else RED + color_sma5_2 = GREEN if last_candle['mid_smooth_5_deriv2_1h'] > 0 else RED color_sma5_1h = GREEN if last_candle['sma60_deriv1'] > 0 else RED color_sma5_2h = GREEN if last_candle['sma60_deriv2'] > 0 else RED color_smooth_1h = GREEN if last_candle['mid_smooth_1h_deriv1'] > 0 else RED @@ -722,17 +629,6 @@ class Zeus_LGBMRegressor(IStrategy): # 🟡 Dérivée 1 > 0 et dérivée 2 < 0: tendance haussière qui ralentit → essoufflement potentiel. # 🔴 Dérivée 1 < 0 et dérivée 2 < 0: tendance baissière qui s’accélère. # 🟠 Dérivée 1 < 0 et dérivée 2 > 0: tendance baissière qui ralentit → possible bottom. - # trend = last_candle['trend_class_1d'] - # - # indic_5m = self.getParamValue(pair, trend, 'buy', 'indic_5m') - # indic_deriv1_5m = self.getParamValue(pair, trend, 'buy', 'indic_deriv1_5m') - # indic_deriv2_5m = self.getParamValue(pair, trend, 'buy', 'indic_deriv2_5m') - # - # indic_5m_sell = self.getParamValue(pair, trend, 'sell', 'indic_5m_sell') - # indic_deriv1_5m_sell = self.getParamValue(pair, trend, 'sell', 'indic_deriv1_5m_sell') - # indic_deriv2_5m_sell = self.getParamValue(pair, trend, 'sell', 'indic_deriv2_5m_sell') - - self.printLog( f"| {date:<16} |{action:<10} | {pair[0:3]:<3} | {trade_type or '-':<18} |{rate or '-':>9}| {dispo or '-':>6} " f"|{color}{profit or '-':>10}{RESET}| {pct_max or '-':>6} | {round(self.pairs[pair]['max_touch'], 2) or '-':>11} | {last_lost or '-':>12} " @@ -740,25 +636,16 @@ class Zeus_LGBMRegressor(IStrategy): # f"|{round(last_candle['mid_smooth_24_deriv1'],3) or '-':>6}|{round(last_candle['mid_smooth_1h_deriv1'],3) or '-':>6}|{round(last_candle['mid_smooth_deriv1_1d'],3) or '-' :>6}|" # f"{round(last_candle['mid_smooth_24_deriv2'],3) or '-' :>6}|{round(last_candle['mid_smooth_1h_deriv2'],3) or '-':>6}|{round(last_candle['mid_smooth_deriv2_1d'],3) or '-':>6}|" f"{round(last_candle['max_rsi_24'], 1) or '-' :>6}|" - f"{dist_max:>7}|{color_sma24}{round(last_candle['sma24_deriv1_1d'], 2):>5}{RESET}" - f"|{color_sma5}{round(last_candle['mid_smooth_5_deriv1_1d'], 2):>5}{RESET}|{color_sma5_2}{round(last_candle['mid_smooth_5_deriv2_1d'], 2):>5}{RESET}" + f"{dist_max:>7}|{color_sma24}{round(last_candle['sma24_deriv1_1h'], 2):>5}{RESET}" + f"|{color_sma5}{round(last_candle['mid_smooth_5_deriv1_1h'], 2):>5}{RESET}|{color_sma5_2}{round(last_candle['mid_smooth_5_deriv2_1h'], 2):>5}{RESET}" f"|{color_sma5_1h}{round(last_candle['sma60_deriv1'], 2):>5}{RESET}|{color_sma5_2h}{round(last_candle['sma60_deriv2'], 2):>5}{RESET}" f"|{color_smooth_1h}{round(last_candle['mid_smooth_1h_deriv1'], 2):>5}{RESET}|{color_smooth2_1h}{round(last_candle['mid_smooth_1h_deriv2'], 2):>5}{RESET}" - # f"|{last_candle['min60_1d']}|{last_candle['max60_1d']}" - # f"|{last_candle['mid_smooth_tdc_5_1d'] or '-':>3}|{last_candle['mid_smooth_tdc_5_1h'] or '-':>3}|{last_candle['mid_smooth_tdc_5'] or '-':>3}" - f"|{last_candle['mid_smooth_5_state_1d'] or '-':>3}|{last_candle['mid_smooth_24_state_1h'] or '-':>3}|{last_candle['mid_smooth_5_state_1h'] or '-':>3}|{last_candle['mid_smooth_5_state'] or '-':>3}" - f"|{last_candle['trend_class_1d']:>5} {last_candle['trend_class_1h']:>5}" # {indic_5m} {indic_deriv1_5m} {indic_deriv2_5m} {indic_5m_sell} {indic_deriv1_5m_sell} {indic_deriv2_5m_sell}" ) def getLastLost(self, last_candle, pair): last_lost = round((last_candle['close'] - self.pairs[pair]['max_touch']) / self.pairs[pair]['max_touch'], 3) return last_lost - def getDistMax(self, last_candle, pair): - mx = last_candle['max12_1d'] - dist_max = round(100 * (mx - last_candle['close']) / mx, 0) - return dist_max - def printLineLog(self): # f"sum1h|sum1d|Tdc|Tdh|Tdd| drv1 |drv_1h|drv_1d|" self.printLog( @@ -824,18 +711,42 @@ class Zeus_LGBMRegressor(IStrategy): def populate_indicators(self, dataframe: DataFrame, metadata: dict) -> DataFrame: # Add all ta features pair = metadata['pair'] + short_pair = self.getShortName(pair) + self.path = f"user_data/plots/{short_pair}/" dataframe = self.populateDataframe(dataframe, timeframe='5m') ################### INFORMATIVE 1h informative = self.dp.get_pair_dataframe(pair=metadata['pair'], timeframe="1h") informative = self.populateDataframe(informative, timeframe='1h') + informative = self.calculateRegression(informative, 'mid', lookback=5) + + informative['stop_buying_deb'] = ((informative['max_rsi_24'] > self.rsi_deb_protect.value) + & (informative['sma24_deriv1'] < self.sma24_deriv1_deb_protect.value) + ) + informative['stop_buying_end'] = ((informative['max_rsi_24'] < self.rsi_end_protect.value) + & (informative['sma24_deriv1'] > self.sma24_deriv1_end_protect.value) + ) + + latched = np.zeros(len(informative), dtype=bool) + + for i in range(1, len(informative)): + if informative['stop_buying_deb'].iloc[i]: + latched[i] = True + elif informative['stop_buying_end'].iloc[i]: + latched[i] = False + else: + latched[i] = latched[i - 1] + + informative['stop_buying'] = latched + dataframe = merge_informative_pair(dataframe, informative, self.timeframe, "1h", ffill=True) - ################### INFORMATIVE 1d - informative = self.dp.get_pair_dataframe(pair=metadata['pair'], timeframe="1d") - informative = self.populateDataframe(informative, timeframe='1d') - dataframe = merge_informative_pair(dataframe, informative, self.timeframe, "1d", ffill=True) + # ################### INFORMATIVE 1d + # informative = self.dp.get_pair_dataframe(pair=metadata['pair'], timeframe="1d") + # informative = self.populateDataframe(informative, timeframe='1d') + # # informative = self.calculateRegression(informative, 'mid', lookback=15) + # dataframe = merge_informative_pair(dataframe, informative, self.timeframe, "1d", ffill=True) dataframe['last_price'] = dataframe['close'] dataframe['first_price'] = dataframe['close'] @@ -881,33 +792,28 @@ class Zeus_LGBMRegressor(IStrategy): dataframe['mid_smooth_5h'] dataframe["mid_smooth_5h_deriv2"] = 100 * dataframe["mid_smooth_5h_deriv1"].diff().rolling(window=60).mean() - # indic_5m_protect = self.indic_5m_slope_sup.value - # indic_1h_protect = self.indic_1h_slope_sup.value + '_1h' - # - # dataframe['stop_buying_deb'] = ((dataframe['max_rsi_12_1d'] > self.rsi_buy_protect.value) | (dataframe['sma24_deriv1_1h'] < self.deriv1_buy_protect.value)) & (qtpylib.crossed_below(dataframe[indic_5m_protect], dataframe[indic_1h_protect])) - # dataframe['stop_buying_end'] = (dataframe[indic_1h_protect].shift(24) > dataframe[indic_1h_protect].shift(12)) & (dataframe[indic_1h_protect].shift(12) < dataframe[indic_1h_protect]) + dataframe = self.calculateRegression(dataframe, 'mid', lookback=10, future_steps=10, model_type="poly") + dataframe = self.calculateRegression(dataframe, 'sma24', lookback=12, future_steps=12) - # latched = np.zeros(len(dataframe), dtype=bool) - # - # for i in range(1, len(dataframe)): - # if dataframe['stop_buying_deb'].iloc[i]: - # latched[i] = True - # elif dataframe['stop_buying_end'].iloc[i]: - # latched[i] = False - # else: - # latched[i] = latched[i - 1] - # - # dataframe['stop_buying'] = latched - - dataframe["ms-10"] = dataframe["mid_smooth_24_deriv1"].shift(10) - dataframe["ms-5"] = dataframe["mid_smooth_24_deriv1"].shift(5) - dataframe["ms-4"] = dataframe["mid_smooth_24_deriv1"].shift(4) - dataframe["ms-3"] = dataframe["mid_smooth_24_deriv1"].shift(3) - dataframe["ms-2"] = dataframe["mid_smooth_24_deriv1"].shift(2) - dataframe["ms-1"] = dataframe["mid_smooth_24_deriv1"].shift(1) - dataframe["ms-0"] = dataframe["mid_smooth_24_deriv1"] + # dataframe["ms-10"] = dataframe[self.indicator_target].shift(10) + # dataframe["ms-5"] = dataframe[self.indicator_target].shift(5) + # dataframe["ms-4"] = dataframe[self.indicator_target].shift(4) + # dataframe["ms-3"] = dataframe[self.indicator_target].shift(3) + # dataframe["ms-2"] = dataframe[self.indicator_target].shift(2) + # dataframe["ms-1"] = dataframe[self.indicator_target].shift(1) + # dataframe["ms-0"] = dataframe[self.indicator_target] # dataframe["ms+10"] = dataframe["mid_smooth_24"].shift(-11) + self.model_indicators = self.listUsableColumns(dataframe) + + # Quantile + self.add_future_quantiles( + dataframe, + indic="mid", + lookback=40, + future_steps=5 + ) + if self.dp.runmode.value in ('backtest'): self.trainModel(dataframe, metadata) @@ -933,25 +839,8 @@ class Zeus_LGBMRegressor(IStrategy): pd.set_option('display.max_rows', None) pd.set_option('display.max_columns', None) pd.set_option("display.width", 200) - path=f"user_data/plots/{pair}/" - os.makedirs(path, exist_ok=True) - # # Étape 1 : sélectionner numériques - # numeric_cols = dataframe.select_dtypes(include=['int64', 'float64']).columns - # - # # Étape 2 : enlever constantes - # usable_cols = [c for c in numeric_cols if dataframe[c].nunique() > 1 - # and (not c.endswith("_state") and not c.endswith("_1h") and not c.endswith("_1d") - # and not c.endswith("_class") and not c.endswith("_price") - # and not c.startswith('stop_buying'))] - # - # # Étape 3 : remplacer inf et NaN par 0 - # dataframe[usable_cols] = dataframe[usable_cols].replace([np.inf, -np.inf], 0).fillna(0) - # - # print("Colonnes utilisables pour le modèle :") - # print(usable_cols) - # - # self.model_indicators = usable_cols + os.makedirs(self.path, exist_ok=True) df = dataframe[self.model_indicators].copy() @@ -962,7 +851,7 @@ class Zeus_LGBMRegressor(IStrategy): # 3️⃣ Créer la cible : 1 si le prix monte dans les prochaines bougies # df['target'] = (df['sma24'].shift(-24) > df['sma24']).astype(int) - df['target'] = dataframe["mid_smooth_24_deriv1"].shift(-11) # > df['sma24'] * 1.003).astype(int) + df['target'] = dataframe[self.indicator_target].shift(-24) # > df['sma24'] * 1.003).astype(int) df['target'] = df['target'].fillna(0) #.astype(int) # Corrélations triées par importance avec une colonne cible @@ -993,7 +882,7 @@ class Zeus_LGBMRegressor(IStrategy): mask = np.triu(np.ones_like(corr, dtype=bool)) # --- Création de la figure --- - fig, ax = plt.subplots(figsize=(20,12)) #96, 36)) + fig, ax = plt.subplots(figsize=(96, 36)) # --- Heatmap avec un effet “température” --- sns.heatmap( @@ -1014,7 +903,7 @@ class Zeus_LGBMRegressor(IStrategy): plt.yticks(rotation=0) # --- Sauvegarde --- - output_path = f"{path}/Matrice_de_correlation_temperature.png" + output_path = f"{self.path}/Matrice_de_correlation_temperature.png" plt.savefig(output_path, bbox_inches="tight", dpi=150) plt.close(fig) @@ -1033,6 +922,9 @@ class Zeus_LGBMRegressor(IStrategy): # Séparation temporelle (train = 80 %, valid = 20 %) X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, shuffle=False) + print("NaN per column:") + print(X_train.isna().sum().sort_values(ascending=False).head(20)) + # Nettoyage des valeurs invalides selector = VarianceThreshold(threshold=0.0001) @@ -1054,7 +946,7 @@ class Zeus_LGBMRegressor(IStrategy): # ) # train_model.fit(X_train, y_train) - train_model, selected_features = self.optuna(path, X_train, X_test, y_train, y_test) + train_model, selected_features = self.optuna(self.path, X_train, X_test, y_train, y_test) print("Features retenues :", list(selected_features)) # # 2️⃣ Sélection des features AVANT calibration @@ -1089,8 +981,37 @@ class Zeus_LGBMRegressor(IStrategy): joblib.dump(train_model, f"{pair}_rf_model.pkl") print(f"✅ Modèle sauvegardé sous {pair}_rf_model.pkl") + # # Quantile + # dataframe = self.add_future_quantiles( + # df, + # indic="mid", + # lookback=40, + # future_steps=5 + # ) + self.analyze_model(pair, train_model, X_train, X_test, y_train, y_test) + def listUsableColumns(self, dataframe): + # Étape 1 : sélectionner numériques + numeric_cols = dataframe.select_dtypes(include=['int64', 'float64']).columns + # Étape 2 : enlever constantes + usable_cols = [c for c in numeric_cols if dataframe[c].nunique() > 1 + and not c.endswith("_state") and not c.endswith("_1d") and not c.endswith("_1h") + and not c.startswith("open") and not c.startswith("close") + and not c.startswith("low") and not c.startswith("high") + and not c.startswith("haopen") and not c.startswith("haclose") + and not c.startswith("bb_lower") and not c.startswith("bb_upper") + and not c.startswith("bb_middle") + # not c.endswith("_1h") + and not c.endswith("_class") and not c.endswith("_price") + and not c.startswith('stop_buying')] + # Étape 3 : remplacer inf et NaN par 0 + dataframe[usable_cols] = dataframe[usable_cols].replace([np.inf, -np.inf], 0).fillna(0) + print("Colonnes utilisables pour le modèle :") + print(usable_cols) + self.model_indicators = usable_cols + return self.model_indicators + def inspect_model(self, model): """ Affiche les informations d'un modèle ML déjà entraîné. @@ -1179,6 +1100,8 @@ class Zeus_LGBMRegressor(IStrategy): "importance": model.feature_importances_ }).sort_values(by="importance", ascending=False) print(importance) + top_n = 20 + importance = importance.head(top_n) # Crée une figure plus grande fig, ax = plt.subplots(figsize=(24, 8)) # largeur=24 pouces, hauteur=8 pouces @@ -1337,9 +1260,7 @@ class Zeus_LGBMRegressor(IStrategy): # Le graphique généré affichera trois courbes : # # 🔵 Precision — la fiabilité de tes signaux haussiers. - # # 🟢 Recall — la proportion de hausses que ton modèle détecte. - # # 🟣 F1-score — le compromis optimal entre les deux. thresholds = np.arange(0, 1.01, step) @@ -1387,6 +1308,7 @@ class Zeus_LGBMRegressor(IStrategy): def populateDataframe(self, dataframe, timeframe='5m'): + dataframe = dataframe.copy() heikinashi = qtpylib.heikinashi(dataframe) dataframe['haopen'] = heikinashi['open'] dataframe['haclose'] = heikinashi['close'] @@ -1422,6 +1344,7 @@ class Zeus_LGBMRegressor(IStrategy): dataframe['max_rsi_24'] = talib.MAX(dataframe['rsi'], timeperiod=24) self.calculeDerivees(dataframe, 'rsi', timeframe=timeframe, ema_period=12) dataframe['max12'] = talib.MAX(dataframe['close'], timeperiod=12) + dataframe['min12'] = talib.MIN(dataframe['close'], timeperiod=12) dataframe['max60'] = talib.MAX(dataframe['close'], timeperiod=60) dataframe['min60'] = talib.MIN(dataframe['close'], timeperiod=60) dataframe['min_max_60'] = ((dataframe['max60'] - dataframe['close']) / dataframe['min60']) @@ -1480,44 +1403,12 @@ class Zeus_LGBMRegressor(IStrategy): # hist_col='macdhist_rel' # ) - # ------------------------------------------------------------------------------------ - # rolling SMA indicators (used for trend detection too) - s_short = self.DEFAULT_PARAMS['sma_short'] - s_long = self.DEFAULT_PARAMS['sma_long'] - - dataframe[f'sma_{s_short}'] = dataframe['close'].rolling(window=s_short).mean() - dataframe[f'sma_{s_long}'] = dataframe['close'].rolling(window=s_long).mean() - # --- pente brute --- dataframe['slope'] = dataframe['sma24'].diff() # --- lissage EMA --- dataframe['slope_smooth'] = dataframe['slope'].ewm(span=10, adjust=False).mean() - # # RSI - # window = 14 - # delta = dataframe['close'].diff() - # up = delta.clip(lower=0) - # down = -1 * delta.clip(upper=0) - # ma_up = up.rolling(window=window).mean() - # ma_down = down.rolling(window=window).mean() - # rs = ma_up / ma_down.replace(0, 1e-9) - # dataframe['rsi'] = 100 - (100 / (1 + rs)) - # - # # EMA example - # dataframe['ema'] = dataframe['close'].ewm(span=self.DEFAULT_PARAMS['ema_period'], adjust=False).mean() - # - # # ATR (simple implementation) - # high_low = dataframe['high'] - dataframe['low'] - # high_close = (dataframe['high'] - dataframe['close'].shift()).abs() - # low_close = (dataframe['low'] - dataframe['close'].shift()).abs() - # tr = DataFrame({'hl': high_low, 'hc': high_close, 'lc': low_close}).max(axis=1) - # dataframe['atr'] = tr.rolling(window=self.DEFAULT_PARAMS['atr_period']).mean() - - ########################### - # df = ton DataFrame OHLCV avec colonnes: open, high, low, close, volume - # Assure-toi qu'il est trié par date croissante - # --- Volatilité normalisée --- dataframe['atr'] = ta.volatility.AverageTrueRange( high=dataframe['high'], low=dataframe['low'], close=dataframe['close'], window=14 @@ -1658,10 +1549,6 @@ class Zeus_LGBMRegressor(IStrategy): d1_col = f"{name}{suffixe}_deriv1" d2_col = f"{name}{suffixe}_deriv2" - # d1s_col = f"{name}{suffixe}_deriv1_smooth" - # d2s_col = f"{name}{suffixe}_deriv2_smooth" - tendency_col = f"{name}{suffixe}_state" - factor1 = 100 * (ema_period / 5) factor2 = 10 * (ema_period / 5) @@ -1670,84 +1557,9 @@ class Zeus_LGBMRegressor(IStrategy): # --- Distance à la moyenne mobile --- dataframe[f"{name}{suffixe}_dist"] = (dataframe['close'] - dataframe[f"{name}{suffixe}"]) / dataframe[f"{name}{suffixe}"] - # dérivée relative simple - dataframe[d1_col] = (dataframe[name] - dataframe[name].shift(1)) / dataframe[name].shift(1) - # lissage EMA - dataframe[d1_col] = factor1 * dataframe[d1_col].ewm(span=ema_period, adjust=False).mean() - - # dataframe[d1_col] = dataframe[d1_col].rolling(window=ema_period, center=True).median() - + dataframe[d1_col] = 1000 * (dataframe[name] - dataframe[name].shift(1)) / dataframe[name].shift(1) dataframe[d2_col] = dataframe[d1_col] - dataframe[d1_col].shift(1) - dataframe[d2_col] = factor2 * dataframe[d2_col].ewm(span=ema_period, adjust=False).mean() - - # epsilon adaptatif via rolling percentile - p_low_d1 = dataframe[d1_col].rolling(window=window, min_periods=1).quantile(0.05) - p_high_d1 = dataframe[d1_col].rolling(window=window, min_periods=1).quantile(0.95) - p_low_d2 = dataframe[d2_col].rolling(window=window, min_periods=1).quantile(0.05) - p_high_d2 = dataframe[d2_col].rolling(window=window, min_periods=1).quantile(0.95) - - eps_d1_series = ((p_low_d1.abs() + p_high_d1.abs()) / 2) * coef - eps_d2_series = ((p_low_d2.abs() + p_high_d2.abs()) / 2) * coef - - # fallback global eps - global_eps_d1 = (abs(dataframe[d1_col].quantile(0.05)) + abs(dataframe[d1_col].quantile(0.95))) / 2 * coef - global_eps_d2 = (abs(dataframe[d2_col].quantile(0.05)) + abs(dataframe[d2_col].quantile(0.95))) / 2 * coef - - eps_d1_series = eps_d1_series.fillna(global_eps_d1).replace(0, global_eps_d1) - eps_d2_series = eps_d2_series.fillna(global_eps_d2).replace(0, global_eps_d2) - - # if verbose and self.dp.runmode.value in ('backtest'): - # stats = dataframe[[d1_col, d2_col]].agg(['min', 'max']).T - # stats['abs_max'] = dataframe[[d1_col, d2_col]].abs().max(axis=0) - # print(f"---- Derivatives stats {timeframe}----") - # print(stats) - # print(f"rolling window = {window}, coef = {coef}, ema_period = {ema_period}") - # print("---------------------------") - - # mapping tendency - def tag_by_derivatives(row): - idx = int(row.name) - d1v = float(row[d1_col]) - d2v = float(row[d2_col]) - eps1 = float(eps_d1_series.iloc[idx]) - eps2 = float(eps_d2_series.iloc[idx]) - - # # mapping état → codes 3 lettres explicites - # # | Ancien état | Nouveau code 3 lettres | Interprétation | - # # | ----------- | ---------------------- | --------------------- | - # # | 4 | HAU | Hausse Accélérée | - # # | 3 | HSR | Hausse Ralentissement | - # # | 2 | HST | Hausse Stable | - # # | 1 | DHB | Départ Hausse | - # # | 0 | PAL | Palier / neutre | - # # | -1 | DBD | Départ Baisse | - # # | -2 | BSR | Baisse Ralentissement | - # # | -3 | BST | Baisse Stable | - # # | -4 | BAS | Baisse Accélérée | - - # Palier strict - if abs(d1v) <= eps1 and abs(d2v) <= eps2: - return 0 - # Départ si d1 ~ 0 mais d2 signale direction - if abs(d1v) <= eps1: - return 1 if d2v > eps2 else -1 if d2v < -eps2 else 0 - # Hausse - if d1v > eps1: - return 4 if d2v > eps2 else 3 - # Baisse - if d1v < -eps1: - return -4 if d2v < -eps2 else -2 - return 0 - - dataframe[tendency_col] = dataframe.apply(tag_by_derivatives, axis=1) - - # if timeframe == '1h' and verbose and self.dp.runmode.value in ('backtest'): - # print("##################") - # print(f"# STAT {timeframe} {name}{suffixe}") - # print("##################") - # self.calculateProbabilite2Index(dataframe, futur_cols=['futur_percent'], indic_1=f"{name}{suffixe}_deriv1", indic_2=f"{name}{suffixe}_deriv2") - return dataframe def getOpenTrades(self): @@ -1755,19 +1567,6 @@ class Zeus_LGBMRegressor(IStrategy): self.trades = Trade.get_open_trades() return self.trades - def populate_buy_trend(self, dataframe: DataFrame, metadata: dict) -> DataFrame: - dataframe.loc[ - ( - (dataframe['ml_prob'] > dataframe['sma24_deriv1']) - ), ['enter_long', 'enter_tag']] = (1, f"ml_prob") - - dataframe['test'] = np.where(dataframe['enter_long'] == 1, dataframe['close'] * 1.01, np.nan) - - if self.dp.runmode.value in ('backtest'): - dataframe.to_feather(f"user_data/backtest_results/{metadata['pair'].replace('/', '_')}_df.feather") - - return dataframe - def calculateProbabilite2Index(self, df, futur_cols, indic_1, indic_2): # # Définition des tranches pour les dérivées # bins_deriv = [-np.inf, -0.05, -0.01, 0.01, 0.05, np.inf] @@ -1911,23 +1710,83 @@ class Zeus_LGBMRegressor(IStrategy): line += f" {color}{v:5.1f}{RESET} " print(line) - def populate_sell_trend(self, dataframe: DataFrame, metadata: dict) -> DataFrame: + def populate_buy_trend(self, dataframe: DataFrame, metadata: dict) -> DataFrame: dataframe.loc[ ( - (dataframe['ml_prob'] < dataframe['sma24_deriv1']) - ), ['exit_long', 'exit_tag']] = (1, f"ml_prob") + ( + ( + (dataframe['mid_future_pred_cons'].shift(2) > dataframe['mid_future_pred_cons'].shift(1)) + & (dataframe['mid_future_pred_cons'].shift(1) < dataframe['mid_future_pred_cons']) + & (dataframe['percent12'] < -0.0005) + ) + | ( + (dataframe['mid_future_pred_cons'] < dataframe['min12']) + ) + ) + & + ( + ((dataframe['mid_smooth_12_deriv1'] > 0) | (dataframe['mid_smooth_5_deriv1'] > 0)) + ) + + ), ['enter_long', 'enter_tag']] = (1, f"future") + + dataframe['test'] = np.where(dataframe['enter_long'] == 1, dataframe['close'] * 1.01, np.nan) + + if self.dp.runmode.value in ('backtest'): + dataframe.to_feather(f"user_data/backtest_results/{metadata['pair'].replace('/', '_')}_df.feather") + + return dataframe + + def populate_sell_trend(self, dataframe: DataFrame, metadata: dict) -> DataFrame: + + # dataframe.loc[ + # ( + # ( + # ( + # (dataframe['ml_prob'].shift(2) < dataframe['ml_prob'].shift(1)) + # & (dataframe['ml_prob'].shift(1) > dataframe['ml_prob']) + # ) + # | (dataframe['ml_prob'] < 0) + # ) + # & (dataframe['hapercent'] < 0) + # ), ['exit_long', 'exit_tag']] = (1, f"sma60_future") + + # dataframe.loc[ + # ( + # ( + # ( + # (dataframe['mid_future_pred_cons'].shift(2) < dataframe['mid_future_pred_cons'].shift(1)) + # & (dataframe['mid_future_pred_cons'].shift(1) > dataframe['mid_future_pred_cons']) + # ) + # # | (dataframe['mid_smooth_12_deriv1'] < 0) + # ) + # & (dataframe['sma60_future_pred_cons'] < dataframe['sma60_future_pred_cons'].shift(1)) + # & (dataframe['hapercent'] < 0) + # ), ['exit_long', 'exit_tag']] = (1, f"sma60_future") + + # + # dataframe.loc[ + # ( + # ( + # (dataframe['mid_future_pred_cons'].shift(2) < dataframe['mid_future_pred_cons'].shift(1)) + # & (dataframe['mid_future_pred_cons'].shift(1) > dataframe['mid_future_pred_cons']) + # + # ) + # # & (dataframe['mid_future_pred_cons'] > dataframe['max12']) + # & (dataframe['hapercent'] < 0) + # + # ), ['exit_long', 'exit_tag']] = (1, f"max12") return dataframe def adjust_trade_position(self, trade: Trade, current_time: datetime, current_rate: float, current_profit: float, min_stake: float, max_stake: float, **kwargs): - return None # ne rien faire si ordre deja en cours if trade.has_open_orders: # print("skip open orders") return None - if (self.wallets.get_available_stake_amount() < 0): # or trade.stake_amount >= max_stake: + if (self.wallets.get_available_stake_amount() < 10): # or trade.stake_amount >= max_stake: return 0 dataframe, _ = self.dp.get_analyzed_dataframe(trade.pair, self.timeframe) @@ -1966,14 +1825,9 @@ class Zeus_LGBMRegressor(IStrategy): if (self.getShortName(pair) == 'BTC') or count_of_buys <= 2: lim = - pct - (count_of_buys * self.pct_inc.value) - # lim = self.getLimitBuy(pair, last_candle, pct) - # lim = - (0.012 * (1 + round(count_of_buys / 5)) + 0.001 * (count_of_buys - 1)) - # lim = - (0.012 + 0.001 * (count_of_buys - 1) + (0.002 * count_of_buys if count_of_buys > 10 else 0.001 * count_of_buys if count_of_buys > 5 else 0)) - else: pct = 0.05 lim = - pct - (count_of_buys * 0.0025) - # lim = self.getLimitBuy(pair, last_candle, pct) if (len(dataframe) < 1): # print("skip dataframe") @@ -1982,9 +1836,10 @@ class Zeus_LGBMRegressor(IStrategy): if not self.should_enter_trade(pair, last_candle, current_time): return None - condition = (last_candle['enter_long'] and last_candle['sma60_deriv1'] > 0 and last_candle['hapercent'] > 0) \ - or last_candle['enter_tag'] == 'pct3' \ - or last_candle['enter_tag'] == 'pct3_1h' + condition = (last_candle['enter_long'] and last_candle['stop_buying_1h'] == False and last_candle['hapercent'] > 0) + # and last_candle['sma60_deriv1'] > 0 + # or last_candle['enter_tag'] == 'pct3' \ + # or last_candle['enter_tag'] == 'pct3_1h' # if (self.getShortName(pair) != 'BTC' and count_of_buys > 3): # condition = before_last_candle_24['mid_smooth_3_1h'] > before_last_candle_12['mid_smooth_3_1h'] and before_last_candle_12['mid_smooth_3_1h'] < last_candle['mid_smooth_3_1h'] #and last_candle['mid_smooth_3_deriv1_1h'] < -1.5 @@ -1997,28 +1852,12 @@ class Zeus_LGBMRegressor(IStrategy): self.pairs[pair]['force_sell'] = True return None - # if 6 <= count_of_buys: - # if not ((before_last_candle_24['sma24_deriv1_1h'] > before_last_candle_12['sma24_deriv1_1h']) - # & (before_last_candle_12['sma24_deriv1_1h'] < last_candle['sma24_deriv1_1h'])): - # return None - # print(f"{trade.pair} current_profit={current_profit} count_of_buys={count_of_buys} pct_first={pct_first:.3f} pct_max={pct_max:.3f} lim={lim:.3f} index={index}") - # self.pairs[trade.pair]['last_palier_index'] = index - - # # Appel de la fonction - # poly_func, x_future, y_future, count = self.polynomial_forecast( - # dataframe['mid_smooth_12'], - # window=self.buy_horizon_predict_1h.value * 12, - # degree=4) - # - # if count < 3: - # return None - max_amount = self.config.get('stake_amount') * 2.5 stake_amount = min(min(max_amount, self.wallets.get_available_stake_amount()), self.adjust_stake_amount(pair, last_candle) * abs(last_lost / self.mise_factor_buy.value)) if stake_amount > 0: - trade_type = last_candle['enter_tag'] if last_candle['enter_long'] == 1 else 'pct48' + trade_type = "Loss " + (last_candle['enter_tag'] if last_candle['enter_long'] == 1 else '') self.pairs[trade.pair]['count_of_buys'] += 1 self.pairs[pair]['total_amount'] += stake_amount self.log_trade( @@ -2055,7 +1894,7 @@ class Zeus_LGBMRegressor(IStrategy): if (profit > self.pairs[pair]['previous_profit'] and profit > self.pairs[pair]['expected_profit'] and hours > 6 # and last_candle['sma60_deriv1'] > 0 and last_candle['max_rsi_12_1h'] < 75 - and last_candle['rsi_1d'] < 58 + # and last_candle['rsi_1d'] < 58 # and last_candle['stop_buying'] == False # and last_candle['mid_smooth_5_deriv1_1d'] > 0 and self.wallets.get_available_stake_amount() > 0 @@ -2066,7 +1905,7 @@ class Zeus_LGBMRegressor(IStrategy): if stake_amount > 0: self.pairs[pair]['has_gain'] += 1 - trade_type = 'Gain +' + trade_type = 'Gain +' + (last_candle['enter_tag'] if last_candle['enter_long'] == 1 else '') self.pairs[trade.pair]['count_of_buys'] += 1 self.pairs[pair]['total_amount'] += stake_amount self.log_trade( @@ -2098,140 +1937,27 @@ class Zeus_LGBMRegressor(IStrategy): def getPctLastBuy(self, pair, last_candle): return round((last_candle['close'] - self.pairs[pair]['last_buy']) / self.pairs[pair]['last_buy'], 4) - def getPct60D(self, pair, last_candle): - return round((last_candle['max60_1d'] - last_candle['min60_1d']) / last_candle['max60_1d'], 4) - - def getPctClose60D(self, pair, last_candle): - if last_candle['close'] > last_candle['max12_1d']: - return 1 - if last_candle['close'] < last_candle['min12_1d']: - return 0 - return round( - (last_candle['close'] - last_candle['min12_1d']) / (last_candle['max12_1d'] - last_candle['min12_1d']), 4) - - def getLimitBuy(self, pair, last_candle, first_pct): - count_of_buys = self.pairs[pair]['count_of_buys'] - pct60 = self.getPct60D(pair, last_candle) # exemple 0.3 pour 30% - if (pct60 < 0.05): - lim = - first_pct - (count_of_buys * 0.001 * 0.05 / 0.05) - else: - # 0.1 - # 0.4 - lim = - first_pct - (count_of_buys * 0.001 * pct60 / 0.05) - - return lim - - # def getProbaHausseEmaVolume(self, last_candle): - # value_1 = self.getValuesFromTable(self.ema_volume, last_candle['ema_volume']) - # value_2 = self.getValuesFromTable(self.mid_smooth_1h_deriv1, last_candle['mid_smooth_1h_deriv1']) - # - # val = self.approx_val_from_bins( - # matrice=self.ema_volume_mid_smooth_1h_deriv1_matrice_df, - # numeric_matrice=self.ema_volume_mid_smooth_1h_deriv1_numeric_matrice, - # row_label=value_2, - # col_label=value_1 - # ) - # return val - - def getProbaHausseSma5d(self, last_candle): - value_1 = self.getValuesFromTable(self.sma5_deriv1, last_candle['sma5_deriv1_1d']) - value_2 = self.getValuesFromTable(self.sma5_deriv2, last_candle['sma5_deriv2_1d']) - - # print(f"{last_candle['sma5_deriv1_1d']} => {value_1} / {last_candle['sma5_deriv2_1d']} => {value_2}") - - val = self.approx_val_from_bins( - matrice=self.sma5_derive1_2_matrice_df, - numeric_matrice=self.sma5_derive1_2_numeric_matrice, - row_label=value_2, - col_label=value_1 - ) - return val - def adjust_stake_amount(self, pair: str, last_candle: DataFrame): # Calculer le minimum des 14 derniers jours nb_pairs = len(self.dp.current_whitelist()) base_stake_amount = self.config.get('stake_amount') / (self.mises.value) # * nb_pairs) # Montant de base configuré - # pct60 = round(100 * self.getPctClose60D(pair, last_candle), 2) - if True: # not pair in ('BTC/USDT', 'BTC/USDC'): - # factors = [1, 1.2, 1.3, 1.4] - if self.pairs[pair]['count_of_buys'] == 0: - # pctClose60 = self.getPctClose60D(pair, last_candle) - # dist_max = self.getDistMax(last_candle, pair) - # factor = self.multi_step_interpolate(dist_max, self.thresholds, self.factors) - factor = 1 #65 / min(65, last_candle['rsi_1d']) - if last_candle['slope_norm_1d'] < last_candle['slope_norm_1h']: - factor = 2 + # factors = [1, 1.2, 1.3, 1.4] + if self.pairs[pair]['count_of_buys'] == 0: + factor = 1 #65 / min(65, last_candle['rsi_1d']) + if last_candle['open'] < last_candle['sma5_1h'] and last_candle['mid_smooth_12_deriv1'] > 0: + factor = 2 - adjusted_stake_amount = max(base_stake_amount / 5, base_stake_amount * factor) - else: - adjusted_stake_amount = self.pairs[pair]['first_amount'] + adjusted_stake_amount = max(base_stake_amount / 5, base_stake_amount * factor) else: - first_price = self.pairs[pair]['first_buy'] - if (first_price == 0): - first_price = last_candle['close'] - - last_max = last_candle['max12_1d'] - pct = 5 - if last_max > 0: - pct = 100 * (last_max - first_price) / last_max - - factor = self.multi_step_interpolate(pct, self.thresholds, self.factors) - adjusted_stake_amount = base_stake_amount * factor # max(base_stake_amount, min(100, base_stake_amount * percent_4)) - - # pct = 100 * abs(self.getPctFirstBuy(pair, last_candle)) - # - # factor = self.multi_step_interpolate(pct, self.thresholds, self.factors) + adjusted_stake_amount = self.pairs[pair]['first_amount'] if self.pairs[pair]['count_of_buys'] == 0: self.pairs[pair]['first_amount'] = adjusted_stake_amount return adjusted_stake_amount - def calculateAmountSliding(self, pair, last_candle): - val = last_candle['close'] - min_sliding = min(last_candle['min60_1d'], val) - max_sliding = max(last_candle['max60_1d'], val) - min_abs = self.pairs[pair]['last_min'] - max_abs = self.pairs[pair]['last_max'] - full = self.wallets.get_total_stake_amount() - stake = full / self.stakes - - out_min = stake / 2 - out_max = stake * 2 - # Clamp sliding range within absolute bounds - min_sliding = max(min_sliding, min_abs) - max_sliding = min(max_sliding, max_abs) - - # Avoid division by zero - if max_sliding == min_sliding: - return out_max # Or midpoint, or default value - - # Inverse linear interpolation - position = (val - min_sliding) / (max_sliding - min_sliding) - return out_max - position * (out_max - out_min) - - def calculatePctSliding(self, pair, last_candle): - val = last_candle['close'] - min_sliding = last_candle['min60_1d'] - max_sliding = last_candle['max60_1d'] - min_abs = self.pairs[pair]['last_min'] - max_abs = self.pairs[pair]['last_max'] - out_min = 0.025 - out_max = 0.08 - # Clamp sliding range within absolute bounds - min_sliding = max(min_sliding, min_abs) - max_sliding = min(max_sliding, max_abs) - - # Avoid division by zero - if max_sliding == min_sliding: - return out_max # Or midpoint, or default value - - # Inverse linear interpolation - position = (val - min_sliding) / (max_sliding - min_sliding) - return out_max - position * (out_max - out_min) - def expectedProfit(self, pair: str, last_candle: DataFrame): lim = 0.01 pct = 0.002 @@ -2239,23 +1965,12 @@ class Zeus_LGBMRegressor(IStrategy): lim = 0.005 pct = 0.001 pct_to_max = lim + pct * self.pairs[pair]['count_of_buys'] - # if self.pairs[pair]['count_of_buys'] > 6: - # pct_to_max = 0.006 * self.pairs[pair]['count_of_buys'] - # pctClose60 = self.getPctClose60D(pair, last_candle) - - # max_60 = last_candle['max60_1d'] - # if last_candle['close'] < max_60: - # pct_to_max = 0.25 * (max_60 - last_candle['close']) / max_60 - # pct_to_max = pct_to_max * (2 - pctClose60) expected_profit = lim * self.pairs[pair]['total_amount'] # min(3 * lim, max(lim, pct_to_max)) # 0.004 + 0.002 * self.pairs[pair]['count_of_buys'] #min(0.01, first_max) self.pairs[pair]['expected_profit'] = expected_profit - # print( - # f"Expected profit price={current_price:.4f} min_max={min_max:.4f} min_14={min_14_days:.4f} max_14={max_14_days:.4f} percent={percent:.4f} expected_profit={expected_profit:.4f}") return expected_profit - def calculateUpDownPct(self, dataframe, key): down_pct_values = np.full(len(dataframe), np.nan) # Remplir la colonne avec les bons calculs @@ -2266,174 +1981,6 @@ class Zeus_LGBMRegressor(IStrategy): dataframe['close'].iloc[i - shift_value] return down_pct_values - # ✅ Première dérivée(variation ou pente) - # Positive: la courbe est croissante → tendance haussière. - # Négative: la courbe est décroissante → tendance baissière. - # Proche de 0: la courbe est plate → marché stable ou en transition. - # - # Applications: - # Détecter les points d’inflexion(changement de tendance) quand elle s’annule.\ - # Analyser la vitesse d’un mouvement(plus elle est forte, plus le mouvement est impulsif). - # - # ✅ Seconde dérivée(accélération ou concavité) - # Positive: la pente augmente → accélération de la hausse ou ralentissement de la baisse. - # Négative: la pente diminue → accélération de la baisse ou ralentissement de la hausse. - # Changement de signe: indique souvent un changement de courbure, utile pour prévoir des retournements. - # - # Exemples: - # 🟢 Dérivée 1 > 0 et dérivée 2 > 0: tendance haussière qui s’accélère. - # 🟡 Dérivée 1 > 0 et dérivée 2 < 0: tendance haussière qui ralentit → essoufflement potentiel. - # 🔴 Dérivée 1 < 0 et dérivée 2 < 0: tendance baissière qui s’accélère. - # 🟠 Dérivée 1 < 0 et dérivée 2 > 0: tendance baissière qui ralentit → possible bottom. - # - # Filtrer les signaux: ne prendre un signal haussier que si dérivée1 > 0 et dérivée2 > 0. - # Détecter les zones de retournement: quand dérivée1 ≈ 0 et que dérivée2 change de signe. - def calculateRegression(self, - dataframe: DataFrame, - column='close', - window=50, - degree=3, - future_offset: int = 10 # projection à n bougies après - ) -> DataFrame: - df = dataframe.copy() - - regression_fit = [] - regression_future_fit = [] - - regression_fit = [] - regression_future_fit = [] - - for i in range(len(df)): - if i < window: - regression_fit.append(np.nan) - regression_future_fit.append(np.nan) - continue - - # Fin de la fenêtre d’apprentissage - end_index = i - start_index = i - window - y = df[column].iloc[start_index:end_index].values - - # Si les données sont insuffisantes (juste par précaution) - if len(y) < window: - regression_fit.append(np.nan) - regression_future_fit.append(np.nan) - continue - - # x centré pour meilleure stabilité numérique - x = np.linspace(-1, 1, window) - coeffs = np.polyfit(x, y, degree) - poly = np.poly1d(coeffs) - - # Calcul point présent (dernier de la fenêtre) - x_now = x[-1] - regression_fit.append(poly(x_now)) - - # Calcul point futur, en ajustant si on dépasse la fin - remaining = len(df) - i - 1 - effective_offset = min(future_offset, remaining) - x_future = x_now + (effective_offset / window) * 2 # respect du même pas - regression_future_fit.append(poly(x_future)) - - df[f"{column}_regression"] = regression_fit - # 2. Dérivée première = différence entre deux bougies successives - df[f"{column}_regression_deriv1"] = round(100 * df[f"{column}_regression"].diff() / df[f"{column}_regression"], 4) - - # 3. Dérivée seconde = différence de la dérivée première - df[f"{column}_regression_deriv2"] = round(10 * df[f"{column}_regression_deriv1"].rolling(int(window / 4)).mean().diff(), 4) - - df[f"{column}_future_{future_offset}"] = regression_future_fit - - # # 2. Dérivée première = différence entre deux bougies successives - # df[f"{column}_future_{future_offset}_deriv1"] = round(100 * df[f"{column}_future_{future_offset}"].diff() / df[f"{column}_future_{future_offset}"], 4) - # - # # 3. Dérivée seconde = différence de la dérivée première - # df[f"{column}_future_{future_offset}_deriv2"] = round(10 * df[f"{column}_future_{future_offset}_deriv1"].rolling(int(window / 4)).mean().diff(), 4) - - return df - - def getValuesFromTable(self, values, value): - for i in range(len(values) - 1): - if values[i] <= value < values[i + 1]: - return self.labels[i] - return self.labels[-1] # cas limite pour la borne max - - # def interpolated_val_from_bins(self, row_pos, col_pos): - # """ - # Renvoie une approximation interpolée (bilinéaire) d'une valeur dans la matrice - # à partir de positions flottantes dans l'index (ligne) et les colonnes. - # - # Parameters: - # matrix_df (pd.DataFrame): Matrice des probabilités (index/colonnes = labels). - # row_pos (float): Position réelle de la ligne (0 = B5, 10 = H5). - # col_pos (float): Position réelle de la colonne (0 = B5, 10 = H5). - # - # Returns: - # float: Valeur interpolée, ou NaN si en dehors des bornes. - # """ - # - # # Labels ordonnés - # n = len(self.labels) - # - # # Vérification des limites - # if not (0 <= row_pos <= n - 1) or not (0 <= col_pos <= n - 1): - # return np.nan - # - # # Conversion des labels -> matrice - # matrix = self.smooth_smadiff_matrice_df.reindex(index=self.labels, columns=self.labels).values - # - # # Coordonnées entières (inférieures) - # i = int(np.floor(row_pos)) - # j = int(np.floor(col_pos)) - # - # # Coefficients pour interpolation - # dx = row_pos - i - # dy = col_pos - j - # - # # Précautions sur les bords - # if i >= n - 1: i = n - 2; dx = 1.0 - # if j >= n - 1: j = n - 2; dy = 1.0 - # - # # Récupération des 4 valeurs voisines - # v00 = matrix[i][j] - # v10 = matrix[i + 1][j] - # v01 = matrix[i][j + 1] - # v11 = matrix[i + 1][j + 1] - # - # # Interpolation bilinéaire - # interpolated = ( - # (1 - dx) * (1 - dy) * v00 + - # dx * (1 - dy) * v10 + - # (1 - dx) * dy * v01 + - # dx * dy * v11 - # ) - # return interpolated - - def approx_val_from_bins(self, matrice, numeric_matrice, row_label, col_label): - """ - Renvoie une approximation de la valeur à partir des labels binaires (e.g. B5, H1) - en utilisant une interpolation simple basée sur les indices. - - Parameters: - matrix_df (pd.DataFrame): Matrice avec les labels binaires en index et colonnes. - row_label (str): Label de la ligne (ex: 'B3'). - col_label (str): Label de la colonne (ex: 'H2'). - - Returns: - float: Valeur approchée si possible, sinon NaN. - """ - - # Vérification des labels - if row_label not in matrice.index or col_label not in matrice.columns: - return np.nan - - # Index correspondant - row_idx = self.label_to_index.get(row_label) - col_idx = self.label_to_index.get(col_label) - - # Approximation directe (aucune interpolation complexe ici, juste une lecture) - return numeric_matrice[row_idx, col_idx] - @property def protections(self): return [ @@ -2479,32 +2026,6 @@ class Zeus_LGBMRegressor(IStrategy): # } ] - def conditional_smoothing(self, series, threshold=0.002): - smoothed = [series.iloc[0]] - for val in series.iloc[1:]: - last = smoothed[-1] - if abs(val - last) / last >= threshold: - smoothed.append(val) - else: - smoothed.append(last) - return pd.Series(smoothed, index=series.index) - - def causal_savgol(self, series, window=25, polyorder=3): - result = [] - half_window = window # Fenêtre complète dans le passé - for i in range(len(series)): - if i < half_window: - result.append(np.nan) - continue - window_series = series[i - half_window:i] - if window_series.isna().any(): - result.append(np.nan) - continue - coeffs = np.polyfit(range(window), window_series, polyorder) - poly = np.poly1d(coeffs) - result.append(poly(window - 1)) - return pd.Series(result, index=series.index) - def get_stake_from_drawdown(self, pct: float, base_stake: float = 100.0, step: float = 0.04, growth: float = 1.15, max_stake: float = 1000.0) -> float: """ @@ -2524,141 +2045,6 @@ class Zeus_LGBMRegressor(IStrategy): stake = base_stake * (growth ** level) return min(stake, max_stake) - def compute_adaptive_paliers(self, max_drawdown: float = 0.65, first_steps: list[float] = [0.01, 0.01, 0.015, 0.02], - growth: float = 1.2) -> list[float]: - """ - Génère une liste de drawdowns négatifs avec des paliers plus rapprochés au début. - - :param max_drawdown: Drawdown max (ex: 0.65 pour -65%) - :param first_steps: Liste des premiers paliers fixes en % (ex: [0.01, 0.01, 0.015]) - :param growth: Facteur multiplicatif pour espacer les paliers suivants - :return: Liste de drawdowns négatifs (croissants) - """ - paliers = [] - cumulated = 0.0 - - # Étapes initiales rapprochées - for step in first_steps: - cumulated += step - paliers.append(round(-cumulated, 4)) - - # Étapes suivantes plus espacées - step = first_steps[-1] - while cumulated < max_drawdown: - step *= growth - cumulated += step - if cumulated >= max_drawdown: - break - paliers.append(round(-cumulated, 4)) - - return paliers - - # def get_dca_stakes(self, - # max_drawdown: float = 0.65, - # base_stake: float = 100.0, - # first_steps: list[float] = [0.01, 0.01, 0.015, 0.015], - # growth: float = 1.2, - # stake_growth: float = 1.15 - # ) -> list[tuple[float, float]]: - # """ - # Génère les paliers de drawdown et leurs stakes associés. - # - # :param max_drawdown: Maximum drawdown (ex: 0.65 pour -65%) - # :param base_stake: Mise initiale - # :param first_steps: Paliers de départ (plus resserrés) - # :param growth: Multiplicateur d'espacement des paliers - # :param stake_growth: Croissance multiplicative des mises - # :return: Liste de tuples (palier_pct, stake) - # [(-0.01, 100.0), (-0.02, 115.0), (-0.035, 132.25), (-0.05, 152.09), (-0.068, 174.9), - # (-0.0896, 201.14), (-0.1155, 231.31), (-0.1466, 266.0), (-0.1839, 305.9), (-0.2287, 351.79), - # (-0.2825, 404.56), (-0.347, 465.24), (-0.4244, 535.03), (-0.5173, 615.28), (-0.6287, 707.57)] - # """ - # paliers = [ - # (-0.01, 100.0), (-0.02, 115.0), (-0.035, 130), (-0.05, 150), (-0.07, 150), - # (-0.10, 150), (-0.15, 150), (-0.20, 150), (-0.25, 150), - # (-0.30, 200), (-0.40, 200), - # (-0.50, 300), (-0.60, 400), (-0.70, 500), (-0.80, 1000) - # ] - # - # # cumulated = 0.0 - # # stake = base_stake - # # - # # # Étapes initiales - # # for step in first_steps: - # # cumulated += step - # # paliers.append((round(-cumulated, 4), round(stake, 2))) - # # stake *= stake_growth - # # - # # # Étapes suivantes - # # step = first_steps[-1] - # # while cumulated < max_drawdown: - # # step *= growth - # # cumulated += step - # # if cumulated >= max_drawdown: - # # break - # # paliers.append((round(-cumulated, 4), round(stake, 2))) - # # stake *= stake_growth - # - # return paliers - - # def get_active_stake(self, pct: float) -> float: - # """ - # Renvoie la mise correspondant au drawdown `pct`. - # - # :param pct: drawdown courant (négatif, ex: -0.043) - # :param paliers: liste de tuples (drawdown, stake) - # :return: stake correspondant - # """ - # abs_pct = abs(pct) - # stake = self.paliers[0][1] # stake par défaut - # - # for palier, s in self.paliers: - # if abs_pct >= abs(palier): - # stake = s - # else: - # break - # - # return stake - - # def get_palier_index(self, pct): - # """ - # Retourne l'index du palier franchi pour un pourcentage de baisse donné (pct). - # On cherche le palier le plus profond atteint (dernier franchi). - # """ - # for i in reversed(range(len(self.paliers))): - # seuil, _ = self.paliers[i] - # #print(f"pct={pct} seuil={seuil}") - # if pct <= seuil: - # # print(pct) - # return i - # return None # Aucun palier atteint - - # def poly_regression_predictions(self, series: pd.Series, window: int = 20, degree: int = 2, n_future: int = 3) -> pd.DataFrame: - # """ - # Renvoie une DataFrame avec `n_future` colonnes contenant les extrapolations des n prochains points - # selon une régression polynomiale ajustée sur les `window` dernières valeurs. - # """ - # result = pd.DataFrame(index=series.index) - # x = np.arange(window) - # - # for future_step in range(1, n_future + 1): - # result[f'poly_pred_t+{future_step}'] = np.nan - # - # for i in range(window - 1, len(series)): - # y = series.iloc[i - window + 1 : i + 1].values - # - # if np.any(pd.isna(y)): - # continue - # - # coeffs = np.polyfit(x, y, degree) - # poly = np.poly1d(coeffs) - # - # for future_step in range(1, n_future + 1): - # future_x = window - 1 + future_step # Extrapolation point - # result.loc[series.index[i], f'poly_pred_t+{future_step}'] = poly(future_x) - # - # return result - def polynomial_forecast(self, series: pd.Series, window: int = 20, degree: int = 2, steps=[12, 24, 36]): """ Calcule une régression polynomiale sur les `window` dernières valeurs de la série, @@ -2700,76 +2086,24 @@ class Zeus_LGBMRegressor(IStrategy): return poly, x_future, y_future, count - # def calculateStats2(self, df, index, target): - # # Nombre de tranches (modifiable) - # n_bins_indice = 11 - # n_bins_valeur = 11 - # - # # Tranches dynamiques - # # df['indice_tranche'] = pd.qcut(df[f"{index}"], q=n_bins_indice, duplicates='drop') - # # df['valeur_tranche'] = pd.qcut(df[f"{target}"], q=n_bins_valeur, duplicates='drop') - # - # df[f"{index}_bin"], bins_1h = pd.qcut(df[f"{index}"], q=n_bins_indice, labels=self.labels, retbins=True, - # duplicates='drop') - # df[f"{target}_bin"], bins_1d = pd.qcut(df[f"{target}"], q=n_bins_valeur, labels=self.labels, retbins=True, - # duplicates='drop') - # # Affichage formaté pour code Python - # print(f"Bornes des quantiles pour {index} : [{', '.join([f'{b:.4f}' for b in bins_1h])}]") - # print(f"Bornes des quantiles pour {target} : [{', '.join([f'{b:.4f}' for b in bins_1d])}]") - # - # # Tableau croisé (compte) - # tableau = pd.crosstab(df[f"{index}_bin"], df[f"{target}_bin"]) - # - # # Facultatif : en pourcentages - # tableau_pct = tableau.div(tableau.sum(axis=1), axis=0) * 100 - # - # # Affichage - # print("Répartition brute :") - # print(tableau) - # print("\nRépartition en % par ligne :") - # print(tableau_pct.round(2)) - - def calculateStats(self, df, index, target): - # Nombre de tranches (modifiable) - n_bins_indice = 11 - n_bins_valeur = 11 - - # Créer les tranches dynamiques - df['indice_tranche'] = pd.qcut(df[index], q=n_bins_indice, duplicates='drop') - df['valeur_tranche'] = pd.qcut(df[target], q=n_bins_valeur, duplicates='drop') - - # Créer un tableau croisé avec la moyenne des valeurs - pivot_mean = df.pivot_table( - index='indice_tranche', - columns='valeur_tranche', - values=target, # <-- c'est la colonne qu'on agrège - aggfunc='mean' # <-- on calcule la moyenne - ) - - # Résultat - # print("Moyenne des valeurs par double-tranche :") - # print(pivot_mean.round(2)) - def should_enter_trade(self, pair: str, last_candle, current_time) -> bool: limit = 3 - # return last_candle['slope_norm_1d'] < last_candle['slope_norm_1h'] - - if self.pairs[pair]['stop'] and last_candle['max_rsi_12_1h'] <= 60 and last_candle['trend_class_1h'] == -1: - dispo = round(self.wallets.get_available_stake_amount()) - self.pairs[pair]['stop'] = False - self.log_trade( - last_candle=last_candle, - date=current_time, - action="🟢RESTART", - dispo=dispo, - pair=pair, - rate=last_candle['close'], - trade_type='', - profit=0, - buys=self.pairs[pair]['count_of_buys'], - stake=0 - ) + # if self.pairs[pair]['stop'] and last_candle['max_rsi_12_1h'] <= 60 and last_candle['trend_class_1h'] == -1: + # dispo = round(self.wallets.get_available_stake_amount()) + # self.pairs[pair]['stop'] = False + # self.log_trade( + # last_candle=last_candle, + # date=current_time, + # action="🟢RESTART", + # dispo=dispo, + # pair=pair, + # rate=last_candle['close'], + # trade_type='', + # profit=0, + # buys=self.pairs[pair]['count_of_buys'], + # stake=0 + # ) # 🟢 Dérivée 1 > 0 et dérivée 2 > 0: tendance haussière qui s’accélère. # 🟡 Dérivée 1 > 0 et dérivée 2 < 0: tendance haussière qui ralentit → essoufflement potentiel. @@ -2841,33 +2175,9 @@ class Zeus_LGBMRegressor(IStrategy): pct_max = self.getPctFirstBuy(pair, last_candle) # self.getPctLastBuy(pair, last_candle) - val = self.getProbaHausseSma5d(last_candle) - if val < 15: - return False - - # if count_decrease == len(non_btc_pairs): - # self.should_enter_trade_count += 1 - # char="." - # print(f"should_enter_trade canceled all pairs decreased {'':{char}>{self.should_enter_trade_count}}") - # return False - # if (last_candle['mid_smooth_1h_deriv1'] < -0.0 and last_candle['sma24_deriv1_1h'] < -0.0): - # return False - - # if (last_candle['sma5_deriv1_1d'] < -0.1 and last_candle['sma5_deriv2_1d'] < -0) \ - # or last_candle['sma5_deriv2_1d'] < -0.2: - # return False - if last_candle['mid_smooth_1h_deriv1'] < -0.02: # and last_candle['mid_smooth_1h_deriv2'] > 0): return False - # if self.pairs[pair]['count_of_buys'] >= 3: - # if (last_candle['sma24_deriv1_1d'] < self.sma24_deriv1_1d_protection.value - # and last_candle['sma5_deriv1_1d'] < self.sma5_deriv1_1d_protection.value \ - # and last_candle['sma5_deriv2_1d'] < -0.05): - # # or (last_candle['sma5_deriv1_1d'] < -0.1 and last_candle['sma24_deriv1_1h'] < -0.1): - # self.pairs[pair]['stop'] = True - # return False - self.should_enter_trade_count = 0 # if max_pair != pair and self.pairs[pair]['total_amount'] > 300: @@ -2886,218 +2196,6 @@ class Zeus_LGBMRegressor(IStrategy): else: return True - @staticmethod - def check_derivatives_vectorized(dataframe, deriv_pairs, thresholds): - """ - Retourne True si toutes les dérivées respectent leur seuil. - """ - mask = pd.Series(True, index=dataframe.index) - for d1_col, d2_col in deriv_pairs: - d1_thresh = thresholds.get(d1_col, 0) - d2_thresh = thresholds.get(d2_col, 0) - mask &= (dataframe[d1_col] >= d1_thresh) & (dataframe[d2_col] >= d2_thresh) - return mask - - # ---------------------------------------------------------------------------------------------- - # fallback defaults (used when no JSON exists) - PARAMS_DIR = 'params' - - DEFAULT_PARAMS = { - "rsi_buy": 30, - "rsi_sell": 70, - "ema_period": 21, - "sma_short": 20, - "sma_long": 100, - "atr_period": 14, - "atr_multiplier": 1.5, - "stake_amount": None, # use exchange default - "stoploss": -0.10, - "minimal_roi": {"0": 0.10} - } - - def __init__(self, config: dict) -> None: - super().__init__(config) - # self.parameters = self.load_params_tree("user_data/strategies/params/") - - def setTrends(self, dataframe: DataFrame): - SMOOTH_WIN=10 - df = dataframe.copy() - - # # --- charger les données --- - # df['timestamp'] = pd.to_datetime(df['timestamp'], errors='coerce') - - # --- calcul SMA14 --- - # df['sma'] = talib.SMA(df, timeperiod=20) # ta.trend.sma_indicator(df['close'], 14) - - # --- pente brute --- - df['slope'] = df['sma12'].diff() - - # --- lissage EMA --- - df['slope_smooth'] = df['slope'].ewm(span=SMOOTH_WIN, adjust=False).mean() - - # df["slope_smooth"] = savgol_filter(df["slope_smooth"], window_length=21, polyorder=3) - - # --- normalisation relative --- - df['slope_norm'] = 10000 * df['slope_smooth'] / df['close'] - # df['slope_norm'].fillna(0, inplace=True) - df['slope_norm'] = df['slope_norm'].fillna(0) - - # --- classification dynamique via quantiles --- - - q = df['slope_norm'].quantile([0.125, 0.375, 0.625, 0.875]).values - q1, q2, q3, q4 = q - - def classify_expanding(series): - trend_class = [] - for i in range(len(series)): - past_values = series[:i + 1] # uniquement le passé - q = past_values.quantile([0.125, 0.375, 0.625, 0.875]).values - q1, q2, q3, q4 = q - v = series.iloc[i] - if v <= q1: - trend_class.append(-2) - elif v <= q2: - trend_class.append(-1) - elif v <= q3: - trend_class.append(0) - elif v <= q4: - trend_class.append(1) - else: - trend_class.append(2) - return trend_class - - dataframe['slope_norm'] = df['slope_norm'] - # dataframe['trend_class'] = df['slope_norm'].apply(classify) - dataframe['trend_class'] = None - - # Rolling sur la fenêtre passée - dataframe['trend_class'] = classify_expanding(dataframe['slope_norm']) - - - # # -------------------------- Trend detection (M2) -------------------------- - # def getTrend(self, dataframe: DataFrame) -> str: - # """ - # M2: SMA50 / SMA200 golden/death cross - # - bull: sma50 > sma200 - # - bear: sma50 < sma200 - # - range: sma50 ~= sma200 (within a small pct) - # - # Uses only past data (no future lookahead). - # """ - # if dataframe is None or len(dataframe) < max(self.DEFAULT_PARAMS['sma_short'], self.DEFAULT_PARAMS['sma_long']) + 2: - # return 'RANGE' - # - # sma_short = dataframe['close'].rolling(window=self.DEFAULT_PARAMS['sma_short']).mean() - # sma_long = dataframe['close'].rolling(window=self.DEFAULT_PARAMS['sma_long']).mean() - # - # cur_short = sma_short.iloc[-1] - # cur_long = sma_long.iloc[-1] - # - # # small relative threshold to avoid constant flips - # if cur_long == 0 or cur_short == 0: - # return 'RANGE' - # - # rel = abs(cur_short - cur_long) / cur_long - # threshold = 0.01 # 1% by default; tweak as needed - # - # if rel <= threshold: - # return 'RANGE' - # if cur_short > cur_long: - # return 'BULL' - # return 'BEAR' - - # # -------------------------- Parameter loading -------------------------- - # def loadParamsFor(self, pair: str, trend: str) -> dict: - # """Load JSON from params//.json with fallback to DEFAULT_PARAMS.""" - # pair_safe = pair.replace('/', '-') # folder name convention: BTC-USDT - # # cache key - # cache_key = f"{pair_safe}:{trend}" - # if cache_key in self._params_cache: - # return self._params_cache[cache_key] - # - # path = os.path.join(self.PARAMS_DIR, pair_safe, f"{trend}.json") - # if os.path.isfile(path): - # try: - # with open(path, 'r') as f: - # params = json.load(f) - # # merge with defaults so missing keys won't break - # merged = {**self.DEFAULT_PARAMS, **params} - # self._params_cache[cache_key] = merged - # logger.info(f"Loaded params for {pair} {trend} from {path}") - # return merged - # except Exception as e: - # logger.exception(f"Failed to load params {path}: {e}") - # - # # fallback - # logger.info(f"Using DEFAULT_PARAMS for {pair} {trend}") - # self._params_cache[cache_key] = dict(self.DEFAULT_PARAMS) - # return self._params_cache[cache_key] - - - def load_params_tree(self, base_path="user_data/strategies/params/"): - base = Path(base_path) - params_tree = {} - if not base.exists(): - raise FileNotFoundError(f"Base path '{base_path}' not found.") - - for pair_dir in base.iterdir(): - if not pair_dir.is_dir(): - continue - pair = self.getShortName(pair_dir.name) # ex : BTC-USDT - params_tree.setdefault(pair, {}) - - for trend_dir in pair_dir.iterdir(): - if not trend_dir.is_dir(): - continue - trend = trend_dir.name # ex : bull / bear / range - params_tree[pair].setdefault(trend, []) - - for file in trend_dir.glob("*-hyperopt_result.json"): - filename = file.name - - # Extraire START et END - try: - prefix = filename.replace("-hyperopt_result.json", "") - start, end = prefix.split("-", 1) # split en 2 - except Exception: - start = None - end = None - - # Lire le JSON - try: - with open(file, "r") as f: - content = json.load(f) - except Exception as err: - content = {"error": str(err)} - - params_tree[pair][trend].append({ - "start": start, - "end": end, - "file": str(file), - "content": content, - }) - for pair, trends in params_tree.items(): - for trend, entries in trends.items(): - if entries: - # indic_5m = self.getParamValue(pair, trend, 'buy', 'indic_5m') - # indic_deriv1_5m = self.getParamValue(pair, trend, 'buy', 'indic_deriv1_5m') - # indic_deriv2_5m = self.getParamValue(pair, trend, 'buy', 'indic_deriv2_5m') - # - # indic_5m_sell = self.getParamValue(pair, trend, 'sell', 'indic_5m_sell') - # indic_deriv1_5m_sell = self.getParamValue(pair, trend, 'sell', 'indic_deriv1_5m_sell') - # indic_deriv2_5m_sell = self.getParamValue(pair, trend, 'sell', 'indic_deriv2_5m_sell') - - print(f"{pair} -> {trend}") # {indic_5m} {indic_deriv1_5m} {indic_deriv2_5m} {indic_5m_sell} {indic_deriv1_5m_sell} {indic_deriv2_5m_sell}") - # for entry in entries: - # print(entry) - - return params_tree - - def getParamValue(self, pair, trend, space, param): - pair = self.getShortName(pair) - return self.parameters[pair][trend][0]['content']['params'][space][param] - - def select_uncorrelated_features(self, df, target, top_n=20, corr_threshold=0.7): """ Sélectionne les features les plus corrélées avec target, @@ -3146,16 +2244,12 @@ class Zeus_LGBMRegressor(IStrategy): plt.legend() plt.grid(True) - out_path = f"{path}/lgbm_function.png" + out_path = f"{self.path}/lgbm_function.png" plt.savefig(out_path, bbox_inches="tight") plt.close() print(f"Graphique sauvegardé : {out_path}") - import numpy as np - import seaborn as sns - import matplotlib.pyplot as plt - def graphFonctionAppriseFeature(self, path, X_test, y_test, y_pred): plt.figure(figsize=(14, 8)) colors = sns.color_palette("coolwarm", n_colors=X_test.shape[1]) @@ -3180,7 +2274,7 @@ class Zeus_LGBMRegressor(IStrategy): plt.legend(loc="right") plt.grid(True) - out_path = f"{path}/lgbm_features.png" + out_path = f"{self.path}/lgbm_features.png" plt.savefig(out_path, bbox_inches="tight") plt.close() @@ -3226,7 +2320,7 @@ class Zeus_LGBMRegressor(IStrategy): print(f"Meilleur RMSE : {study.best_value:.4f}") # 🔹 Sauvegarder les résultats - optuna_path = f"{path}/optuna_lgbm_results.txt" + optuna_path = f"{self.path}/optuna_lgbm_results.txt" with open(optuna_path, "w") as f: f.write(f"Best params:\n{study.best_params}\n") f.write(f"Best RMSE: {study.best_value:.4f}\n") @@ -3271,3 +2365,449 @@ class Zeus_LGBMRegressor(IStrategy): print(f"✅ {X_clean.shape[1]} colonnes conservées après filtrage.\n") return X_clean + def setTrends(self, dataframe: DataFrame): + SMOOTH_WIN=10 + df = dataframe.copy() + + # # --- charger les données --- + # df['timestamp'] = pd.to_datetime(df['timestamp'], errors='coerce') + + # --- calcul SMA14 --- + # df['sma'] = talib.SMA(df, timeperiod=20) # ta.trend.sma_indicator(df['close'], 14) + + # --- pente brute --- + df['slope'] = df['sma12'].diff() + + # --- lissage EMA --- + df['slope_smooth'] = df['slope'].ewm(span=SMOOTH_WIN, adjust=False).mean() + + # df["slope_smooth"] = savgol_filter(df["slope_smooth"], window_length=21, polyorder=3) + + # --- normalisation relative --- + df['slope_norm'] = 10000 * df['slope_smooth'] / df['close'] + # df['slope_norm'].fillna(0, inplace=True) + df['slope_norm'] = df['slope_norm'].fillna(0) + dataframe['slope_norm'] = df['slope_norm'] + + + try: + from lightgbm import LGBMRegressor + _HAS_LGBM = True + except Exception: + _HAS_LGBM = False + + def make_model(self, model_type="linear", degree=2, random_state=0): + model_type = model_type.lower() + if model_type == "linear": + return LinearRegression() + if model_type == "poly": + return make_pipeline(StandardScaler(), PolynomialFeatures(degree=degree, include_bias=False), + LinearRegression()) + if model_type == "svr": + return make_pipeline(StandardScaler(), SVR(kernel="rbf", C=1.0, epsilon=0.1)) + if model_type == "rf": + return RandomForestRegressor(n_estimators=100, random_state=random_state, n_jobs=1) + if model_type == "lgbm": + if not _HAS_LGBM: + raise RuntimeError("lightgbm n'est pas installé") + return LGBMRegressor(n_estimators=100, random_state=random_state) + raise ValueError(f"model_type inconnu: {model_type}") + + def calculateRegressionNew(self, df, indic, lookback=20, future_steps=5, model_type="linear"): + df = df.copy() + pred_col = f"{indic}_future_pred_cons" + df[pred_col] = np.nan + + X_idx = np.arange(lookback).reshape(-1, 1) + + values = df[indic].values + n = len(values) + + model = LinearRegression() + + for i in range(lookback, n - future_steps): + window = values[i - lookback:i] + + # cible = vraie valeur future + y_target = values[i + future_steps] + + if np.isnan(window).any() or np.isnan(y_target): + continue + + # entraînement + model.fit(X_idx, window) + + # prédiction de la valeur future + future_x = np.array([[lookback + future_steps - 1]]) + pred_future = model.predict(future_x)[0] + + # la prédiction concerne i + future_steps + df.iloc[i + future_steps, df.columns.get_loc(pred_col)] = pred_future + + return df + + def calculateRegression(self, + df, + indic, + lookback=30, + future_steps=5, + model_type="linear", # "linear", "ridge", "huber", "rf", ... + degree=2, + weight_mode="exp", + weight_strength=0.5, + clip_k=2.0, # multiplier pour rolling_std pour clipper le delta + blend_alpha=0.7, # alpha pour mélanger prédiction et persistance (0..1) + ): + """ + Pour chaque index i (>= lookback), on entraîne sur la fenêtre [i-lookback, i) + et on prédit la variation (delta) entre valeur à i et i+future_steps. + On clippe la delta à ±clip_k * rolling_std, et on blend avec la valeur actuelle. + Résultat écrit dans df[f"{indic}_future_pred_cons"] à la position i. + """ + df = df.copy() + colname = f"{indic}_future_pred_cons" + df[colname] = np.nan + + values = df[indic].values + n = len(values) + + # préparation X fixe 0..lookback-1 + X_window = np.arange(lookback).reshape(-1, 1) + + # pondérations + if weight_mode == "exp": + weights = np.exp(np.linspace(-weight_strength, weight_strength, lookback)) + elif weight_mode == "linear": + weights = np.linspace(0.5, 1.0, lookback) + else: + weights = np.ones(lookback) + + # # choix modèle simple + # def make_estimator(mt): + # mt = mt.lower() + # if mt == "linear": + # return LinearRegression() + # if mt == "ridge": + # return Ridge(alpha=1.0) + # if mt == "huber": + # return HuberRegressor() + # # fallback + # return LinearRegression() + + for i in range(lookback, n - future_steps): + window = values[i - lookback:i] + if np.isnan(window).any(): + continue + + # y_target = delta = value at (i + future_steps) - value at i + y_target = values[i + future_steps] - values[i] # étiquette: delta future (scalare) + + # pour entraîner, on construit y_window des deltas relatifs à la fin de la fenêtre + # une façon simple : prendre y = window (valeurs) and prédire valeur future puis soustraire current + # mais plus stable : prédire directement le delta en construisant y_window comme différences + # entre chaque point j dans la fenêtre et la valeur à la fin de la fenêtre (index lookback-1) + # Ici on utilise une méthode simple : on prédit la valeur et on fera delta = pred - last + y_train = window # on prédit valeur future relative à la position de la fenêtre (standard approach) + + model = self.make_model(model_type=model_type, degree=degree) + try: + # essayer avec sample_weight + model.fit(X_window, y_train, sample_weight=weights) + y_pred_value = model.predict(np.array([[lookback + future_steps - 1]]))[0] + # delta prédite = pred_value - valeur actuelle (i) + pred_delta = y_pred_value - values[i] + except TypeError: + # modèle sans sample_weight + model.fit(X_window, y_train) + y_pred_value = model.predict(np.array([[lookback + future_steps - 1]]))[0] + pred_delta = y_pred_value - values[i] + except Exception as exception: + # modèle sans sample_weight + model.fit(X_window, y_train) + y_pred_value = model.predict(np.array([[lookback + future_steps - 1]]))[0] + pred_delta = y_pred_value - values[i] + + + # clipping basé sur la volatilité locale (rolling std calculée sur la fenêtre) + local_std = np.std(window) + max_change = clip_k * (local_std if local_std > 0 else 1e-9) + pred_delta = np.clip(pred_delta, -max_change, max_change) + + # blending entre persistance (0) et prédiction + blended = blend_alpha * (values[i] + pred_delta) + (1 - blend_alpha) * values[i] + # si tu préfères stocker la valeur absolue prédite : + final_pred_value = blended + + df.iloc[i, df.columns.get_loc(colname)] = final_pred_value + + return df + + # def calculateRegression(self, + # df, + # indic, + # lookback=30, + # future_steps=5, + # model_type="linear", + # degree=2, + # random_state=0, + # weight_mode="exp", # "exp", "linear" ou None + # weight_strength=0.2, # plus c’est grand, plus les dernières bougies comptent + # ): + # """ + # Ajoute une régression glissante qui prévoit la valeur future à horizon 'future_steps', + # avec pondération des dernières valeurs si weight_mode != None. + # """ + # df = df.copy() + # colname = f"{indic}_future_pred_{model_type}" + # df[colname] = np.nan + # + # values = df[indic].values + # n = len(values) + # X_window = np.arange(lookback).reshape(-1, 1) + # + # # génération du schéma de pondération + # if weight_mode == "exp": + # # exponentiel → les derniers points pèsent beaucoup plus + # weights = np.exp(np.linspace(-weight_strength, weight_strength, lookback)) + # elif weight_mode == "linear": + # # poids linéaire → 1..lookback + # weights = np.linspace(0.5, 1.0, lookback) + # else: + # weights = np.ones(lookback) + # + # for i in range(lookback, n - future_steps): + # y_window = values[i - lookback:i] + # if np.isnan(y_window).any(): + # continue + # + # model = self.make_model(model_type=model_type, degree=degree, random_state=random_state) + # + # try: + # model.fit(X_window, y_window, sample_weight=weights) + # except TypeError: + # # certains modèles (RF) ne supportent pas sample_weight dans ce contexte + # model.fit(X_window, y_window) + # except Exception: + # continue + # + # X_pred = np.array([[lookback + future_steps - 1]]) + # try: + # pred = model.predict(X_pred)[0] + # except Exception: + # continue + # + # df.iloc[i, df.columns.get_loc(colname)] = pred + # + # return df + + # def calculateRegression(self, df, indic, lookback=30, future_steps=5): + # """ + # Ajoute un indicateur {indic}_future_pred qui contient, + # pour chaque bougie n, la valeur attendue à n + future_steps + # selon une régression linéaire sur les lookback dernières bougies. + # """ + # df = df.copy() + # df[f"{indic}_future_pred"] = np.nan + # + # values = df[indic].values + # n = len(values) + # + # model = LinearRegression() + # + # for i in range(lookback, n - future_steps): + # # Fenêtre d’apprentissage + # X = np.arange(lookback).reshape(-1, 1) + # y = values[i - lookback:i] + # + # model.fit(X, y) + # + # # Prédiction future + # next_X = np.array([[lookback + future_steps - 1]]) + # future_pred = model.predict(next_X)[0] + # + # # On insère la prédiction à la position actuelle (n) + # df.iloc[i, df.columns.get_loc(f"{indic}_future_pred")] = future_pred + # + # return df + + def add_future_quantiles(self, dataframe, indic, lookback=30, future_steps=5, quantiles=[0.1, 0.5, 0.9]): + + working_columns = self.listUsableColumns(dataframe) + + df = dataframe[self.model_indicators].copy() + n = len(df) + target = self.indicator_target + "_future" + + df[target] = dataframe[self.indicator_target].shift(-24) # > df['sma24'] * 1.003).astype(int) + df[target] = df[target].fillna(0) #.astype(int) + + # Créer les colonnes pour chaque quantile + for q in quantiles: + df[f"{indic}_future_q{int(q * 100)}"] = np.nan + + # Préparer toutes les fenêtres X + X = np.array([df[indic].iloc[i - lookback:i].values for i in range(lookback, n - future_steps)]) + y_idx = np.arange(lookback, n - future_steps) + future_steps # index des valeurs futures + + # Imputer les NaN + imputer = SimpleImputer(strategy='median') + X_imputed = imputer.fit_transform(X) + + # Pour chaque quantile, créer un modèle et prédire + for q in quantiles: + model = HistGradientBoostingRegressor(loss='quantile', quantile=q, max_iter=100) + # Entrainer chaque ligne X_imputed à prédire la dernière valeur de la fenêtre + future_steps + # Ici, comme on prédit delta future par fenêtre, on peut utiliser la valeur cible correspondante + y = df[indic].iloc[y_idx].values + model.fit(X_imputed, y) + y_pred = model.predict(X_imputed) + + # Écrire les prédictions dans le dataframe + df.iloc[lookback:n - future_steps, df.columns.get_loc(f"{indic}_future_q{int(q * 100)}")] = y_pred + + df_plot = df.iloc[lookback:-future_steps] + self.plot_future_quantiles_band(df_plot, indic=self.indicator_target, quantiles=[0.1, 0.5, 0.9]) + # self.compute_quantile_confidence(df_plot, indic=self.indicator_target, quantiles=[0.1, 0.5, 0.9]) + + # fig, ax = plt.subplots(figsize=(20, 20)) + # for q in quantiles: + # plt.plot(stats.index.astype(str), stats[q], marker='o', label=f"Q{int(q * 100)}") + # plt.xticks(rotation=45) + # plt.xlabel(f"{indic} bins") + # plt.ylabel(f"Quantiles") + # plt.title(f"Distribution quantile de {indic}") + # plt.legend() + # plt.grid(True) + # plt.tight_layout() + # # plt.show() + # # --- Sauvegarde --- + # output_path = f"{path}/Distribution_quantile.png" + # plt.savefig(output_path, bbox_inches="tight", dpi=150) + # plt.close(fig) + # + # target = "future_return" + quantiles = [0.1, 0.25, 0.5, 0.75, 0.9] + for indicator in working_columns: + df["bin"] = pd.qcut(df[indicator], q=20, duplicates="drop") + stats = df.groupby("bin")[target].quantile(quantiles).unstack() + + fig, ax = plt.subplots(figsize=(10, 10)) + # plt.figure(figsize=(12, 6)) + for q in stats.columns: + plt.plot(stats.index.astype(str), stats[q], marker='o', label=f"Q{int(q * 100)}") + + plt.xticks(rotation=45) + plt.xlabel(f"{indicator} bins") + plt.ylabel(f"Quantiles of {target}") + plt.title(f"Distribution quantile de {target} selon {indicator}") + plt.legend() + plt.grid(True) + plt.tight_layout() + # --- Sauvegarde --- + output_path = f"{self.path}/Distribution_{indicator}.png" + plt.savefig(output_path, bbox_inches="tight", dpi=150) + plt.close(fig) + # plt.show() + + return df + + def plot_future_quantiles_band(self, df, indic, quantiles=[0.1, 0.5, 0.9], lookback=30, future_steps=5): + """ + df: DataFrame contenant la colonne réelle et les colonnes de quantiles + indic: nom de la colonne cible (ex: 'mid') + quantiles: liste des quantiles prédits + """ + # plt.figure(figsize=(16, 6)) + fig, ax = plt.subplots(figsize=(96, 30)) + + # Série réelle + plt.plot(df[indic], label=f"{indic} réel", color='black', linewidth=1.2) + + # Récupérer les colonnes de quantiles + cols_q = [f"{indic}_future_q{int(q * 100)}" for q in quantiles] + + # Vérifier que tous les quantiles existent + cols_q = [c for c in cols_q if c in df.columns] + + if len(cols_q) < 2: + print("Au moins deux quantiles sont nécessaires pour afficher les bandes") + return + + # Ordre : q_min, q_median, q_max + df_plot = df[cols_q] + + # Couleur pour la bande + color = sns.color_palette("coolwarm", n_colors=1)[0] + + # Tracer la bande entre min et max quantiles + plt.fill_between(df.index, + df_plot.iloc[:, 0], # quantile bas (ex: 10%) + df_plot.iloc[:, -1], # quantile haut (ex: 90%) + color=color, + alpha=0.3, + label=f"Intervalle {quantiles[0] * 100}-{quantiles[-1] * 100}%") + + # Tracer la médiane + if len(cols_q) >= 3: + plt.plot(df_plot.iloc[:, 1], color=color, linestyle='--', linewidth=1, label="Quantile médian") + + plt.title(f"Prédiction futures valeurs de {indic} avec intervalle de quantiles") + plt.xlabel("Index / Bougies") + plt.ylabel(indic) + plt.legend() + plt.grid(True) + # plt.show() + # --- Sauvegarde --- + output_path = f"{self.path}/Prédiction futures valeurs de {indic}.png" + plt.savefig(output_path, bbox_inches="tight", dpi=150) + plt.close(fig) + + def compute_quantile_confidence(self, df, indic, quantiles=[0.1, 0.5, 0.9]): + """ + df: DataFrame contenant les colonnes des quantiles + indic: nom de la colonne réelle + quantiles: liste des quantiles prédits + Retourne une série score [-1,1], positif = au-dessus de la médiane, négatif = en dessous + """ + + # df['quantile_conf'] = compute_quantile_confidence(df_plot, indic='mid') + # + # # Exemple de signal simple + # df['buy_signal'] = df['quantile_conf'] < -0.5 # valeur sous la médiane + bande étroite + # df['sell_signal'] = df['quantile_conf'] > 0.5 # valeur au-dessus de la médiane + bande étroite + + col_low = f"{indic}_future_q{int(quantiles[0] * 100)}" + col_med = f"{indic}_future_q{int(quantiles[1] * 100)}" + col_high = f"{indic}_future_q{int(quantiles[2] * 100)}" + + # largeur de bande (incertitude) + band_width = df[col_high] - df[col_low] + 1e-9 # éviter division par 0 + + # distance normalisée à la médiane + score = (df[indic] - df[col_med]) / band_width + + # clipper le score dans [-1,1] pour éviter les valeurs extrêmes + score = np.clip(score, -1, 1) + + # plt.figure(figsize=(16, 6)) + fig, ax = plt.subplots(figsize=(16, 6)) + plt.plot(df[indic], color='black', label='Valeur réelle') + plt.fill_between(df.index, + df[f"{indic}_future_q10"], + df[f"{indic}_future_q90"], + alpha=0.3, color='blue', label='Intervalle 10%-90%') + plt.plot(df[f"{indic}_future_q50"], linestyle='--', color='blue', label='Médiane') + + # Ajouter le score comme couleur de fond + plt.scatter(df.index, df[indic], c=df['quantile_conf'], cmap='coolwarm', s=20) + plt.colorbar(label='Score de confiance') + plt.title("Prédiction + score de confiance quantile") + plt.legend() + plt.grid(True) + # plt.show() + # --- Sauvegarde --- + output_path = f"{self.path}/Prédiction score confiance de {indic}.png" + plt.savefig(output_path, bbox_inches="tight", dpi=150) + plt.close(fig) + + return score