# Zeus Strategy: First Generation of GodStra Strategy with maximum # AVG/MID profit in USDT # Author: @Mablue (Masoud Azizi) # github: https://github.com/mablue/ # IMPORTANT: INSTALL TA BEFOUR RUN(pip install ta) # freqtrade hyperopt --hyperopt-loss SharpeHyperOptLoss --spaces buy sell roi --strategy Zeus # --- Do not remove these libs --- from datetime import timedelta, datetime from freqtrade.persistence import Trade from freqtrade.strategy import (BooleanParameter, CategoricalParameter, DecimalParameter, stoploss_from_open, IntParameter, IStrategy, merge_informative_pair, informative, stoploss_from_absolute) import pandas as pd import numpy as np import os import json from pandas import DataFrame from typing import Optional, Union, Tuple import math import logging from pathlib import Path # -------------------------------- # Add your lib to import here test git import ta import talib.abstract as talib import freqtrade.vendor.qtpylib.indicators as qtpylib from datetime import timezone, timedelta logger = logging.getLogger(__name__) # Machine Learning from sklearn.model_selection import train_test_split import joblib import matplotlib.pyplot as plt from sklearn.metrics import ( classification_report, confusion_matrix, accuracy_score, roc_auc_score, roc_curve, precision_score, recall_score, precision_recall_curve, f1_score, mean_squared_error, r2_score ) from sklearn.tree import export_text import inspect from sklearn.feature_selection import SelectFromModel from tabulate import tabulate from sklearn.feature_selection import VarianceThreshold import seaborn as sns import lightgbm as lgb from sklearn.model_selection import cross_val_score import optuna.visualization as vis import optuna from lightgbm import LGBMRegressor from sklearn.metrics import mean_squared_error from sklearn.model_selection import train_test_split # Couleurs ANSI de base RED = "\033[31m" GREEN = "\033[32m" YELLOW = "\033[33m" BLUE = "\033[34m" MAGENTA = "\033[35m" CYAN = "\033[36m" RESET = "\033[0m" import warnings warnings.filterwarnings("ignore", message="No further splits with positive gain") def pprint_df(dframe): print(tabulate(dframe, headers='keys', tablefmt='psql', showindex=False)) def normalize(df): df = (df - df.min()) / (df.max() - df.min()) return df class Zeus_LGBMRegressor(IStrategy): startup_candle_count = 24 # Machine Learning model_indicators = [ "ms-10", "ms-5", "ms-4", "ms-3", "ms-2", "ms-1", "ms-0", 'rsi', 'rsi_deriv1', 'rsi_deriv2', "max_rsi_12", "bb_percent", 'vol_24', 'percent3', 'sma5_dist', 'sma5_deriv1', 'sma5_deriv2', 'sma24_dist', 'sma24_deriv1', 'sma24_deriv2', 'sma60_dist', 'sma60_deriv1', 'sma60_deriv2', 'down_pct', 'slope_norm', 'min_max_60', 'rsi_slope', 'adx_change', 'volatility_ratio', 'slope_ratio', 'bb_width', 'rsi_1h', 'rsi_deriv1_1h', 'rsi_deriv2_1h', "max_rsi_12_1h", ] model = None # model_indicators = ["ms-10", "ms-5", "ms-4", "ms-3", "ms-2", "ms-1", "ms-0"] # model_indicators = ['open', 'high', 'close', 'haclose', 'percent', 'sma5', 'sma12', 'sma24', 'sma24_deriv1', 'sma24_deriv2', 'sma48', 'sma48_deriv1', 'sma48_deriv2', 'sma60', 'sma60_dist', 'sma60_deriv1', # 'sma60_deriv2', 'mid_smooth_3_deriv2', 'mid_smooth_12_deriv1', 'mid_smooth_12_deriv2', 'mid_smooth_24', 'mid_smooth_24_deriv1', 'mid_smooth_24_deriv2', 'max_rsi_12', 'max_rsi_24', 'max12', # 'max60', 'min60', 'min_max_60', 'bb_lowerband', 'bb_upperband', 'bb_width', 'macd', 'macdsignal', 'macdhist', 'sma_20', 'sma_100', 'atr', 'atr_norm', 'adx', 'obv', 'vol_24', 'adx_change', # 'volatility_ratio', 'slope_ratio', 'mid_smooth_1h_deriv2', 'mid_smooth_5h', 'mid_smooth_5h_deriv1', 'mid_smooth_5h_deriv2'] levels = [1, 2, 2, 4, 6, 8, 10, 12, 14, 16, 18, 20] # startup_candle_count = 12 * 24 * 5 # ROI table: minimal_roi = { "0": 0.564, "567": 0.273, "2814": 0.12, "7675": 0 } stakes = 40 # Stoploss: stoploss = -1 # 0.256 # Custom stoploss use_custom_stoploss = False trailing_stop = True trailing_stop_positive = 0.15 trailing_stop_positive_offset = 0.20 trailing_only_offset_is_reached = True # Buy hypers timeframe = '5m' max_open_trades = 5 max_amount = 40 parameters = {} # DCA config position_adjustment_enable = True plot_config = { "main_plot": { "sma24_1h": { "color": "pink" }, "sma5_1d": { "color": "blue" }, # "sma24": { # "color": "yellow" # }, "sma60": { "color": "green" }, "bb_lowerband": { "color": "#da59a6"}, "bb_upperband": { "color": "#da59a6", }, # "sma12": { # "color": "blue" # }, "mid_smooth_3_1h": { "color": "blue" } }, "subplots": { "Rsi": { "max_rsi_24": { "color": "blue" }, "max_rsi_24_1h": { "color": "pink" }, # "rsi_1h": { # "color": "red" # }, # "rsi_1d": { # "color": "blue" # } }, "Rsi_deriv1": { "sma24_deriv1_1h": { "color": "pink" }, "sma24_deriv1": { "color": "yellow" }, "sma5_deriv1_1d": { "color": "blue" }, "sma60_deriv1": { "color": "green" } }, "Rsi_deriv2": { "sma24_deriv2_1h": { "color": "pink" }, "sma24_deriv2": { "color": "yellow" }, "sma5_deriv2_1d": { "color": "blue" }, "sma60_deriv2": { "color": "green" } }, "States": { "tdc_macd_1h": { "color": "cyan" }, "sma24_state_1h": { "color": "pink" }, "sma24_state": { "color": "yellow" }, "sma5_state_1d": { "color": "blue" }, "sma60_state": { "color": "green" } }, 'Macd': { "macd_rel_1d": { "color": "cyan" }, "macdsignal_rel_1d": { "color": "pink" }, "macdhist_rel_1d": { "color": "yellow" } } } } columns_logged = False pairs = { pair: { "first_buy": 0, "last_buy": 0.0, "last_min": 999999999999999.5, "last_max": 0, "trade_info": {}, "max_touch": 0.0, "last_sell": 0.0, 'count_of_buys': 0, 'current_profit': 0, 'expected_profit': 0, 'previous_profit': 0, "last_candle": {}, "last_count_of_buys": 0, 'base_stake_amount': 0, 'stop_buy': False, 'last_date': 0, 'stop': False, 'max_profit': 0, 'total_amount': 0, 'has_gain': 0, 'force_sell': False, 'force_buy': False } for pair in ["BTC/USDC", "ETH/USDC", "DOGE/USDC", "XRP/USDC", "SOL/USDC", "BTC/USDT", "ETH/USDT", "DOGE/USDT", "XRP/USDT", "SOL/USDT"] } # 20 20 40 60 100 160 260 420 # 50 50 100 300 500 # fibo = [1, 1, 2, 3, 5, 8, 13, 21] # my fibo # 50 50 50 100 100 150 200 250 350 450 600 1050 fibo = [1, 1, 1, 2, 2, 3, 4, 5, 7, 9, 12, 16, 21] baisse = [1, 2, 3, 5, 7, 10, 14, 19, 26, 35, 47, 63, 84] # Ma suite 1 1 1 2 2 3 4 5 7 9 12 16 21 # Mise 50 50 50 100 100 150 200 250 350 450 600 800 1050 # Somme Mises 50 100 150 250 350 500 700 950 1300 1750 2350 3150 4200 # baisse 1 2 3 5 7 10 14 19 26 35 47 63 84 # factors = [1, 1.1, 1.25, 1.5, 2.0, 3] # thresholds = [2, 5, 10, 20, 30, 50] factors = [0.5, 0.75, 1, 1.25, 1.5, 2] thresholds = [0, 2, 5, 10, 30, 45] trades = list() max_profit_pairs = {} mise_factor_buy = DecimalParameter(0.01, 0.1, default=0.05, decimals=2, space='buy', optimize=True, load=True) indicators = {'sma5', 'sma12', 'sma24', 'sma60'} indicators_percent = {'percent', 'percent3', 'percent12', 'percent24', 'percent_1h', 'percent3_1h', 'percent12_1h', 'percent24_1h'} mises = IntParameter(1, 50, default=5, space='buy', optimize=False, load=False) ml_prob_buy = DecimalParameter(-0.5, 0.5, default=0.0, decimals=2, space='buy', optimize=True, load=True) ml_prob_sell = DecimalParameter(-0.5, 0.5, default=0.0, decimals=2, space='sell', optimize=True, load=True) pct = DecimalParameter(0.005, 0.05, default=0.012, decimals=3, space='buy', optimize=True, load=True) pct_inc = DecimalParameter(0.0001, 0.003, default=0.0022, decimals=4, space='buy', optimize=True, load=True) # deriv1_buy_protect = DecimalParameter(-0.3, 0.1, default=-0.1, decimals=2, space='protection', optimize=True, load=True) # rsi_buy_protect = IntParameter(50, 90, default=70, space='protection', optimize=True, load=True) # indic_5m_slope_sup = CategoricalParameter(indicators, default="sma60", space='protection') # indic_1h_slope_sup = CategoricalParameter(indicators, default="sma5", space='protection') labels = ['B3', 'B2', 'B1', 'N0', 'H1', 'H2', 'H3'] index_labels = ['B3', 'B2', 'B1', 'N0', 'H1', 'H2', 'H3'] ordered_labels = ['B3', 'B2', 'B1', 'N0', 'H1', 'H2', 'H3'] label_to_index = {label: i for i, label in enumerate(ordered_labels)} # ========================================================================= # paliers dérivées jour sma5 sma5_deriv1 = [-1.1726, -0.2131, -0.1012, -0.0330, 0.0169, 0.0815, 0.2000, 4.0335] sma5_deriv2 = [-1.9190, -0.1388, -0.0644, -0.0202, 0.0209, 0.0646, 0.1377, 4.2987] sma5_derive1_2_matrice = { 'B3': [8.6, 10.8, 34.6, 35.0, 58.8, 61.9, 91.2], 'B2': [0.0, 12.5, 9.1, 57.1, 63.3, 79.3, 89.5], 'B1': [6.1, 12.5, 22.0, 46.8, 61.5, 70.0, 100.0], 'N0': [0.0, 10.7, 37.0, 43.5, 75.0, 75.9, 100.0], 'H1': [0.0, 18.5, 32.4, 35.9, 76.8, 82.9, 92.0], 'H2': [0.0, 21.9, 16.0, 39.5, 69.7, 83.3, 100.0], 'H3': [9.5, 29.2, 41.2, 57.9, 53.8, 86.8, 92.3], } sma5_derive1_2_matrice_df = pd.DataFrame(sma5_derive1_2_matrice, index=index_labels) # Extraction de la matrice numérique sma5_derive1_2_numeric_matrice = sma5_derive1_2_matrice_df.reindex(index=ordered_labels, columns=ordered_labels).values should_enter_trade_count = 0 def confirm_trade_entry(self, pair: str, order_type: str, amount: float, rate: float, time_in_force: str, current_time: datetime, entry_tag: Optional[str], **kwargs) -> bool: minutes = 0 if self.pairs[pair]['last_date'] != 0: minutes = round(int((current_time - self.pairs[pair]['last_date']).total_seconds() / 60)) dataframe, _ = self.dp.get_analyzed_dataframe(pair, self.timeframe) last_candle = dataframe.iloc[-1].squeeze() last_candle_2 = dataframe.iloc[-2].squeeze() last_candle_3 = dataframe.iloc[-3].squeeze() # val = self.getProbaHausse144(last_candle) # trend = last_candle['trend_class'] # params = self.loadParamsFor(pair, trend) # indic_5m = self.getParamValue(pair, trend, 'buy', 'indic_5m') # indic_deriv1_5m = self.getParamValue( pair, trend, 'buy', 'indic_deriv1_5m') # indic_deriv2_5m = self.getParamValue( pair, trend, 'buy', 'indic_deriv2_5m') condition = True #(last_candle[f"{indic_5m}_deriv1"] >= indic_deriv1_5m) and (last_candle[f"{indic_5m}_deriv2"] >= indic_deriv2_5m) # allow_to_buy = True #(not self.stop_all) #& (not self.all_down) # and val > self.buy_val.value #not last_candle['tendency'] in (-1, -2) # (rate <= float(limit)) | (entry_tag == 'force_entry') allow_to_buy = (condition and not self.pairs[pair]['stop']) | (entry_tag == 'force_entry') # if allow_to_buy: # poly_func, x_future, y_future, count = self.polynomial_forecast( # dataframe['mid_smooth_12'], # window=self.buy_horizon_predict_1h.value * 12, # degree=4, # n_future=3) # # if count < 3: # allow_to_buy = False force = self.pairs[pair]['force_buy'] if self.pairs[pair]['force_buy']: self.pairs[pair]['force_buy'] = False allow_to_buy = True else: if not self.should_enter_trade(pair, last_candle, current_time): allow_to_buy = False if allow_to_buy: self.trades = list() self.pairs[pair]['first_buy'] = rate self.pairs[pair]['last_buy'] = rate self.pairs[pair]['max_touch'] = last_candle['close'] self.pairs[pair]['last_candle'] = last_candle self.pairs[pair]['count_of_buys'] = 1 self.pairs[pair]['current_profit'] = 0 self.pairs[pair]['last_max'] = max(last_candle['close'], self.pairs[pair]['last_max']) self.pairs[pair]['last_min'] = min(last_candle['close'], self.pairs[pair]['last_min']) dispo = round(self.wallets.get_available_stake_amount()) self.printLineLog() stake_amount = self.adjust_stake_amount(pair, last_candle) self.pairs[pair]['total_amount'] = stake_amount self.log_trade( last_candle=last_candle, date=current_time, action=("🟩Buy" if allow_to_buy else "Canceled") + " " + str(minutes), pair=pair, rate=rate, dispo=dispo, profit=0, trade_type=entry_tag, buys=1, stake=round(stake_amount, 2) ) return allow_to_buy def confirm_trade_exit(self, pair: str, trade: Trade, order_type: str, amount: float, rate: float, time_in_force: str, exit_reason: str, current_time, **kwargs, ) -> bool: # allow_to_sell = (minutes > 30) dataframe, _ = self.dp.get_analyzed_dataframe(pair, self.timeframe) last_candle = dataframe.iloc[-1].squeeze() profit =trade.calc_profit(rate) force = self.pairs[pair]['force_sell'] allow_to_sell = (last_candle['hapercent'] < 0 and profit > 0) or force or (exit_reason == 'force_exit') or (exit_reason == 'stop_loss') minutes = int(round((current_time - trade.date_last_filled_utc).total_seconds() / 60, 0)) if allow_to_sell: self.trades = list() self.pairs[pair]['last_count_of_buys'] = trade.nr_of_successful_entries # self.pairs[pair]['count_of_buys'] self.pairs[pair]['last_sell'] = rate self.pairs[pair]['last_candle'] = last_candle self.pairs[pair]['max_profit'] = 0 self.pairs[pair]['previous_profit'] = 0 self.trades = list() dispo = round(self.wallets.get_available_stake_amount()) # print(f"Sell {pair} {current_time} {exit_reason} dispo={dispo} amount={amount} rate={rate} open_rate={trade.open_rate}") self.log_trade( last_candle=last_candle, date=current_time, action="🟥Sell " + str(minutes), pair=pair, trade_type=exit_reason, rate=last_candle['close'], dispo=dispo, profit=round(profit, 2) ) self.pairs[pair]['force_sell'] = False self.pairs[pair]['has_gain'] = 0 self.pairs[pair]['current_profit'] = 0 self.pairs[pair]['total_amount'] = 0 self.pairs[pair]['count_of_buys'] = 0 self.pairs[pair]['max_touch'] = 0 self.pairs[pair]['last_buy'] = 0 self.pairs[pair]['last_date'] = current_time self.pairs[pair]['current_trade'] = None # else: # print(f"STOP triggered for {pair} ({exit_reason}) but condition blocked", "warning") return (allow_to_sell) | (exit_reason == 'force_exit') | (exit_reason == 'stop_loss') def custom_stake_amount(self, pair: str, current_time: datetime, current_rate: float, proposed_stake: float, min_stake: float, max_stake: float, **kwargs) -> float: dataframe, _ = self.dp.get_analyzed_dataframe(pair=pair, timeframe=self.timeframe) current_candle = dataframe.iloc[-1].squeeze() adjusted_stake_amount = self.adjust_stake_amount(pair, current_candle) # print(f"{pair} adjusted_stake_amount{adjusted_stake_amount}") # Use default stake amount. return adjusted_stake_amount def custom_exit(self, pair: str, trade: Trade, current_time, current_rate, current_profit, **kwargs): dataframe, _ = self.dp.get_analyzed_dataframe(pair, self.timeframe) last_candle = dataframe.iloc[-1].squeeze() last_candle_1h = dataframe.iloc[-13].squeeze() before_last_candle = dataframe.iloc[-2].squeeze() before_last_candle_2 = dataframe.iloc[-3].squeeze() before_last_candle_12 = dataframe.iloc[-13].squeeze() expected_profit = self.expectedProfit(pair, last_candle) # print(f"current_time={current_time} current_profit={current_profit} expected_profit={expected_profit}") max_touch_before = self.pairs[pair]['max_touch'] self.pairs[pair]['last_max'] = max(last_candle['close'], self.pairs[pair]['last_max']) self.pairs[pair]['last_min'] = min(last_candle['close'], self.pairs[pair]['last_min']) self.pairs[pair]['current_trade'] = trade count_of_buys = trade.nr_of_successful_entries profit = trade.calc_profit(current_rate) #round(current_profit * trade.stake_amount, 1) self.pairs[pair]['max_profit'] = max(self.pairs[pair]['max_profit'], profit) max_profit = self.pairs[pair]['max_profit'] baisse = 0 if profit > 0: baisse = 1 - (profit / max_profit) mx = max_profit / 5 self.pairs[pair]['count_of_buys'] = count_of_buys self.pairs[pair]['current_profit'] = profit dispo = round(self.wallets.get_available_stake_amount()) hours_since_first_buy = (current_time - trade.open_date_utc).seconds / 3600.0 days_since_first_buy = (current_time - trade.open_date_utc).days hours = (current_time - trade.date_last_filled_utc).total_seconds() / 3600.0 # trend = last_candle['trend_class'] # # indic_5m_sell = self.getParamValue( pair, trend, 'sell', 'indic_5m_sell') # indic_deriv1_5m_sell = self.getParamValue( pair, trend, 'sell', 'indic_deriv1_5m_sell') # indic_deriv2_5m_sell = self.getParamValue( pair, trend, 'sell', 'indic_deriv2_5m_sell') if hours % 4 == 0: self.log_trade( last_candle=last_candle, date=current_time, action="🔴 CURRENT" if self.pairs[pair]['stop'] else "🟢 CURRENT", dispo=dispo, pair=pair, rate=last_candle['close'], trade_type='', profit=round(profit, 2), buys=count_of_buys, stake=0 ) # if (last_candle['mid_smooth_deriv1'] >= 0): # return None # if (last_candle['tendency'] in (2, 1)) and (last_candle['rsi'] < 80): # return None # # if (last_candle['sma24_deriv1'] < 0 and before_last_candle['sma24_deriv1'] >= 0) and (current_profit > expected_profit): # return 'Drv_' + str(count_of_buys) pair_name = self.getShortName(pair) # if (current_profit > expected_profit) and last_candle['can_sell']: # return 'Can_' + pair_name + '_' + str(count_of_buys) # trend = last_candle['trend_class_1d'] # if (trend == "B-" or trend == "B--") and self.pairs[pair]['has_gain'] == 0: # and (last_candle[f"{indic_5m_sell}_deriv1"] <= indic_deriv1_5m_sell and last_candle[f"{indic_5m_sell}_deriv2"] <= indic_deriv2_5m_sell): # # if (last_candle['max_rsi_12_1h'] > 75) and last_candle['trend_class_1h'] == 1 and profit > max(5, expected_profit) and (last_candle['hapercent'] < 0): # self.pairs[pair]['stop'] = True # self.log_trade( # last_candle=last_candle, # date=current_time, # action="🔴STOP", # dispo=dispo, # pair=pair, # rate=last_candle['close'], # trade_type='', # profit=self.pairs[pair]['current_profit'], # buys=self.pairs[pair]['count_of_buys'], # stake=0 # ) # return "MAX_RSI" # # return None # if (trend == "B-" or trend == "B--") and last_candle[f"{self.indic_5m_sell.value}_deriv1"] <= self.indic_deriv1_5m_sell.value \ # and last_candle[f"{self.indic_5m_sell.value}_deriv2"] <= self.indic_deriv2_5m_sell.value: # return None if last_candle['max_rsi_24'] > 85 and profit > max(5, expected_profit) and (last_candle['hapercent'] < 0) and last_candle['sma60_deriv1'] < 0.05: self.pairs[pair]['force_sell'] = False self.pairs[pair]['force_buy'] = False #(self.pairs[pair]['count_of_buys'] - self.pairs[pair]['has_gain'] > 3) return str(count_of_buys) + '_' + 'Rsi85_' + pair_name + '_' + str(self.pairs[pair]['has_gain']) if self.pairs[pair]['force_sell']: self.pairs[pair]['force_sell'] = False self.pairs[pair]['force_buy'] = (self.pairs[pair]['count_of_buys'] - self.pairs[pair]['has_gain'] > 3) return str(count_of_buys) + '_' + 'Frc_' + pair_name + '_' + str(self.pairs[pair]['has_gain']) if profit > max(5, expected_profit) and baisse > 0.30: self.pairs[pair]['force_sell'] = False self.pairs[pair]['force_buy'] = (self.pairs[pair]['count_of_buys'] - self.pairs[pair]['has_gain'] > 3) return str(count_of_buys) + '_' + 'B30_' + pair_name + '_' + str(self.pairs[pair]['has_gain']) if max_profit > 0.5 * count_of_buys and baisse > 0.15 and last_candle['sma12_state'] <= 0 and last_candle['sma60_state'] <= - 1: self.pairs[pair]['force_sell'] = False self.pairs[pair]['force_buy'] = (self.pairs[pair]['count_of_buys'] - self.pairs[pair]['has_gain'] > 3) return str(count_of_buys) + '_' + 'B15_' + pair_name + '_' + str(self.pairs[pair]['has_gain']) if (last_candle['sma5_1h'] - before_last_candle_12['sma5_1h']) / last_candle['sma5_1h'] > 0.0002: return None factor = 1 if (self.getShortName(pair) == 'BTC'): factor = 0.5 # if baisse > 2 and baisse > factor * self.pairs[pair]['total_amount'] / 100: # self.pairs[pair]['force_sell'] = False # self.pairs[pair]['force_buy'] = (self.pairs[pair]['count_of_buys'] - self.pairs[pair]['has_gain'] > 3) # return 'Baisse_' + pair_name + '_' + str(count_of_buys) + '_' + str(self.pairs[pair]['has_gain']) # # if 1 <= count_of_buys <= 3: if last_candle['max_rsi_24'] > 75 and profit > expected_profit and (last_candle['hapercent'] < 0) and last_candle['sma60_deriv1'] < 0: self.pairs[pair]['force_sell'] = False return str(count_of_buys) + '_' + 'Rsi75_' + pair_name + '_' + str(self.pairs[pair]['has_gain']) self.pairs[pair]['max_touch'] = max(last_candle['close'], self.pairs[pair]['max_touch']) def getShortName(self, pair): return pair.replace("/USDT", '').replace("/USDC", '').replace("_USDC", '').replace("_USDT", '') def informative_pairs(self): # get access to all pairs available in whitelist. pairs = self.dp.current_whitelist() informative_pairs = [(pair, '1d') for pair in pairs] informative_pairs += [(pair, '1h') for pair in pairs] return informative_pairs from typing import List def multi_step_interpolate(self, pct: float, thresholds: List[float], factors: List[float]) -> float: if pct <= thresholds[0]: return factors[0] if pct >= thresholds[-1]: return factors[-1] for i in range(1, len(thresholds)): if pct <= thresholds[i]: # interpolation linéaire entre thresholds[i-1] et thresholds[i] return factors[i - 1] + (pct - thresholds[i - 1]) * (factors[i] - factors[i - 1]) / ( thresholds[i] - thresholds[i - 1]) # Juste au cas où (devrait jamais arriver) return factors[-1] # def interpolate_factor(self, pct: float, start_pct: float = 5, end_pct: float = 30, # start_factor: float = 1.0, end_factor: float = 2.0) -> float: # if pct <= start_pct: # return start_factor # if pct >= end_pct: # return end_factor # # interpolation linéaire # return start_factor + (pct - start_pct) * (end_factor - start_factor) / (end_pct - start_pct) def log_trade(self, action, pair, date, trade_type=None, rate=None, dispo=None, profit=None, buys=None, stake=None, last_candle=None): # Afficher les colonnes une seule fois if self.config.get('runmode') == 'hyperopt' or self.dp.runmode.value in ('hyperopt'): return if self.columns_logged % 10 == 0: self.printLog( f"| {'Date':<16} | {'Action':<10} |{'Pair':<5}| {'Trade Type':<18} |{'Rate':>8} | {'Dispo':>6} | {'Profit':>8} " f"| {'Pct':>6} | {'max_touch':>11} | {'last_lost':>12} | {'last_max':>7}| {'last_max':>7}|{'Buys':>5}| {'Stake':>5} |" f"{'rsi':>6}|Distmax|s201d|s5_1d|s5_2d|s51h|s52h|smt1h|smt2h|tdc1d|tdc1h" ) self.printLineLog() df = pd.DataFrame.from_dict(self.pairs, orient='index') colonnes_a_exclure = ['last_candle', 'trade_info', 'last_date', 'last_count_of_buys', 'base_stake_amount', 'stop_buy'] df_filtered = df[df['count_of_buys'] > 0].drop(columns=colonnes_a_exclure) # df_filtered = df_filtered["first_buy", "last_max", "max_touch", "last_sell","last_buy", 'count_of_buys', 'current_profit'] print(df_filtered) self.columns_logged += 1 date = str(date)[:16] if date else "-" limit = None # if buys is not None: # limit = round(last_rate * (1 - self.fibo[buys] / 100), 4) rsi = '' rsi_pct = '' # if last_candle is not None: # if (not np.isnan(last_candle['rsi_1d'])) and (not np.isnan(last_candle['rsi_1h'])): # rsi = str(int(last_candle['rsi_1d'])) + " " + str(int(last_candle['rsi_1h'])) # if (not np.isnan(last_candle['rsi_pct_1d'])) and (not np.isnan(last_candle['rsi_pct_1h'])): # rsi_pct = str(int(10000 * last_candle['bb_mid_pct_1d'])) + " " + str( # int(last_candle['rsi_pct_1d'])) + " " + str(int(last_candle['rsi_pct_1h'])) # first_rate = self.percent_threshold.value # last_rate = self.threshold.value # action = self.color_line(action, action) sma5_1d = '' sma5_1h = '' sma5 = str(sma5_1d) + ' ' + str(sma5_1h) last_lost = self.getLastLost(last_candle, pair) if buys is None: buys = '' max_touch = '' # round(last_candle['max12_1d'], 1) #round(self.pairs[pair]['max_touch'], 1) pct_max = self.getPctFirstBuy(pair, last_candle) total_counts = str(buys) + '/' + str(sum(pair_data['count_of_buys'] for pair_data in self.pairs.values())) dist_max = self.getDistMax(last_candle, pair) # if trade_type is not None: # if np.isnan(last_candle['rsi_1d']): # string = ' ' # else: # string = (str(int(last_candle['rsi_1d']))) + " " + str(int(last_candle['rsi_deriv1_1d'])) # trade_type = trade_type \ # + " " + string \ # + " " + str(int(last_candle['rsi_1h'])) \ # + " " + str(int(last_candle['rsi_deriv1_1h'])) # val144 = self.getProbaHausse144(last_candle) # val1h = self.getProbaHausse1h(last_candle) val = self.getProbaHausseSma5d(last_candle) pct60 = round(100 * self.getPct60D(pair, last_candle), 2) color = GREEN if profit > 0 else RED color_sma24 = GREEN if last_candle['sma24_deriv1_1d'] > 0 else RED color_sma24_2 = GREEN if last_candle['sma24_deriv2_1d'] > 0 else RED color_sma5 = GREEN if last_candle['mid_smooth_5_deriv1_1d'] > 0 else RED color_sma5_2 = GREEN if last_candle['mid_smooth_5_deriv2_1d'] > 0 else RED color_sma5_1h = GREEN if last_candle['sma60_deriv1'] > 0 else RED color_sma5_2h = GREEN if last_candle['sma60_deriv2'] > 0 else RED color_smooth_1h = GREEN if last_candle['mid_smooth_1h_deriv1'] > 0 else RED color_smooth2_1h = GREEN if last_candle['mid_smooth_1h_deriv2'] > 0 else RED last_max = int(self.pairs[pair]['last_max']) if self.pairs[pair]['last_max'] > 1 else round( self.pairs[pair]['last_max'], 3) last_min = int(self.pairs[pair]['last_min']) if self.pairs[pair]['last_min'] > 1 else round( self.pairs[pair]['last_min'], 3) profit = str(profit) + '/' + str(round(self.pairs[pair]['max_profit'], 2)) # 🟢 Dérivée 1 > 0 et dérivée 2 > 0: tendance haussière qui s’accélère. # 🟡 Dérivée 1 > 0 et dérivée 2 < 0: tendance haussière qui ralentit → essoufflement potentiel. # 🔴 Dérivée 1 < 0 et dérivée 2 < 0: tendance baissière qui s’accélère. # 🟠 Dérivée 1 < 0 et dérivée 2 > 0: tendance baissière qui ralentit → possible bottom. # trend = last_candle['trend_class_1d'] # # indic_5m = self.getParamValue(pair, trend, 'buy', 'indic_5m') # indic_deriv1_5m = self.getParamValue(pair, trend, 'buy', 'indic_deriv1_5m') # indic_deriv2_5m = self.getParamValue(pair, trend, 'buy', 'indic_deriv2_5m') # # indic_5m_sell = self.getParamValue(pair, trend, 'sell', 'indic_5m_sell') # indic_deriv1_5m_sell = self.getParamValue(pair, trend, 'sell', 'indic_deriv1_5m_sell') # indic_deriv2_5m_sell = self.getParamValue(pair, trend, 'sell', 'indic_deriv2_5m_sell') self.printLog( f"| {date:<16} |{action:<10} | {pair[0:3]:<3} | {trade_type or '-':<18} |{rate or '-':>9}| {dispo or '-':>6} " f"|{color}{profit or '-':>10}{RESET}| {pct_max or '-':>6} | {round(self.pairs[pair]['max_touch'], 2) or '-':>11} | {last_lost or '-':>12} " f"| {last_max or '-':>7} | {last_min or '-':>7} |{total_counts or '-':>5}|{stake or '-':>7}" # f"|{round(last_candle['mid_smooth_24_deriv1'],3) or '-':>6}|{round(last_candle['mid_smooth_1h_deriv1'],3) or '-':>6}|{round(last_candle['mid_smooth_deriv1_1d'],3) or '-' :>6}|" # f"{round(last_candle['mid_smooth_24_deriv2'],3) or '-' :>6}|{round(last_candle['mid_smooth_1h_deriv2'],3) or '-':>6}|{round(last_candle['mid_smooth_deriv2_1d'],3) or '-':>6}|" f"{round(last_candle['max_rsi_24'], 1) or '-' :>6}|" f"{dist_max:>7}|{color_sma24}{round(last_candle['sma24_deriv1_1d'], 2):>5}{RESET}" f"|{color_sma5}{round(last_candle['mid_smooth_5_deriv1_1d'], 2):>5}{RESET}|{color_sma5_2}{round(last_candle['mid_smooth_5_deriv2_1d'], 2):>5}{RESET}" f"|{color_sma5_1h}{round(last_candle['sma60_deriv1'], 2):>5}{RESET}|{color_sma5_2h}{round(last_candle['sma60_deriv2'], 2):>5}{RESET}" f"|{color_smooth_1h}{round(last_candle['mid_smooth_1h_deriv1'], 2):>5}{RESET}|{color_smooth2_1h}{round(last_candle['mid_smooth_1h_deriv2'], 2):>5}{RESET}" # f"|{last_candle['min60_1d']}|{last_candle['max60_1d']}" # f"|{last_candle['mid_smooth_tdc_5_1d'] or '-':>3}|{last_candle['mid_smooth_tdc_5_1h'] or '-':>3}|{last_candle['mid_smooth_tdc_5'] or '-':>3}" f"|{last_candle['mid_smooth_5_state_1d'] or '-':>3}|{last_candle['mid_smooth_24_state_1h'] or '-':>3}|{last_candle['mid_smooth_5_state_1h'] or '-':>3}|{last_candle['mid_smooth_5_state'] or '-':>3}" f"|{last_candle['trend_class_1d']:>5} {last_candle['trend_class_1h']:>5}" # {indic_5m} {indic_deriv1_5m} {indic_deriv2_5m} {indic_5m_sell} {indic_deriv1_5m_sell} {indic_deriv2_5m_sell}" ) def getLastLost(self, last_candle, pair): last_lost = round((last_candle['close'] - self.pairs[pair]['max_touch']) / self.pairs[pair]['max_touch'], 3) return last_lost def getDistMax(self, last_candle, pair): mx = last_candle['max12_1d'] dist_max = round(100 * (mx - last_candle['close']) / mx, 0) return dist_max def printLineLog(self): # f"sum1h|sum1d|Tdc|Tdh|Tdd| drv1 |drv_1h|drv_1d|" self.printLog( f"+{'-' * 18}+{'-' * 12}+{'-' * 5}+{'-' * 20}+{'-' * 9}+{'-' * 8}+{'-' * 12}+{'-' * 8}+{'-' * 13}+{'-' * 14}+{'-' * 9}{'-' * 9}+{'-' * 5}+{'-' * 7}+" f"+{'-' * 6}+{'-' * 7}+{'-' * 5}+{'-' * 5}+{'-' * 5}+{'-' * 5}+{'-' * 5}+{'-' * 5}+" ) def printLog(self, str): if self.config.get('runmode') == 'hyperopt' or self.dp.runmode.value in ('hyperopt'): return; if not self.dp.runmode.value in ('backtest', 'hyperopt', 'lookahead-analysis'): logger.info(str) else: if not self.dp.runmode.value in ('hyperopt'): print(str) def add_tendency_column(self, dataframe: pd.DataFrame, name: str, suffixe: str = '', eps: float = 1e-3, d1_lim_inf: float = -0.01, d1_lim_sup: float = 0.01) -> pd.DataFrame: """ Ajoute une colonne 'tendency' basée sur les dérivées 1 et 2 lissées et normalisées. eps permet de définir un seuil proche de zéro. suffixe permet de gérer plusieurs indicateurs. """ def tag_by_derivatives(row): d1 = row[f"{name}{suffixe}_deriv1"] d2 = row[f"{name}{suffixe}_deriv2"] # On considère les petites valeurs comme zéro if abs(d1) < eps: return 0 # Palier / neutre if d1 > d1_lim_sup: return 2 if d2 > eps else 1 # Acceleration Hausse / Ralentissement Hausse if d1 < d1_lim_inf: return -2 if d2 < -eps else -1 # Acceleration Baisse / Ralentissement Baisse if abs(d1) < eps: return 'DH' if d2 > eps else 'DB' # Depart Hausse / Depart Baisse return 'Mid' print(f"{name}_tdc{suffixe}") dataframe[f"{name}_tdc{suffixe}"] = dataframe.apply(tag_by_derivatives, axis=1) return dataframe # def add_tendency_column(self, dataframe: pd.DataFrame, name, suffixe='') -> pd.DataFrame: # def tag_by_derivatives(row): # d1 = row[f"{name}{suffixe}_deriv1"] # d2 = row[f"{name}{suffixe}_deriv2"] # d1_lim_inf = -0.01 # d1_lim_sup = 0.01 # if d1 >= d1_lim_inf and d1 <= d1_lim_sup: # and d2 >= d2_lim_inf and d2 <= d2_lim_sup: # return 0 # Palier # if d1 == 0.0: # return 'DH' if d2 > 0 else 'DB' # Depart Hausse / Départ Baisse # if d1 > d1_lim_sup: # return 2 if d2 > 0 else 1 # Acceleration Hausse / Ralentissement Hausse # if d1 < d1_lim_inf: # return -2 if d2 < 0 else -1 # Accéleration Baisse / Ralentissement Baisse # return 'Mid' # # dataframe[f"tendency{suffixe}"] = dataframe.apply(tag_by_derivatives, axis=1) # return dataframe def populate_indicators(self, dataframe: DataFrame, metadata: dict) -> DataFrame: # Add all ta features pair = metadata['pair'] dataframe = self.populateDataframe(dataframe, timeframe='5m') ################### INFORMATIVE 1h informative = self.dp.get_pair_dataframe(pair=metadata['pair'], timeframe="1h") informative = self.populateDataframe(informative, timeframe='1h') dataframe = merge_informative_pair(dataframe, informative, self.timeframe, "1h", ffill=True) ################### INFORMATIVE 1d informative = self.dp.get_pair_dataframe(pair=metadata['pair'], timeframe="1d") informative = self.populateDataframe(informative, timeframe='1d') dataframe = merge_informative_pair(dataframe, informative, self.timeframe, "1d", ffill=True) dataframe['last_price'] = dataframe['close'] dataframe['first_price'] = dataframe['close'] if self.dp: if self.dp.runmode.value in ('live', 'dry_run'): self.getOpenTrades() for trade in self.trades: if trade.pair != pair: continue filled_buys = trade.select_filled_orders('buy') count = 0 amount = 0 for buy in filled_buys: if count == 0: dataframe['first_price'] = buy.price self.pairs[pair]['first_buy'] = buy.price self.pairs[pair]['first_amount'] = buy.price * buy.filled # dataframe['close01'] = buy.price * 1.01 # Order(id=2396, trade=1019, order_id=29870026652, side=buy, filled=0.00078, price=63921.01, # status=closed, date=2024-08-26 02:20:11) dataframe['last_price'] = buy.price self.pairs[pair]['last_buy'] = buy.price count = count + 1 amount += buy.price * buy.filled # dataframe['mid_price'] = (dataframe['last_price'] + dataframe['first_price']) / 2 count_buys = count # dataframe['limit'] = dataframe['last_price'] * (1 - self.baisse[count] / 100) self.pairs[pair]['total_amount'] = amount # dataframe['mid_smooth_tag'] = qtpylib.crossed_below(dataframe['mid_smooth_24_deriv1'], dataframe['mid_smooth_deriv2_24']) # =============================== # lissage des valeurs horaires dataframe['mid_smooth_1h'] = dataframe['mid'].rolling(window=6).mean() dataframe["mid_smooth_1h_deriv1"] = 100 * dataframe["mid_smooth_1h"].diff().rolling(window=6).mean() / \ dataframe['mid_smooth_1h'] dataframe["mid_smooth_1h_deriv2"] = 100 * dataframe["mid_smooth_1h_deriv1"].diff().rolling(window=6).mean() dataframe['mid_smooth_5h'] = talib.EMA(dataframe, timeperiod=60) # dataframe['mid'].rolling(window=60).mean() dataframe["mid_smooth_5h_deriv1"] = 100 * dataframe["mid_smooth_5h"].diff().rolling(window=60).mean() / \ dataframe['mid_smooth_5h'] dataframe["mid_smooth_5h_deriv2"] = 100 * dataframe["mid_smooth_5h_deriv1"].diff().rolling(window=60).mean() # indic_5m_protect = self.indic_5m_slope_sup.value # indic_1h_protect = self.indic_1h_slope_sup.value + '_1h' # # dataframe['stop_buying_deb'] = ((dataframe['max_rsi_12_1d'] > self.rsi_buy_protect.value) | (dataframe['sma24_deriv1_1h'] < self.deriv1_buy_protect.value)) & (qtpylib.crossed_below(dataframe[indic_5m_protect], dataframe[indic_1h_protect])) # dataframe['stop_buying_end'] = (dataframe[indic_1h_protect].shift(24) > dataframe[indic_1h_protect].shift(12)) & (dataframe[indic_1h_protect].shift(12) < dataframe[indic_1h_protect]) # latched = np.zeros(len(dataframe), dtype=bool) # # for i in range(1, len(dataframe)): # if dataframe['stop_buying_deb'].iloc[i]: # latched[i] = True # elif dataframe['stop_buying_end'].iloc[i]: # latched[i] = False # else: # latched[i] = latched[i - 1] # # dataframe['stop_buying'] = latched dataframe["ms-10"] = dataframe["mid_smooth_24_deriv1"].shift(10) dataframe["ms-5"] = dataframe["mid_smooth_24_deriv1"].shift(5) dataframe["ms-4"] = dataframe["mid_smooth_24_deriv1"].shift(4) dataframe["ms-3"] = dataframe["mid_smooth_24_deriv1"].shift(3) dataframe["ms-2"] = dataframe["mid_smooth_24_deriv1"].shift(2) dataframe["ms-1"] = dataframe["mid_smooth_24_deriv1"].shift(1) dataframe["ms-0"] = dataframe["mid_smooth_24_deriv1"] # dataframe["ms+10"] = dataframe["mid_smooth_24"].shift(-11) if False and self.dp.runmode.value in ('backtest'): self.trainModel(dataframe, metadata) self.model = joblib.load('rf_model.pkl') # Préparer les features pour la prédiction features = dataframe[self.model_indicators].fillna(0) # Prédiction : probabilité que le prix monte # probs = self.model.predict_proba(features)[:, 1] probs = self.model.predict(features) # Sauvegarder la probabilité pour l’analyse dataframe['ml_prob'] = probs self.inspect_model(self.model) return dataframe def trainModel(self, dataframe: DataFrame, metadata: dict): pd.set_option('display.max_rows', None) pd.set_option('display.max_columns', None) pd.set_option("display.width", 200) # # Étape 1 : sélectionner numériques # numeric_cols = dataframe.select_dtypes(include=['int64', 'float64']).columns # # # Étape 2 : enlever constantes # usable_cols = [c for c in numeric_cols if dataframe[c].nunique() > 1 # and (not c.endswith("_state") and not c.endswith("_1h") and not c.endswith("_1d") # and not c.endswith("_class") and not c.endswith("_price") # and not c.startswith('stop_buying'))] # # # Étape 3 : remplacer inf et NaN par 0 # dataframe[usable_cols] = dataframe[usable_cols].replace([np.inf, -np.inf], 0).fillna(0) # # print("Colonnes utilisables pour le modèle :") # print(usable_cols) # # self.model_indicators = usable_cols df = dataframe[self.model_indicators].copy() # Corrélations des colonnes corr = df.corr(numeric_only=True) print("Corrélation des colonnes") print(corr) # 3️⃣ Créer la cible : 1 si le prix monte dans les prochaines bougies # df['target'] = (df['sma24'].shift(-24) > df['sma24']).astype(int) df['target'] = dataframe["mid_smooth_24_deriv1"].shift(-11) # > df['sma24'] * 1.003).astype(int) df['target'] = df['target'].fillna(0) #.astype(int) # Corrélations triées par importance avec une colonne cible target_corr = df.corr(numeric_only=True)["target"].sort_values(ascending=False) print("Corrélations triées par importance avec une colonne cible") print(target_corr) # Corrélations triées par importance avec une colonne cible corr = df.corr(numeric_only=True) corr_unstacked = ( corr.unstack() .reset_index() .rename(columns={"level_0": "col1", "level_1": "col2", 0: "corr"}) ) # Supprimer les doublons col1/col2 inversés et soi-même corr_unstacked = corr_unstacked[corr_unstacked["col1"] < corr_unstacked["col2"]] # Trier par valeur absolue de corrélation corr_sorted = corr_unstacked.reindex(corr_unstacked["corr"].abs().sort_values(ascending=False).index) print("Trier par valeur absolue de corrélation") print(corr_sorted.head(20)) # --- Calcul de la corrélation --- corr = df.corr(numeric_only=True) # évite les colonnes non numériques corr = corr * 100 # passage en pourcentage # --- Masque pour n’afficher que le triangle supérieur (optionnel) --- mask = np.triu(np.ones_like(corr, dtype=bool)) # --- Création de la figure --- fig, ax = plt.subplots(figsize=(20,12)) #96, 36)) # --- Heatmap avec un effet “température” --- sns.heatmap( corr, mask=mask, cmap="coolwarm", # palette bleu → rouge center=0, # 0 au centre annot=True, # affiche les valeurs dans chaque case fmt=".0f", # format entier (pas de décimale) cbar_kws={"label": "Corrélation (%)"}, # légende à droite linewidths=0.5, # petites lignes entre les cases ax=ax ) # --- Personnalisation --- ax.set_title("Matrice de corrélation (en %)", fontsize=20, pad=20) plt.xticks(rotation=45, ha="right") plt.yticks(rotation=0) # --- Sauvegarde --- output_path = "/home/souti/freqtrade/user_data/plots/Matrice_de_correlation_temperature.png" plt.savefig(output_path, bbox_inches="tight", dpi=150) plt.close(fig) print(f"✅ Matrice enregistrée : {output_path}") # Nettoyage df = df.dropna() X = df[self.model_indicators] y = df['target'] # ta colonne cible binaire ou numérique print(self.feature_auc_scores(X, y)) # 4️⃣ Split train/test X = df[self.model_indicators] y = df['target'] # Séparation temporelle (train = 80 %, valid = 20 %) X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, shuffle=False) # Nettoyage des valeurs invalides selector = VarianceThreshold(threshold=0.0001) selector.fit(X_train) selected = X_train.columns[selector.get_support()] print("Colonnes conservées :", list(selected)) # 1️⃣ Entraîne ton modèle LGBM normal # train_model = LGBMRegressor( # objective='regression', # metric='rmse', # tu peux aussi tester 'mae' # n_estimators=300, # learning_rate=0.05, # max_depth=7, # subsample=0.8, # colsample_bytree=0.8, # random_state=42 # ) # train_model.fit(X_train, y_train) train_model, selected_features = self.optuna(X_train, X_test, y_train, y_test) print("Features retenues :", list(selected_features)) # # 2️⃣ Sélection des features AVANT calibration # sfm = SelectFromModel(train_model, threshold="median", prefit=True) # selected_features = X_train.columns[sfm.get_support()] # print(selected_features) train_model.fit(X_train, y_train) # Importances importances = pd.DataFrame({ "feature": train_model.feature_name_, "importance": train_model.feature_importances_ }).sort_values("importance", ascending=False) print("\n===== 🔍 IMPORTANCE DES FEATURES =====") print(importances) # 6️⃣ Évaluer la précision (facultatif) preds = train_model.predict(X_test) mse = mean_squared_error(y_test, preds) rmse = np.sqrt(mse) r2 = r2_score(y_test, preds) print(f"RMSE: {rmse:.5f} | R²: {r2:.3f}") # acc = accuracy_score(y_test, preds) # print(f"Accuracy: {acc:.3f}") # 7️⃣ Sauvegarde du modèle joblib.dump(train_model, 'rf_model.pkl') print("✅ Modèle sauvegardé sous rf_model.pkl") self.analyze_model(train_model, X_train, X_test, y_train, y_test) def inspect_model(self, model): """ Affiche les informations d'un modèle ML déjà entraîné. Compatible avec scikit-learn, xgboost, lightgbm, catboost... """ print("===== 🔍 INFORMATIONS DU MODÈLE =====") # Type de modèle print(f"Type : {type(model).__name__}") print(f"Module : {model.__class__.__module__}") # Hyperparamètres if hasattr(model, "get_params"): params = model.get_params() print(f"\n===== ⚙️ HYPERPARAMÈTRES ({len(params)}) =====") for k, v in params.items(): print(f"{k}: {v}") # Nombre d’estimateurs if hasattr(model, "n_estimators"): print(f"\nNombre d’estimateurs : {model.n_estimators}") # Importance des features if hasattr(model, "feature_importances_"): print("\n===== 📊 IMPORTANCE DES FEATURES =====") # Correction ici : feature_names = getattr(model, "feature_names_in_", None) if isinstance(feature_names, np.ndarray): feature_names = feature_names.tolist() elif feature_names is None: feature_names = [f"feature_{i}" for i in range(len(model.feature_importances_))] fi = pd.DataFrame({ "feature": feature_names, "importance": model.feature_importances_ }).sort_values(by="importance", ascending=False) print(fi) # Coefficients (modèles linéaires) if hasattr(model, "coef_"): print("\n===== ➗ COEFFICIENTS =====") coef = np.array(model.coef_) if coef.ndim == 1: for i, c in enumerate(coef): print(f"Feature {i}: {c:.6f}") else: print(coef) # Intercept if hasattr(model, "intercept_"): print("\nIntercept :", model.intercept_) # Classes connues if hasattr(model, "classes_"): print("\n===== 🎯 CLASSES =====") print(model.classes_) # Scores internes for attr in ["best_score_", "best_iteration_", "best_ntree_limit", "score_"]: if hasattr(model, attr): print(f"\n{attr} = {getattr(model, attr)}") # Méthodes disponibles print("\n===== 🧩 MÉTHODES DISPONIBLES =====") methods = [m for m, _ in inspect.getmembers(model, predicate=inspect.ismethod)] print(", ".join(methods[:15]) + ("..." if len(methods) > 15 else "")) print("\n===== ✅ FIN DE L’INSPECTION =====") def analyze_model(self, model, X_train, X_test, y_train, y_test): """ Analyse complète d'un modèle ML supervisé (classification binaire). Affiche performances, importance des features, matrices, seuils, etc. """ output_dir = "user_data/plots" os.makedirs(output_dir, exist_ok=True) # ---- Importance des features ---- if hasattr(model, "feature_importances_"): print("\n===== 🔍 IMPORTANCE DES FEATURES =====") importance = pd.DataFrame({ "feature": X_train.columns, "importance": model.feature_importances_ }).sort_values(by="importance", ascending=False) print(importance) # Crée une figure plus grande fig, ax = plt.subplots(figsize=(24, 8)) # largeur=24 pouces, hauteur=8 pouces # Trace le bar plot sur cet axe importance.plot.bar(x="feature", y="importance", legend=False, ax=ax) # Tourner les labels pour plus de lisibilité ax.set_xticklabels(ax.get_xticklabels(), rotation=45, ha='right') plt.title("Importance des features") # plt.show() plt.savefig(os.path.join(output_dir, "Importance des features.png"), bbox_inches="tight") plt.close() # ---- Arbre de décision (extrait) ---- if hasattr(model, "estimators_"): print("\n===== 🌳 EXTRAIT D’UN ARBRE =====") print(export_text(model.estimators_[0], feature_names=list(X_train.columns))[:800]) # --- Après l'entraînement du modèle --- preds = model.predict(X_test) # --- Évaluation --- mse = mean_squared_error(y_test, preds) rmse = np.sqrt(mse) r2 = r2_score(y_test, preds) print(f"RMSE: {rmse:.5f} | R²: {r2:.3f}") # --- Création du dossier de sortie --- plot_dir = "/home/souti/freqtrade/user_data/plots" os.makedirs(plot_dir, exist_ok=True) # --- Graphique prédiction vs réel --- plt.figure(figsize=(8, 8)) plt.scatter(y_test, preds, alpha=0.4, s=15) plt.xlabel("Valeurs réelles", fontsize=12) plt.ylabel("Valeurs prédites", fontsize=12) plt.title(f"LightGBM Régression — Prédiction vs Réel\nRMSE={rmse:.5f} | R²={r2:.3f}", fontsize=14) plt.plot( [y_test.min(), y_test.max()], [y_test.min(), y_test.max()], 'r--', linewidth=1, label="Ligne idéale" ) plt.legend() # --- Sauvegarde --- plot_path = os.path.join(plot_dir, "LightGBM_regression_pred_vs_real.png") plt.savefig(plot_path, bbox_inches="tight", dpi=200) plt.close() print(f"✅ Graphique sauvegardé : {plot_path}") # save_dir = "/home/souti/freqtrade/user_data/plots/" # os.makedirs(save_dir, exist_ok=True) ax = lgb.plot_tree(model, tree_index=0, figsize=(30, 20), show_info=["split_gain", "internal_value", "internal_count"]) plt.title("Arbre de décision n°0") plt.savefig(os.path.join(plot_dir, "lgbm_tree_0.png"), bbox_inches="tight") plt.close() ax = lgb.plot_tree(model, figsize=(40, 20)) plt.title("Vue globale du modèle LGBM") plt.savefig(os.path.join(plot_dir, "lgbm_all_trees.png"), bbox_inches="tight") plt.close() # X_test = np.linspace(0, 10, 1000).reshape(-1, 1) y_pred = model.predict(X_test) self.graphFonctionApprise(X_test, y_test, y_pred) self.graphFonctionAppriseFeature(X_test, y_test, y_pred) # ============================================================================== ax = lgb.plot_importance(model, max_num_features=30, figsize=(12, 6)) plt.title("Importance des features - LGBM") plt.savefig(os.path.join(plot_dir, "lgbm_feature_importance.png"), bbox_inches="tight") plt.close() corr = X_train.corr() * 100 # en pourcentage plt.figure(figsize=(20, 16)) sns.heatmap(corr, cmap="coolwarm", center=0, annot=False, fmt=".1f", cbar_kws={'label': 'Corrélation (%)'}) plt.title("Matrice de corrélation (%)") plt.savefig(os.path.join(plot_dir, "correlation_matrix.png"), bbox_inches="tight") plt.close() plt.figure(figsize=(10, 6)) plt.scatter(y_test, model.predict(X_test), alpha=0.5) plt.xlabel("Valeurs réelles") plt.ylabel("Prédictions du modèle") plt.title("Comparaison y_test vs y_pred") plt.savefig(os.path.join(plot_dir, "ytest_vs_ypred.png"), bbox_inches="tight") plt.close() print("\n===== ✅ FIN DE L’ANALYSE =====") def plot_threshold_analysis(self, y_true, y_proba, step=0.05, save_path=None): """ Affiche la précision, le rappel et le F1-score selon le seuil de décision. y_true : labels réels (0 ou 1) y_proba : probabilités prédites (P(hausse)) step : pas entre les seuils testés save_path : si renseigné, enregistre l'image au lieu d'afficher """ # Le graphique généré affichera trois courbes : # # 🔵 Precision — la fiabilité de tes signaux haussiers. # # 🟢 Recall — la proportion de hausses que ton modèle détecte. # # 🟣 F1-score — le compromis optimal entre les deux. thresholds = np.arange(0, 1.01, step) precisions, recalls, f1s = [], [], [] for thr in thresholds: preds = (y_proba >= thr).astype(int) precisions.append(precision_score(y_true, preds)) recalls.append(recall_score(y_true, preds)) f1s.append(f1_score(y_true, preds)) plt.figure(figsize=(10, 6)) plt.plot(thresholds, precisions, label="Precision", linewidth=2) plt.plot(thresholds, recalls, label="Recall", linewidth=2) plt.plot(thresholds, f1s, label="F1-score", linewidth=2, linestyle="--") plt.axvline(0.5, color='gray', linestyle=':', label="Seuil 0.5") plt.title("📊 Performance selon le seuil de probabilité", fontsize=14) plt.xlabel("Seuil de décision (threshold)") plt.ylabel("Score") plt.legend() plt.grid(True, alpha=0.3) if save_path: plt.savefig(save_path, bbox_inches='tight') print(f"✅ Graphique enregistré : {save_path}") else: plt.show() # # ============================= # # Exemple d’utilisation : # # ============================= # if __name__ == "__main__": # # Exemple : chargement d’un modèle et test # import joblib # # model = joblib.load("/media/Home/home/souti/freqtrade/user_data/strategies/tools/sklearn/model.pkl") # data = np.load("/media/Home/home/souti/freqtrade/user_data/strategies/tools/sklearn/test_data.npz") # X_test, y_test = data["X"], data["y"] # # y_proba = model.predict_proba(X_test)[:, 1] # # # Trace ou enregistre le graphique # plot_threshold_analysis(y_test, y_proba, step=0.05, # save_path="/media/Home/home/souti/freqtrade/user_data/strategies/tools/sklearn/threshold_analysis.png") def populateDataframe(self, dataframe, timeframe='5m'): heikinashi = qtpylib.heikinashi(dataframe) dataframe['haopen'] = heikinashi['open'] dataframe['haclose'] = heikinashi['close'] dataframe['hapercent'] = (dataframe['haclose'] - dataframe['haopen']) / dataframe['haclose'] dataframe['mid'] = dataframe['haopen'] + (dataframe['haclose'] - dataframe['haopen']) / 2 dataframe["percent"] = dataframe['close'].pct_change() dataframe["percent3"] = dataframe['close'].pct_change(3) dataframe["percent12"] = dataframe['close'].pct_change(12) dataframe["percent24"] = dataframe['close'].pct_change(24) # if self.dp.runmode.value in ('backtest'): # dataframe['futur_percent'] = 100 * (dataframe['close'].shift(-1) - dataframe['close']) / dataframe['close'] dataframe['sma5'] = dataframe['mid'].ewm(span=5, adjust=False).mean() #dataframe["mid"].rolling(window=5).mean() self.calculeDerivees(dataframe, 'sma5', timeframe=timeframe, ema_period=5) dataframe['sma12'] = dataframe['mid'].ewm(span=12, adjust=False).mean() #dataframe["mid"].rolling(window=12).mean() self.calculeDerivees(dataframe, 'sma12', timeframe=timeframe, ema_period=12) dataframe['sma24'] = dataframe['mid'].ewm(span=24, adjust=False).mean() #dataframe["mid"].rolling(window=24).mean() self.calculeDerivees(dataframe, 'sma24', timeframe=timeframe, ema_period=24) dataframe['sma48'] = dataframe['mid'].ewm(span=48, adjust=False).mean() #dataframe["mid"].rolling(window=48).mean() self.calculeDerivees(dataframe, 'sma48', timeframe=timeframe, ema_period=48) dataframe['sma60'] = dataframe['mid'].ewm(span=60, adjust=False).mean() #dataframe["mid"].rolling(window=60).mean() self.calculeDerivees(dataframe, 'sma60', timeframe=timeframe, ema_period=60) dataframe = self.calculateDerivation(dataframe, window=3, suffixe="_3",timeframe=timeframe) dataframe = self.calculateDerivation(dataframe, window=5, suffixe="_5",timeframe=timeframe) dataframe = self.calculateDerivation(dataframe, window=12, suffixe="_12",timeframe=timeframe) dataframe = self.calculateDerivation(dataframe, window=24, suffixe="_24", timeframe=timeframe) # print(metadata['pair']) dataframe['rsi'] = talib.RSI(dataframe['close'], timeperiod=14) dataframe['max_rsi_12'] = talib.MAX(dataframe['rsi'], timeperiod=12) dataframe['max_rsi_24'] = talib.MAX(dataframe['rsi'], timeperiod=24) self.calculeDerivees(dataframe, 'rsi', timeframe=timeframe, ema_period=12) dataframe['max12'] = talib.MAX(dataframe['close'], timeperiod=12) dataframe['max60'] = talib.MAX(dataframe['close'], timeperiod=60) dataframe['min60'] = talib.MIN(dataframe['close'], timeperiod=60) dataframe['min_max_60'] = ((dataframe['max60'] - dataframe['close']) / dataframe['min60']) # dataframe['min36'] = talib.MIN(dataframe['close'], timeperiod=36) # dataframe['max36'] = talib.MAX(dataframe['close'], timeperiod=36) # dataframe['pct36'] = 100 * (dataframe['max36'] - dataframe['min36']) / dataframe['min36'] # dataframe['maxpct36'] = talib.MAX(dataframe['pct36'], timeperiod=36) # Bollinger Bands bollinger = qtpylib.bollinger_bands(qtpylib.typical_price(dataframe), window=20, stds=2) dataframe['bb_lowerband'] = bollinger['lower'] dataframe['bb_middleband'] = bollinger['mid'] dataframe['bb_upperband'] = bollinger['upper'] dataframe["bb_percent"] = ( (dataframe["close"] - dataframe["bb_lowerband"]) / (dataframe["bb_upperband"] - dataframe["bb_lowerband"]) ) dataframe["bb_width"] = (dataframe["bb_upperband"] - dataframe["bb_lowerband"]) / dataframe["sma5"] # dataframe["bb_width"] = ( # (dataframe["bb_upperband"] - dataframe["bb_lowerband"]) / dataframe["bb_middleband"] # ) # Calcul MACD macd, macdsignal, macdhist = talib.MACD( dataframe['close'], fastperiod=12, slowperiod=26, signalperiod=9 ) # | Nom | Formule / définition | Signification | # | ---------------------------- | ------------------------------------------- | ----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- | # | **MACD** (`macd`) | `EMA_fast - EMA_slow` (ex : 12-26 périodes) | Montre l’écart entre la moyenne courte et la moyenne longue.
- Positive → tendance haussière
- Négative → tendance baissière | # | **Signal** (`macdsignal`) | `EMA_9(MACD)` | Sert de ligne de **signal de déclenchement**.
- Croisement du MACD au-dessus → signal d’achat
- Croisement du MACD en dessous → signal de vente | # | **Histogramme** (`macdhist`) | `MACD - Signal` | Montre la **force et l’accélération** de la tendance.
- Positif et croissant → tendance haussière qui s’accélère
- Positif mais décroissant → ralentissement de la hausse
- Négatif et décroissant → baisse qui s’accélère
- Négatif mais croissant → ralentissement de la baisse | # Ajouter dans le dataframe dataframe['macd'] = macd dataframe['macdsignal'] = macdsignal dataframe['macdhist'] = macdhist # Regarde dans le futur # # --- Rendre relatif sur chaque série (-1 → 1) --- # for col in ['macd', 'macdsignal', 'macdhist']: # series = dataframe[col] # valid = series[~np.isnan(series)] # ignorer NaN # min_val = valid.min() # max_val = valid.max() # span = max_val - min_val if max_val != min_val else 1 # dataframe[f'{col}_rel'] = 2 * ((series - min_val) / span) - 1 # # dataframe['tdc_macd'] = self.macd_tendance_int( # dataframe, # macd_col='macd_rel', # signal_col='macdsignal_rel', # hist_col='macdhist_rel' # ) # ------------------------------------------------------------------------------------ # rolling SMA indicators (used for trend detection too) s_short = self.DEFAULT_PARAMS['sma_short'] s_long = self.DEFAULT_PARAMS['sma_long'] dataframe[f'sma_{s_short}'] = dataframe['close'].rolling(window=s_short).mean() dataframe[f'sma_{s_long}'] = dataframe['close'].rolling(window=s_long).mean() # --- pente brute --- dataframe['slope'] = dataframe['sma24'].diff() # --- lissage EMA --- dataframe['slope_smooth'] = dataframe['slope'].ewm(span=10, adjust=False).mean() # # RSI # window = 14 # delta = dataframe['close'].diff() # up = delta.clip(lower=0) # down = -1 * delta.clip(upper=0) # ma_up = up.rolling(window=window).mean() # ma_down = down.rolling(window=window).mean() # rs = ma_up / ma_down.replace(0, 1e-9) # dataframe['rsi'] = 100 - (100 / (1 + rs)) # # # EMA example # dataframe['ema'] = dataframe['close'].ewm(span=self.DEFAULT_PARAMS['ema_period'], adjust=False).mean() # # # ATR (simple implementation) # high_low = dataframe['high'] - dataframe['low'] # high_close = (dataframe['high'] - dataframe['close'].shift()).abs() # low_close = (dataframe['low'] - dataframe['close'].shift()).abs() # tr = DataFrame({'hl': high_low, 'hc': high_close, 'lc': low_close}).max(axis=1) # dataframe['atr'] = tr.rolling(window=self.DEFAULT_PARAMS['atr_period']).mean() ########################### # df = ton DataFrame OHLCV avec colonnes: open, high, low, close, volume # Assure-toi qu'il est trié par date croissante # --- Volatilité normalisée --- dataframe['atr'] = ta.volatility.AverageTrueRange( high=dataframe['high'], low=dataframe['low'], close=dataframe['close'], window=14 ).average_true_range() dataframe['atr_norm'] = dataframe['atr'] / dataframe['close'] # --- Force de tendance --- dataframe['adx'] = ta.trend.ADXIndicator( high=dataframe['high'], low=dataframe['low'], close=dataframe['close'], window=14 ).adx() # --- Volume directionnel (On Balance Volume) --- dataframe['obv'] = ta.volume.OnBalanceVolumeIndicator( close=dataframe['close'], volume=dataframe['volume'] ).on_balance_volume() # --- Volatilité récente (écart-type des rendements) --- dataframe['vol_24'] = dataframe['percent'].rolling(24).std() # Compter les baisses / hausses consécutives self.calculateDownAndUp(dataframe, limit=0.0001) # df : ton dataframe OHLCV + indicateurs existants # Assurez-vous que les colonnes suivantes existent : # 'max_rsi_12', 'roc_24', 'bb_percent_1h' # --- Filtrage des NaN initiaux --- # dataframe = dataframe.dropna() dataframe['rsi_slope'] = dataframe['rsi'].diff(3) / 3 # vitesse moyenne du RSI dataframe['adx_change'] = dataframe['adx'] - dataframe['adx'].shift(12) # évolution de la tendance dataframe['volatility_ratio'] = dataframe['atr_norm'] / dataframe['bb_width'] dataframe["rsi_diff"] = dataframe["rsi"] - dataframe["rsi"].shift(3) dataframe["slope_ratio"] = dataframe["sma5_deriv1"] / (dataframe["sma60_deriv1"] + 1e-9) dataframe["divergence"] = (dataframe["rsi_deriv1"] * dataframe["sma5_deriv1"]) < 0 ########################### dataframe['volume_sma_deriv'] = dataframe['volume'] * dataframe['sma5_deriv1'] / (dataframe['volume'].rolling(5).mean()) self.calculeDerivees(dataframe, 'volume', timeframe=timeframe, ema_period=12) self.setTrends(dataframe) return dataframe def feature_auc_scores(self, X, y): aucs = {} for col in X.columns: try: aucs[col] = roc_auc_score(y, X[col].ffill().fillna(0)) except Exception: aucs[col] = np.nan return pd.Series(aucs).sort_values(ascending=False) def macd_tendance_int(self, dataframe: pd.DataFrame, macd_col='macd', signal_col='macdsignal', hist_col='macdhist', eps=0.0) -> pd.Series: """ Renvoie la tendance MACD sous forme d'entiers. 2 : Haussier 1 : Ralentissement hausse 0 : Neutre -1 : Ralentissement baisse -2 : Baissier """ # | Nom | Formule / définition | Signification | # | ---------------------------- | ------------------------------------------- | ----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- | # | **MACD** (`macd`) | `EMA_fast - EMA_slow` (ex : 12-26 périodes) | Montre l’écart entre la moyenne courte et la moyenne longue.
- Positive → tendance haussière
- Négative → tendance baissière | # | **Signal** (`macdsignal`) | `EMA_9(MACD)` | Sert de ligne de **signal de déclenchement**.
- Croisement du MACD au-dessus → signal d’achat
- Croisement du MACD en dessous → signal de vente | # | **Histogramme** (`macdhist`) | `MACD - Signal` | Montre la **force et l’accélération** de la tendance.
- Positif et croissant → tendance haussière qui s’accélère
- Positif mais décroissant → ralentissement de la hausse
- Négatif et décroissant → baisse qui s’accélère
- Négatif mais croissant → ralentissement de la baisse | # | Situation | MACD | Signal | Hist | Interprétation | # | -------------------------- | ---------- | --------- | -------- | ------------------------------------------ | # | MACD > 0, Hist croissant | au-dessus | croissant | Haussier | Momentum fort → tendance haussière | # | MACD > 0, Hist décroissant | au-dessus | en baisse | Momentum | La hausse ralentit, prudence | # | MACD < 0, Hist décroissant | en dessous | en baisse | Baissier | Momentum fort → tendance baissière | # | MACD < 0, Hist croissant | en dessous | en hausse | Rebond ? | La baisse ralentit → possible retournement | # Créer une série de 0 par défaut tendance = pd.Series(0, index=dataframe.index) # Cas MACD > signal mask_up = dataframe[macd_col] > dataframe[signal_col] + eps mask_up_hist_pos = mask_up & (dataframe[hist_col] > 0) mask_up_hist_neg = mask_up & (dataframe[hist_col] <= 0) tendance[mask_up_hist_pos] = 2 # Haussier tendance[mask_up_hist_neg] = 1 # Ralentissement hausse # Cas MACD < signal mask_down = dataframe[macd_col] < dataframe[signal_col] - eps mask_down_hist_neg = mask_down & (dataframe[hist_col] < 0) mask_down_hist_pos = mask_down & (dataframe[hist_col] >= 0) tendance[mask_down_hist_neg] = -2 # Baissier tendance[mask_down_hist_pos] = -1 # Ralentissement baisse # Les NaN deviennent neutre tendance[dataframe[[macd_col, signal_col, hist_col]].isna().any(axis=1)] = 0 return tendance def calculateDownAndUp(self, dataframe, limit=0.0001): dataframe['down'] = dataframe['hapercent'] <= limit dataframe['up'] = dataframe['hapercent'] >= limit dataframe['down_count'] = - dataframe['down'].astype(int) * ( dataframe['down'].groupby((dataframe['down'] != dataframe['down'].shift()).cumsum()).cumcount() + 1) dataframe['up_count'] = dataframe['up'].astype(int) * ( dataframe['up'].groupby((dataframe['up'] != dataframe['up'].shift()).cumsum()).cumcount() + 1) # Créer une colonne vide dataframe['down_pct'] = self.calculateUpDownPct(dataframe, 'down_count') dataframe['up_pct'] = self.calculateUpDownPct(dataframe, 'up_count') def calculateDerivation(self, dataframe, window=12, suffixe='', timeframe='5m'): dataframe[f"mid_smooth{suffixe}"] = dataframe['mid'].rolling(window).mean() dataframe = self.calculeDerivees(dataframe, f"mid_smooth{suffixe}", timeframe=timeframe, ema_period=window) return dataframe def calculeDerivees( self, dataframe: pd.DataFrame, name: str, suffixe: str = '', window: int = 100, coef: float = 0.15, ema_period: int = 10, verbose: bool = True, timeframe: str = '5m' ) -> pd.DataFrame: """ Calcule deriv1/deriv2 (relative simple), applique EMA, calcule tendency avec epsilon adaptatif basé sur rolling percentiles. """ d1_col = f"{name}{suffixe}_deriv1" d2_col = f"{name}{suffixe}_deriv2" # d1s_col = f"{name}{suffixe}_deriv1_smooth" # d2s_col = f"{name}{suffixe}_deriv2_smooth" tendency_col = f"{name}{suffixe}_state" factor1 = 100 * (ema_period / 5) factor2 = 10 * (ema_period / 5) dataframe[f"{name}{suffixe}_inv"] = (dataframe[f"{name}{suffixe}"].shift(2) >= dataframe[f"{name}{suffixe}"].shift(1)) \ & (dataframe[f"{name}{suffixe}"].shift(1) <= dataframe[f"{name}{suffixe}"]) # --- Distance à la moyenne mobile --- dataframe[f"{name}{suffixe}_dist"] = (dataframe['close'] - dataframe[f"{name}{suffixe}"]) / dataframe[f"{name}{suffixe}"] # dérivée relative simple dataframe[d1_col] = (dataframe[name] - dataframe[name].shift(1)) / dataframe[name].shift(1) # lissage EMA dataframe[d1_col] = factor1 * dataframe[d1_col].ewm(span=ema_period, adjust=False).mean() # dataframe[d1_col] = dataframe[d1_col].rolling(window=ema_period, center=True).median() dataframe[d2_col] = dataframe[d1_col] - dataframe[d1_col].shift(1) dataframe[d2_col] = factor2 * dataframe[d2_col].ewm(span=ema_period, adjust=False).mean() # epsilon adaptatif via rolling percentile p_low_d1 = dataframe[d1_col].rolling(window=window, min_periods=1).quantile(0.05) p_high_d1 = dataframe[d1_col].rolling(window=window, min_periods=1).quantile(0.95) p_low_d2 = dataframe[d2_col].rolling(window=window, min_periods=1).quantile(0.05) p_high_d2 = dataframe[d2_col].rolling(window=window, min_periods=1).quantile(0.95) eps_d1_series = ((p_low_d1.abs() + p_high_d1.abs()) / 2) * coef eps_d2_series = ((p_low_d2.abs() + p_high_d2.abs()) / 2) * coef # fallback global eps global_eps_d1 = (abs(dataframe[d1_col].quantile(0.05)) + abs(dataframe[d1_col].quantile(0.95))) / 2 * coef global_eps_d2 = (abs(dataframe[d2_col].quantile(0.05)) + abs(dataframe[d2_col].quantile(0.95))) / 2 * coef eps_d1_series = eps_d1_series.fillna(global_eps_d1).replace(0, global_eps_d1) eps_d2_series = eps_d2_series.fillna(global_eps_d2).replace(0, global_eps_d2) # if verbose and self.dp.runmode.value in ('backtest'): # stats = dataframe[[d1_col, d2_col]].agg(['min', 'max']).T # stats['abs_max'] = dataframe[[d1_col, d2_col]].abs().max(axis=0) # print(f"---- Derivatives stats {timeframe}----") # print(stats) # print(f"rolling window = {window}, coef = {coef}, ema_period = {ema_period}") # print("---------------------------") # mapping tendency def tag_by_derivatives(row): idx = int(row.name) d1v = float(row[d1_col]) d2v = float(row[d2_col]) eps1 = float(eps_d1_series.iloc[idx]) eps2 = float(eps_d2_series.iloc[idx]) # # mapping état → codes 3 lettres explicites # # | Ancien état | Nouveau code 3 lettres | Interprétation | # # | ----------- | ---------------------- | --------------------- | # # | 4 | HAU | Hausse Accélérée | # # | 3 | HSR | Hausse Ralentissement | # # | 2 | HST | Hausse Stable | # # | 1 | DHB | Départ Hausse | # # | 0 | PAL | Palier / neutre | # # | -1 | DBD | Départ Baisse | # # | -2 | BSR | Baisse Ralentissement | # # | -3 | BST | Baisse Stable | # # | -4 | BAS | Baisse Accélérée | # Palier strict if abs(d1v) <= eps1 and abs(d2v) <= eps2: return 0 # Départ si d1 ~ 0 mais d2 signale direction if abs(d1v) <= eps1: return 1 if d2v > eps2 else -1 if d2v < -eps2 else 0 # Hausse if d1v > eps1: return 4 if d2v > eps2 else 3 # Baisse if d1v < -eps1: return -4 if d2v < -eps2 else -2 return 0 dataframe[tendency_col] = dataframe.apply(tag_by_derivatives, axis=1) # if timeframe == '1h' and verbose and self.dp.runmode.value in ('backtest'): # print("##################") # print(f"# STAT {timeframe} {name}{suffixe}") # print("##################") # self.calculateProbabilite2Index(dataframe, futur_cols=['futur_percent'], indic_1=f"{name}{suffixe}_deriv1", indic_2=f"{name}{suffixe}_deriv2") return dataframe def getOpenTrades(self): # if len(self.trades) == 0: self.trades = Trade.get_open_trades() return self.trades def populate_buy_trend(self, dataframe: DataFrame, metadata: dict) -> DataFrame: dataframe.loc[ ( (dataframe['ml_prob'] > self.ml_prob_buy.value) ), ['enter_long', 'enter_tag']] = (1, f"ml_prob") dataframe['test'] = np.where(dataframe['enter_long'] == 1, dataframe['close'] * 1.01, np.nan) if self.dp.runmode.value in ('backtest'): dataframe.to_feather(f"user_data/backtest_results/{metadata['pair'].replace('/', '_')}_df.feather") return dataframe def calculateProbabilite2Index(self, df, futur_cols, indic_1, indic_2): # # Définition des tranches pour les dérivées # bins_deriv = [-np.inf, -0.05, -0.01, 0.01, 0.05, np.inf] # labels = ['forte baisse', 'légère baisse', 'neutre', 'légère hausse', 'forte hausse'] # # # Ajout des colonnes bin (catégorisation) # df[f"{indic_1}_bin"] = pd.cut(df['mid_smooth_1h_deriv1'], bins=bins_deriv, labels=labels) # df[f"{indic_2}_bin"] = pd.cut(df['mid_smooth_deriv1_1d'], bins=bins_deriv, labels=labels) # # # Colonnes de prix futur à analyser # futur_cols = ['futur_percent_1h', 'futur_percent_2h', 'futur_percent_3h', 'futur_percent_4h', 'futur_percent_5h'] # # # Calcul des moyennes et des effectifs # grouped = df.groupby([f"{indic_2}_bin", f"{indic_1}_bin"])[futur_cols].agg(['mean', 'count']) # # pd.set_option('display.width', 200) # largeur max affichage # pd.set_option('display.max_columns', None) pd.set_option('display.max_columns', None) pd.set_option('display.width', 300) # largeur max affichage # nettoyage # series = df[f"{indic_2}"].dropna() # unique_vals = df[f"{indic_2}"].nunique() # print(unique_vals) # print(df[f"{indic_2}"]) n = len(self.labels) df[f"{indic_1}_bin"], bins_1h = pd.qcut(df[f"{indic_1}"], q=n, labels=self.labels, retbins=True, duplicates='drop') df[f"{indic_2}_bin"], bins_1d = pd.qcut(df[f"{indic_2}"], q=n, labels=self.labels, retbins=True, duplicates='drop') # Affichage formaté pour code Python print(f"Bornes des quantiles pour {indic_1} : [{', '.join([f'{b:.4f}' for b in bins_1h])}]") print(f"Bornes des quantiles pour {indic_2} : [{', '.join([f'{b:.4f}' for b in bins_1d])}]") # Agrégation grouped = df.groupby([f"{indic_2}_bin", f"{indic_1}_bin"], observed=True)[futur_cols].agg(['mean', 'count']) # Affichage with pd.option_context('display.max_rows', None, 'display.max_columns', None): print(grouped.round(4)) # Ajout des probabilités de hausse for col in futur_cols: df[f"{col}_is_up"] = df[col] > 0 # Calcul de la proba de hausse proba_up = df.groupby([f"{indic_2}_bin", f"{indic_1}_bin"], observed=True)[f"{col}_is_up"].mean().unstack() print(f"\nProbabilité de hausse pour {col} (en %):") with pd.option_context('display.max_rows', None, 'display.max_columns', None): print((proba_up * 100).round(1)) # Affichage formaté des valeurs comme tableau Python with pd.option_context('display.max_rows', None, 'display.max_columns', None): df_formatted = (proba_up * 100).round(1) print("data = {") for index, row in df_formatted.iterrows(): row_values = ", ".join([f"{val:.1f}" for val in row]) print(f"'{index}': [{row_values}], ") print("}") data = {} for index, row in df_formatted.iterrows(): # on convertit proprement avec arrondi comme dans ton print, mais en données réelles data[index] = [ None if (isinstance(val, float) and math.isnan(val)) else val for val in row ] # Niveaux unicode pour les barres verticales (style sparkline) # spark_chars = "▁▂▃▄▅▆▇█" # print(data.values()) # # Collecte globale min/max # all_values = [] # for vals in data.values(): # all_values.extend(v for v in vals if not (isinstance(v, float) and math.isnan(v))) # # global_min = min(all_values) if all_values else 0 # global_max = max(all_values) if all_values else 1 # global_span = (global_max - global_min) if global_max != global_min else 1 # # def sparkline_global(values): # if all(isinstance(v, float) and math.isnan(v) for v in values): # return "(no data)" # out = "" # for v in values: # if isinstance(v, float) and math.isnan(v): # out += " " # else: # idx = int((v - global_min) / global_span * (len(spark_chars) - 1)) # out += spark_chars[idx] # return out # # for key, values in data.items(): # print(f"{key:>3} : {sparkline_global(values)}") # Palette ANSI 256 couleurs pour heatmap def get_ansi_color(val): """ Échelle fixe 0→100 : 0-20 : bleu (21) 20-40 : cyan (51) 40-60 : vert/jaune (46 / 226) 60-80 : orange (208) 80-100 : rouge (196) """ if val is None: return "" if val < 0: val = 0 elif val > 100: val = 100 if val <= 20: code = 21 elif val <= 40: code = 51 elif val <= 60: code = 226 elif val <= 80: code = 208 else: code = 196 return f"\033[38;5;{code}m" RESET = "\033[0m" # Affichage columns = ['B3', 'B2', 'B1', 'N0', 'H1', 'H2', 'H3'] header = " " + " ".join([f"{col:>6}" for col in columns]) print(header) print("-" * len(header)) for key, values in data.items(): line = f"{key:>3} |" for v in values: if v is None: line += f" {' '} " # vide pour NaN / None else: color = get_ansi_color(v) line += f" {color}{v:5.1f}{RESET} " print(line) def populate_sell_trend(self, dataframe: DataFrame, metadata: dict) -> DataFrame: dataframe.loc[ ( (dataframe['ml_prob'] < self.ml_prob_sell.value) ), ['exit_long', 'exit_tag']] = (1, f"ml_prob") return dataframe def adjust_trade_position(self, trade: Trade, current_time: datetime, current_rate: float, current_profit: float, min_stake: float, max_stake: float, **kwargs): # ne rien faire si ordre deja en cours if trade.has_open_orders: # print("skip open orders") return None if (self.wallets.get_available_stake_amount() < 0): # or trade.stake_amount >= max_stake: return 0 dataframe, _ = self.dp.get_analyzed_dataframe(trade.pair, self.timeframe) last_candle = dataframe.iloc[-1].squeeze() before_last_candle = dataframe.iloc[-2].squeeze() # prépare les données current_time = current_time.astimezone(timezone.utc) open_date = trade.open_date.astimezone(timezone.utc) dispo = round(self.wallets.get_available_stake_amount()) hours_since_first_buy = (current_time - trade.open_date_utc).seconds / 3600.0 days_since_first_buy = (current_time - trade.open_date_utc).days hours = (current_time - trade.date_last_filled_utc).total_seconds() / 3600.0 count_of_buys = trade.nr_of_successful_entries current_time_utc = current_time.astimezone(timezone.utc) open_date = trade.open_date.astimezone(timezone.utc) days_since_open = (current_time_utc - open_date).days pair = trade.pair profit = trade.calc_profit(current_rate) #round(current_profit * trade.stake_amount, 1) last_lost = self.getLastLost(last_candle, pair) pct_first = 0 total_counts = sum( pair_data['count_of_buys'] for pair_data in self.pairs.values() if not self.getShortName(pair) == 'BTC') if self.pairs[pair]['first_buy']: pct_first = self.getPctFirstBuy(pair, last_candle) pct = self.pct.value if count_of_buys == 1: pct_max = current_profit else: if self.pairs[trade.pair]['last_buy']: pct_max = self.getPctLastBuy(pair, last_candle) else: pct_max = - pct if (self.getShortName(pair) == 'BTC') or count_of_buys <= 2: lim = - pct - (count_of_buys * self.pct_inc.value) # lim = self.getLimitBuy(pair, last_candle, pct) # lim = - (0.012 * (1 + round(count_of_buys / 5)) + 0.001 * (count_of_buys - 1)) # lim = - (0.012 + 0.001 * (count_of_buys - 1) + (0.002 * count_of_buys if count_of_buys > 10 else 0.001 * count_of_buys if count_of_buys > 5 else 0)) else: pct = 0.05 lim = - pct - (count_of_buys * 0.0025) # lim = self.getLimitBuy(pair, last_candle, pct) if (len(dataframe) < 1): # print("skip dataframe") return None if not self.should_enter_trade(pair, last_candle, current_time): return None condition = (last_candle['enter_long'] and last_candle['sma60_deriv1'] > 0 and last_candle['hapercent'] > 0) \ or last_candle['enter_tag'] == 'pct3' \ or last_candle['enter_tag'] == 'pct3_1h' # if (self.getShortName(pair) != 'BTC' and count_of_buys > 3): # condition = before_last_candle_24['mid_smooth_3_1h'] > before_last_candle_12['mid_smooth_3_1h'] and before_last_candle_12['mid_smooth_3_1h'] < last_candle['mid_smooth_3_1h'] #and last_candle['mid_smooth_3_deriv1_1h'] < -1.5 limit_buy = 40 if (count_of_buys < limit_buy) and condition and (pct_max < lim): try: if self.pairs[pair]['has_gain'] and profit > 0: self.pairs[pair]['force_sell'] = True return None # if 6 <= count_of_buys: # if not ((before_last_candle_24['sma24_deriv1_1h'] > before_last_candle_12['sma24_deriv1_1h']) # & (before_last_candle_12['sma24_deriv1_1h'] < last_candle['sma24_deriv1_1h'])): # return None # print(f"{trade.pair} current_profit={current_profit} count_of_buys={count_of_buys} pct_first={pct_first:.3f} pct_max={pct_max:.3f} lim={lim:.3f} index={index}") # self.pairs[trade.pair]['last_palier_index'] = index # # Appel de la fonction # poly_func, x_future, y_future, count = self.polynomial_forecast( # dataframe['mid_smooth_12'], # window=self.buy_horizon_predict_1h.value * 12, # degree=4) # # if count < 3: # return None max_amount = self.config.get('stake_amount') * 2.5 stake_amount = min(min(max_amount, self.wallets.get_available_stake_amount()), self.adjust_stake_amount(pair, last_candle) * abs(last_lost / self.mise_factor_buy.value)) if stake_amount > 0: trade_type = last_candle['enter_tag'] if last_candle['enter_long'] == 1 else 'pct48' self.pairs[trade.pair]['count_of_buys'] += 1 self.pairs[pair]['total_amount'] += stake_amount self.log_trade( last_candle=last_candle, date=current_time, action="🟧 Loss -", dispo=dispo, pair=trade.pair, rate=current_rate, trade_type=trade_type, profit=round(profit, 1), buys=trade.nr_of_successful_entries + 1, stake=round(stake_amount, 2) ) self.pairs[trade.pair]['last_buy'] = current_rate self.pairs[trade.pair]['max_touch'] = last_candle['close'] self.pairs[trade.pair]['last_candle'] = last_candle # df = pd.DataFrame.from_dict(self.pairs, orient='index') # colonnes_a_exclure = ['last_candle', 'stop', # 'trade_info', 'last_date', 'expected_profit', 'last_count_of_buys', 'base_stake_amount', 'stop_buy'] # df_filtered = df[df['count_of_buys'] > 0].drop(columns=colonnes_a_exclure) # # df_filtered = df_filtered["first_buy", "last_max", "max_touch", "last_sell","last_buy", 'count_of_buys', 'current_profit'] # # print(df_filtered) return stake_amount return None except Exception as exception: print(exception) return None if (profit > self.pairs[pair]['previous_profit'] and profit > self.pairs[pair]['expected_profit'] and hours > 6 # and last_candle['sma60_deriv1'] > 0 and last_candle['max_rsi_12_1h'] < 75 and last_candle['rsi_1d'] < 58 # and last_candle['stop_buying'] == False # and last_candle['mid_smooth_5_deriv1_1d'] > 0 and self.wallets.get_available_stake_amount() > 0 ): try: self.pairs[pair]['previous_profit'] = profit stake_amount = min(self.wallets.get_available_stake_amount(), self.pairs[pair]['first_amount']) if stake_amount > 0: self.pairs[pair]['has_gain'] += 1 trade_type = 'Gain +' self.pairs[trade.pair]['count_of_buys'] += 1 self.pairs[pair]['total_amount'] += stake_amount self.log_trade( last_candle=last_candle, date=current_time, action="🟡 Gain +", dispo=dispo, pair=trade.pair, rate=current_rate, trade_type=str(round(pct_max, 4)), profit=round(profit, 1), buys=trade.nr_of_successful_entries + 1, stake=round(stake_amount, 2) ) self.pairs[trade.pair]['last_buy'] = current_rate self.pairs[trade.pair]['max_touch'] = last_candle['close'] self.pairs[trade.pair]['last_candle'] = last_candle return stake_amount return None except Exception as exception: print(exception) return None return None def getPctFirstBuy(self, pair, last_candle): return round((last_candle['close'] - self.pairs[pair]['first_buy']) / self.pairs[pair]['first_buy'], 3) def getPctLastBuy(self, pair, last_candle): return round((last_candle['close'] - self.pairs[pair]['last_buy']) / self.pairs[pair]['last_buy'], 4) def getPct60D(self, pair, last_candle): return round((last_candle['max60_1d'] - last_candle['min60_1d']) / last_candle['max60_1d'], 4) def getPctClose60D(self, pair, last_candle): if last_candle['close'] > last_candle['max12_1d']: return 1 if last_candle['close'] < last_candle['min12_1d']: return 0 return round( (last_candle['close'] - last_candle['min12_1d']) / (last_candle['max12_1d'] - last_candle['min12_1d']), 4) def getLimitBuy(self, pair, last_candle, first_pct): count_of_buys = self.pairs[pair]['count_of_buys'] pct60 = self.getPct60D(pair, last_candle) # exemple 0.3 pour 30% if (pct60 < 0.05): lim = - first_pct - (count_of_buys * 0.001 * 0.05 / 0.05) else: # 0.1 # 0.4 lim = - first_pct - (count_of_buys * 0.001 * pct60 / 0.05) return lim # def getProbaHausseEmaVolume(self, last_candle): # value_1 = self.getValuesFromTable(self.ema_volume, last_candle['ema_volume']) # value_2 = self.getValuesFromTable(self.mid_smooth_1h_deriv1, last_candle['mid_smooth_1h_deriv1']) # # val = self.approx_val_from_bins( # matrice=self.ema_volume_mid_smooth_1h_deriv1_matrice_df, # numeric_matrice=self.ema_volume_mid_smooth_1h_deriv1_numeric_matrice, # row_label=value_2, # col_label=value_1 # ) # return val def getProbaHausseSma5d(self, last_candle): value_1 = self.getValuesFromTable(self.sma5_deriv1, last_candle['sma5_deriv1_1d']) value_2 = self.getValuesFromTable(self.sma5_deriv2, last_candle['sma5_deriv2_1d']) # print(f"{last_candle['sma5_deriv1_1d']} => {value_1} / {last_candle['sma5_deriv2_1d']} => {value_2}") val = self.approx_val_from_bins( matrice=self.sma5_derive1_2_matrice_df, numeric_matrice=self.sma5_derive1_2_numeric_matrice, row_label=value_2, col_label=value_1 ) return val def adjust_stake_amount(self, pair: str, last_candle: DataFrame): # Calculer le minimum des 14 derniers jours nb_pairs = len(self.dp.current_whitelist()) base_stake_amount = self.config.get('stake_amount') / (self.mises.value) # * nb_pairs) # Montant de base configuré # pct60 = round(100 * self.getPctClose60D(pair, last_candle), 2) if True: # not pair in ('BTC/USDT', 'BTC/USDC'): # factors = [1, 1.2, 1.3, 1.4] if self.pairs[pair]['count_of_buys'] == 0: # pctClose60 = self.getPctClose60D(pair, last_candle) # dist_max = self.getDistMax(last_candle, pair) # factor = self.multi_step_interpolate(dist_max, self.thresholds, self.factors) factor = 1 #65 / min(65, last_candle['rsi_1d']) if last_candle['slope_norm_1d'] < last_candle['slope_norm_1h']: factor = 2 adjusted_stake_amount = max(base_stake_amount / 5, base_stake_amount * factor) else: adjusted_stake_amount = self.pairs[pair]['first_amount'] else: first_price = self.pairs[pair]['first_buy'] if (first_price == 0): first_price = last_candle['close'] last_max = last_candle['max12_1d'] pct = 5 if last_max > 0: pct = 100 * (last_max - first_price) / last_max factor = self.multi_step_interpolate(pct, self.thresholds, self.factors) adjusted_stake_amount = base_stake_amount * factor # max(base_stake_amount, min(100, base_stake_amount * percent_4)) # pct = 100 * abs(self.getPctFirstBuy(pair, last_candle)) # # factor = self.multi_step_interpolate(pct, self.thresholds, self.factors) if self.pairs[pair]['count_of_buys'] == 0: self.pairs[pair]['first_amount'] = adjusted_stake_amount return adjusted_stake_amount def calculateAmountSliding(self, pair, last_candle): val = last_candle['close'] min_sliding = min(last_candle['min60_1d'], val) max_sliding = max(last_candle['max60_1d'], val) min_abs = self.pairs[pair]['last_min'] max_abs = self.pairs[pair]['last_max'] full = self.wallets.get_total_stake_amount() stake = full / self.stakes out_min = stake / 2 out_max = stake * 2 # Clamp sliding range within absolute bounds min_sliding = max(min_sliding, min_abs) max_sliding = min(max_sliding, max_abs) # Avoid division by zero if max_sliding == min_sliding: return out_max # Or midpoint, or default value # Inverse linear interpolation position = (val - min_sliding) / (max_sliding - min_sliding) return out_max - position * (out_max - out_min) def calculatePctSliding(self, pair, last_candle): val = last_candle['close'] min_sliding = last_candle['min60_1d'] max_sliding = last_candle['max60_1d'] min_abs = self.pairs[pair]['last_min'] max_abs = self.pairs[pair]['last_max'] out_min = 0.025 out_max = 0.08 # Clamp sliding range within absolute bounds min_sliding = max(min_sliding, min_abs) max_sliding = min(max_sliding, max_abs) # Avoid division by zero if max_sliding == min_sliding: return out_max # Or midpoint, or default value # Inverse linear interpolation position = (val - min_sliding) / (max_sliding - min_sliding) return out_max - position * (out_max - out_min) def expectedProfit(self, pair: str, last_candle: DataFrame): lim = 0.01 pct = 0.002 if (self.getShortName(pair) == 'BTC'): lim = 0.005 pct = 0.001 pct_to_max = lim + pct * self.pairs[pair]['count_of_buys'] # if self.pairs[pair]['count_of_buys'] > 6: # pct_to_max = 0.006 * self.pairs[pair]['count_of_buys'] # pctClose60 = self.getPctClose60D(pair, last_candle) # max_60 = last_candle['max60_1d'] # if last_candle['close'] < max_60: # pct_to_max = 0.25 * (max_60 - last_candle['close']) / max_60 # pct_to_max = pct_to_max * (2 - pctClose60) expected_profit = lim * self.pairs[pair]['total_amount'] # min(3 * lim, max(lim, pct_to_max)) # 0.004 + 0.002 * self.pairs[pair]['count_of_buys'] #min(0.01, first_max) self.pairs[pair]['expected_profit'] = expected_profit # print( # f"Expected profit price={current_price:.4f} min_max={min_max:.4f} min_14={min_14_days:.4f} max_14={max_14_days:.4f} percent={percent:.4f} expected_profit={expected_profit:.4f}") return expected_profit def calculateUpDownPct(self, dataframe, key): down_pct_values = np.full(len(dataframe), np.nan) # Remplir la colonne avec les bons calculs for i in range(len(dataframe)): shift_value = abs(int(dataframe[key].iloc[i])) # Récupérer le shift actuel if i - shift_value > 1: # Vérifier que le shift ne dépasse pas l'index down_pct_values[i] = 100 * (dataframe['close'].iloc[i] - dataframe['close'].iloc[i - shift_value]) / \ dataframe['close'].iloc[i - shift_value] return down_pct_values # ✅ Première dérivée(variation ou pente) # Positive: la courbe est croissante → tendance haussière. # Négative: la courbe est décroissante → tendance baissière. # Proche de 0: la courbe est plate → marché stable ou en transition. # # Applications: # Détecter les points d’inflexion(changement de tendance) quand elle s’annule.\ # Analyser la vitesse d’un mouvement(plus elle est forte, plus le mouvement est impulsif). # # ✅ Seconde dérivée(accélération ou concavité) # Positive: la pente augmente → accélération de la hausse ou ralentissement de la baisse. # Négative: la pente diminue → accélération de la baisse ou ralentissement de la hausse. # Changement de signe: indique souvent un changement de courbure, utile pour prévoir des retournements. # # Exemples: # 🟢 Dérivée 1 > 0 et dérivée 2 > 0: tendance haussière qui s’accélère. # 🟡 Dérivée 1 > 0 et dérivée 2 < 0: tendance haussière qui ralentit → essoufflement potentiel. # 🔴 Dérivée 1 < 0 et dérivée 2 < 0: tendance baissière qui s’accélère. # 🟠 Dérivée 1 < 0 et dérivée 2 > 0: tendance baissière qui ralentit → possible bottom. # # Filtrer les signaux: ne prendre un signal haussier que si dérivée1 > 0 et dérivée2 > 0. # Détecter les zones de retournement: quand dérivée1 ≈ 0 et que dérivée2 change de signe. def calculateRegression(self, dataframe: DataFrame, column='close', window=50, degree=3, future_offset: int = 10 # projection à n bougies après ) -> DataFrame: df = dataframe.copy() regression_fit = [] regression_future_fit = [] regression_fit = [] regression_future_fit = [] for i in range(len(df)): if i < window: regression_fit.append(np.nan) regression_future_fit.append(np.nan) continue # Fin de la fenêtre d’apprentissage end_index = i start_index = i - window y = df[column].iloc[start_index:end_index].values # Si les données sont insuffisantes (juste par précaution) if len(y) < window: regression_fit.append(np.nan) regression_future_fit.append(np.nan) continue # x centré pour meilleure stabilité numérique x = np.linspace(-1, 1, window) coeffs = np.polyfit(x, y, degree) poly = np.poly1d(coeffs) # Calcul point présent (dernier de la fenêtre) x_now = x[-1] regression_fit.append(poly(x_now)) # Calcul point futur, en ajustant si on dépasse la fin remaining = len(df) - i - 1 effective_offset = min(future_offset, remaining) x_future = x_now + (effective_offset / window) * 2 # respect du même pas regression_future_fit.append(poly(x_future)) df[f"{column}_regression"] = regression_fit # 2. Dérivée première = différence entre deux bougies successives df[f"{column}_regression_deriv1"] = round(100 * df[f"{column}_regression"].diff() / df[f"{column}_regression"], 4) # 3. Dérivée seconde = différence de la dérivée première df[f"{column}_regression_deriv2"] = round(10 * df[f"{column}_regression_deriv1"].rolling(int(window / 4)).mean().diff(), 4) df[f"{column}_future_{future_offset}"] = regression_future_fit # # 2. Dérivée première = différence entre deux bougies successives # df[f"{column}_future_{future_offset}_deriv1"] = round(100 * df[f"{column}_future_{future_offset}"].diff() / df[f"{column}_future_{future_offset}"], 4) # # # 3. Dérivée seconde = différence de la dérivée première # df[f"{column}_future_{future_offset}_deriv2"] = round(10 * df[f"{column}_future_{future_offset}_deriv1"].rolling(int(window / 4)).mean().diff(), 4) return df def getValuesFromTable(self, values, value): for i in range(len(values) - 1): if values[i] <= value < values[i + 1]: return self.labels[i] return self.labels[-1] # cas limite pour la borne max # def interpolated_val_from_bins(self, row_pos, col_pos): # """ # Renvoie une approximation interpolée (bilinéaire) d'une valeur dans la matrice # à partir de positions flottantes dans l'index (ligne) et les colonnes. # # Parameters: # matrix_df (pd.DataFrame): Matrice des probabilités (index/colonnes = labels). # row_pos (float): Position réelle de la ligne (0 = B5, 10 = H5). # col_pos (float): Position réelle de la colonne (0 = B5, 10 = H5). # # Returns: # float: Valeur interpolée, ou NaN si en dehors des bornes. # """ # # # Labels ordonnés # n = len(self.labels) # # # Vérification des limites # if not (0 <= row_pos <= n - 1) or not (0 <= col_pos <= n - 1): # return np.nan # # # Conversion des labels -> matrice # matrix = self.smooth_smadiff_matrice_df.reindex(index=self.labels, columns=self.labels).values # # # Coordonnées entières (inférieures) # i = int(np.floor(row_pos)) # j = int(np.floor(col_pos)) # # # Coefficients pour interpolation # dx = row_pos - i # dy = col_pos - j # # # Précautions sur les bords # if i >= n - 1: i = n - 2; dx = 1.0 # if j >= n - 1: j = n - 2; dy = 1.0 # # # Récupération des 4 valeurs voisines # v00 = matrix[i][j] # v10 = matrix[i + 1][j] # v01 = matrix[i][j + 1] # v11 = matrix[i + 1][j + 1] # # # Interpolation bilinéaire # interpolated = ( # (1 - dx) * (1 - dy) * v00 + # dx * (1 - dy) * v10 + # (1 - dx) * dy * v01 + # dx * dy * v11 # ) # return interpolated def approx_val_from_bins(self, matrice, numeric_matrice, row_label, col_label): """ Renvoie une approximation de la valeur à partir des labels binaires (e.g. B5, H1) en utilisant une interpolation simple basée sur les indices. Parameters: matrix_df (pd.DataFrame): Matrice avec les labels binaires en index et colonnes. row_label (str): Label de la ligne (ex: 'B3'). col_label (str): Label de la colonne (ex: 'H2'). Returns: float: Valeur approchée si possible, sinon NaN. """ # Vérification des labels if row_label not in matrice.index or col_label not in matrice.columns: return np.nan # Index correspondant row_idx = self.label_to_index.get(row_label) col_idx = self.label_to_index.get(col_label) # Approximation directe (aucune interpolation complexe ici, juste une lecture) return numeric_matrice[row_idx, col_idx] @property def protections(self): return [ { "method": "CooldownPeriod", "stop_duration_candles": 12 } # { # "method": "MaxDrawdown", # "lookback_period_candles": self.lookback.value, # "trade_limit": self.trade_limit.value, # "stop_duration_candles": self.protection_stop.value, # "max_allowed_drawdown": self.protection_max_allowed_dd.value, # "only_per_pair": False # }, # { # "method": "StoplossGuard", # "lookback_period_candles": 24, # "trade_limit": 4, # "stop_duration_candles": self.protection_stoploss_stop.value, # "only_per_pair": False # }, # { # "method": "StoplossGuard", # "lookback_period_candles": 24, # "trade_limit": 4, # "stop_duration_candles": 2, # "only_per_pair": False # }, # { # "method": "LowProfitPairs", # "lookback_period_candles": 6, # "trade_limit": 2, # "stop_duration_candles": 60, # "required_profit": 0.02 # }, # { # "method": "LowProfitPairs", # "lookback_period_candles": 24, # "trade_limit": 4, # "stop_duration_candles": 2, # "required_profit": 0.01 # } ] def conditional_smoothing(self, series, threshold=0.002): smoothed = [series.iloc[0]] for val in series.iloc[1:]: last = smoothed[-1] if abs(val - last) / last >= threshold: smoothed.append(val) else: smoothed.append(last) return pd.Series(smoothed, index=series.index) def causal_savgol(self, series, window=25, polyorder=3): result = [] half_window = window # Fenêtre complète dans le passé for i in range(len(series)): if i < half_window: result.append(np.nan) continue window_series = series[i - half_window:i] if window_series.isna().any(): result.append(np.nan) continue coeffs = np.polyfit(range(window), window_series, polyorder) poly = np.poly1d(coeffs) result.append(poly(window - 1)) return pd.Series(result, index=series.index) def get_stake_from_drawdown(self, pct: float, base_stake: float = 100.0, step: float = 0.04, growth: float = 1.15, max_stake: float = 1000.0) -> float: """ Calcule la mise à allouer en fonction du drawdown. :param pct: Drawdown en pourcentage (ex: -0.12 pour -12%) :param base_stake: Mise de base (niveau 0) :param step: Espacement entre paliers (ex: tous les -4%) :param growth: Facteur de croissance par palier (ex: 1.15 pour +15%) :param max_stake: Mise maximale à ne pas dépasser :return: Montant à miser """ if pct >= 0: return base_stake level = int(abs(pct) / step) stake = base_stake * (growth ** level) return min(stake, max_stake) def compute_adaptive_paliers(self, max_drawdown: float = 0.65, first_steps: list[float] = [0.01, 0.01, 0.015, 0.02], growth: float = 1.2) -> list[float]: """ Génère une liste de drawdowns négatifs avec des paliers plus rapprochés au début. :param max_drawdown: Drawdown max (ex: 0.65 pour -65%) :param first_steps: Liste des premiers paliers fixes en % (ex: [0.01, 0.01, 0.015]) :param growth: Facteur multiplicatif pour espacer les paliers suivants :return: Liste de drawdowns négatifs (croissants) """ paliers = [] cumulated = 0.0 # Étapes initiales rapprochées for step in first_steps: cumulated += step paliers.append(round(-cumulated, 4)) # Étapes suivantes plus espacées step = first_steps[-1] while cumulated < max_drawdown: step *= growth cumulated += step if cumulated >= max_drawdown: break paliers.append(round(-cumulated, 4)) return paliers # def get_dca_stakes(self, # max_drawdown: float = 0.65, # base_stake: float = 100.0, # first_steps: list[float] = [0.01, 0.01, 0.015, 0.015], # growth: float = 1.2, # stake_growth: float = 1.15 # ) -> list[tuple[float, float]]: # """ # Génère les paliers de drawdown et leurs stakes associés. # # :param max_drawdown: Maximum drawdown (ex: 0.65 pour -65%) # :param base_stake: Mise initiale # :param first_steps: Paliers de départ (plus resserrés) # :param growth: Multiplicateur d'espacement des paliers # :param stake_growth: Croissance multiplicative des mises # :return: Liste de tuples (palier_pct, stake) # [(-0.01, 100.0), (-0.02, 115.0), (-0.035, 132.25), (-0.05, 152.09), (-0.068, 174.9), # (-0.0896, 201.14), (-0.1155, 231.31), (-0.1466, 266.0), (-0.1839, 305.9), (-0.2287, 351.79), # (-0.2825, 404.56), (-0.347, 465.24), (-0.4244, 535.03), (-0.5173, 615.28), (-0.6287, 707.57)] # """ # paliers = [ # (-0.01, 100.0), (-0.02, 115.0), (-0.035, 130), (-0.05, 150), (-0.07, 150), # (-0.10, 150), (-0.15, 150), (-0.20, 150), (-0.25, 150), # (-0.30, 200), (-0.40, 200), # (-0.50, 300), (-0.60, 400), (-0.70, 500), (-0.80, 1000) # ] # # # cumulated = 0.0 # # stake = base_stake # # # # # Étapes initiales # # for step in first_steps: # # cumulated += step # # paliers.append((round(-cumulated, 4), round(stake, 2))) # # stake *= stake_growth # # # # # Étapes suivantes # # step = first_steps[-1] # # while cumulated < max_drawdown: # # step *= growth # # cumulated += step # # if cumulated >= max_drawdown: # # break # # paliers.append((round(-cumulated, 4), round(stake, 2))) # # stake *= stake_growth # # return paliers # def get_active_stake(self, pct: float) -> float: # """ # Renvoie la mise correspondant au drawdown `pct`. # # :param pct: drawdown courant (négatif, ex: -0.043) # :param paliers: liste de tuples (drawdown, stake) # :return: stake correspondant # """ # abs_pct = abs(pct) # stake = self.paliers[0][1] # stake par défaut # # for palier, s in self.paliers: # if abs_pct >= abs(palier): # stake = s # else: # break # # return stake # def get_palier_index(self, pct): # """ # Retourne l'index du palier franchi pour un pourcentage de baisse donné (pct). # On cherche le palier le plus profond atteint (dernier franchi). # """ # for i in reversed(range(len(self.paliers))): # seuil, _ = self.paliers[i] # #print(f"pct={pct} seuil={seuil}") # if pct <= seuil: # # print(pct) # return i # return None # Aucun palier atteint # def poly_regression_predictions(self, series: pd.Series, window: int = 20, degree: int = 2, n_future: int = 3) -> pd.DataFrame: # """ # Renvoie une DataFrame avec `n_future` colonnes contenant les extrapolations des n prochains points # selon une régression polynomiale ajustée sur les `window` dernières valeurs. # """ # result = pd.DataFrame(index=series.index) # x = np.arange(window) # # for future_step in range(1, n_future + 1): # result[f'poly_pred_t+{future_step}'] = np.nan # # for i in range(window - 1, len(series)): # y = series.iloc[i - window + 1 : i + 1].values # # if np.any(pd.isna(y)): # continue # # coeffs = np.polyfit(x, y, degree) # poly = np.poly1d(coeffs) # # for future_step in range(1, n_future + 1): # future_x = window - 1 + future_step # Extrapolation point # result.loc[series.index[i], f'poly_pred_t+{future_step}'] = poly(future_x) # # return result def polynomial_forecast(self, series: pd.Series, window: int = 20, degree: int = 2, steps=[12, 24, 36]): """ Calcule une régression polynomiale sur les `window` dernières valeurs de la série, puis prédit les `n_future` prochaines valeurs. :param series: Série pandas (ex: dataframe['close']) :param window: Nombre de valeurs récentes utilisées pour ajuster le polynôme :param degree: Degré du polynôme (ex: 2 pour quadratique) :param n_future: Nombre de valeurs futures à prédire :return: tuple (poly_function, x_vals, y_pred), où y_pred contient les prédictions futures """ if len(series) < window: raise ValueError("La série est trop courte pour la fenêtre spécifiée.") recent_y = series.iloc[-window:].values x = np.arange(window) coeffs = np.polyfit(x, recent_y, degree) poly = np.poly1d(coeffs) x_future = np.arange(window, window + len(steps)) y_future = poly(x_future) # Affichage de la fonction # print("Fonction polynomiale trouvée :") # print(poly) current = series.iloc[-1] count = 0 for future_step in steps: # range(1, n_future + 1) future_x = window - 1 + future_step prediction = poly(future_x) # series.loc[series.index[future_x], f'poly_pred_t+{future_step}'] = prediction # ➕ Afficher les prédictions # print(f"{current} → t+{future_step}: x={future_x}, y={prediction:.2f}") if prediction > 0: # current: count += 1 return poly, x_future, y_future, count # def calculateStats2(self, df, index, target): # # Nombre de tranches (modifiable) # n_bins_indice = 11 # n_bins_valeur = 11 # # # Tranches dynamiques # # df['indice_tranche'] = pd.qcut(df[f"{index}"], q=n_bins_indice, duplicates='drop') # # df['valeur_tranche'] = pd.qcut(df[f"{target}"], q=n_bins_valeur, duplicates='drop') # # df[f"{index}_bin"], bins_1h = pd.qcut(df[f"{index}"], q=n_bins_indice, labels=self.labels, retbins=True, # duplicates='drop') # df[f"{target}_bin"], bins_1d = pd.qcut(df[f"{target}"], q=n_bins_valeur, labels=self.labels, retbins=True, # duplicates='drop') # # Affichage formaté pour code Python # print(f"Bornes des quantiles pour {index} : [{', '.join([f'{b:.4f}' for b in bins_1h])}]") # print(f"Bornes des quantiles pour {target} : [{', '.join([f'{b:.4f}' for b in bins_1d])}]") # # # Tableau croisé (compte) # tableau = pd.crosstab(df[f"{index}_bin"], df[f"{target}_bin"]) # # # Facultatif : en pourcentages # tableau_pct = tableau.div(tableau.sum(axis=1), axis=0) * 100 # # # Affichage # print("Répartition brute :") # print(tableau) # print("\nRépartition en % par ligne :") # print(tableau_pct.round(2)) def calculateStats(self, df, index, target): # Nombre de tranches (modifiable) n_bins_indice = 11 n_bins_valeur = 11 # Créer les tranches dynamiques df['indice_tranche'] = pd.qcut(df[index], q=n_bins_indice, duplicates='drop') df['valeur_tranche'] = pd.qcut(df[target], q=n_bins_valeur, duplicates='drop') # Créer un tableau croisé avec la moyenne des valeurs pivot_mean = df.pivot_table( index='indice_tranche', columns='valeur_tranche', values=target, # <-- c'est la colonne qu'on agrège aggfunc='mean' # <-- on calcule la moyenne ) # Résultat # print("Moyenne des valeurs par double-tranche :") # print(pivot_mean.round(2)) def should_enter_trade(self, pair: str, last_candle, current_time) -> bool: limit = 3 # return last_candle['slope_norm_1d'] < last_candle['slope_norm_1h'] if self.pairs[pair]['stop'] and last_candle['max_rsi_12_1h'] <= 60 and last_candle['trend_class_1h'] == -1: dispo = round(self.wallets.get_available_stake_amount()) self.pairs[pair]['stop'] = False self.log_trade( last_candle=last_candle, date=current_time, action="🟢RESTART", dispo=dispo, pair=pair, rate=last_candle['close'], trade_type='', profit=0, buys=self.pairs[pair]['count_of_buys'], stake=0 ) # 🟢 Dérivée 1 > 0 et dérivée 2 > 0: tendance haussière qui s’accélère. # 🟡 Dérivée 1 > 0 et dérivée 2 < 0: tendance haussière qui ralentit → essoufflement potentiel. # 🔴 Dérivée 1 < 0 et dérivée 2 < 0: tendance baissière qui s’accélère. # 🟠 Dérivée 1 < 0 et dérivée 2 > 0: tendance baissière qui ralentit → possible bottom. # if not pair.startswith('BTC'): dispo = round(self.wallets.get_available_stake_amount()) # if self.pairs[pair]['stop'] \ # and last_candle[f"{self.indic_1d_p.value}_deriv1_1h"] >= self.indic_deriv1_1d_p_start.value \ # and last_candle[f"{self.indic_1d_p.value}_deriv2_1h"] >= self.indic_deriv2_1d_p_start.value: # self.pairs[pair]['stop'] = False # self.log_trade( # last_candle=last_candle, # date=current_time, # action="🟢RESTART", # dispo=dispo, # pair=pair, # rate=last_candle['close'], # trade_type='', # profit=0, # buys=self.pairs[pair]['count_of_buys'], # stake=0 # ) # else: # if self.pairs[pair]['stop'] == False \ # and last_candle[f"{self.indic_1d_p.value}_deriv1_1h"] <= self.indic_deriv1_1d_p_stop.value \ # and last_candle[f"{self.indic_1d_p.value}_deriv2_1h"] <= self.indic_deriv2_1d_p_stop.value: # self.pairs[pair]['stop'] = True # # if self.pairs[pair]['current_profit'] > 0: # # self.pairs[pair]['force_sell'] = True # self.log_trade( # last_candle=last_candle, # date=current_time, # action="🔴STOP", # dispo=dispo, # pair=pair, # rate=last_candle['close'], # trade_type='', # profit=self.pairs[pair]['current_profit'], # buys=self.pairs[pair]['count_of_buys'], # stake=0 # ) # return False # if self.pairs[pair]['stop']: # return False return True # Filtrer les paires non-BTC non_btc_pairs = [p for p in self.pairs if not p.startswith('BTC')] # Compter les positions actives sur les paires non-BTC max_nb_trades = 0 total_non_btc = 0 max_pair = '' limit_amount = 250 max_amount = 0 for p in non_btc_pairs: max_nb_trades = max(max_nb_trades, self.pairs[p]['count_of_buys']) max_amount = max(max_amount, self.pairs[p]['total_amount']) for p in non_btc_pairs: if (max_nb_trades == self.pairs[p]['count_of_buys'] and max_nb_trades > limit): # if (max_amount == self.pairs[p]['total_amount'] and max_amount > limit_amount): max_pair = p total_non_btc += self.pairs[p]['count_of_buys'] pct_max = self.getPctFirstBuy(pair, last_candle) # self.getPctLastBuy(pair, last_candle) val = self.getProbaHausseSma5d(last_candle) if val < 15: return False # if count_decrease == len(non_btc_pairs): # self.should_enter_trade_count += 1 # char="." # print(f"should_enter_trade canceled all pairs decreased {'':{char}>{self.should_enter_trade_count}}") # return False # if (last_candle['mid_smooth_1h_deriv1'] < -0.0 and last_candle['sma24_deriv1_1h'] < -0.0): # return False # if (last_candle['sma5_deriv1_1d'] < -0.1 and last_candle['sma5_deriv2_1d'] < -0) \ # or last_candle['sma5_deriv2_1d'] < -0.2: # return False if last_candle['mid_smooth_1h_deriv1'] < -0.02: # and last_candle['mid_smooth_1h_deriv2'] > 0): return False # if self.pairs[pair]['count_of_buys'] >= 3: # if (last_candle['sma24_deriv1_1d'] < self.sma24_deriv1_1d_protection.value # and last_candle['sma5_deriv1_1d'] < self.sma5_deriv1_1d_protection.value \ # and last_candle['sma5_deriv2_1d'] < -0.05): # # or (last_candle['sma5_deriv1_1d'] < -0.1 and last_candle['sma24_deriv1_1h'] < -0.1): # self.pairs[pair]['stop'] = True # return False self.should_enter_trade_count = 0 # if max_pair != pair and self.pairs[pair]['total_amount'] > 300: # return False if (max_pair != '') & (self.pairs[pair]['count_of_buys'] >= limit): trade = self.pairs[max_pair]['current_trade'] current_time = current_time.astimezone(timezone.utc) open_date = trade.open_date.astimezone(timezone.utc) current_time_utc = current_time.astimezone(timezone.utc) days_since_open = (current_time_utc - open_date).days pct_max_max = self.getPctFirstBuy(max_pair, last_candle) # print(f"days_since_open {days_since_open} max_pair={max_pair} pair={pair}") return max_pair == pair or pct_max < - 0.25 or ( pct_max_max < - 0.15 and max_pair != pair and days_since_open > 30) else: return True @staticmethod def check_derivatives_vectorized(dataframe, deriv_pairs, thresholds): """ Retourne True si toutes les dérivées respectent leur seuil. """ mask = pd.Series(True, index=dataframe.index) for d1_col, d2_col in deriv_pairs: d1_thresh = thresholds.get(d1_col, 0) d2_thresh = thresholds.get(d2_col, 0) mask &= (dataframe[d1_col] >= d1_thresh) & (dataframe[d2_col] >= d2_thresh) return mask # ---------------------------------------------------------------------------------------------- # fallback defaults (used when no JSON exists) PARAMS_DIR = 'params' DEFAULT_PARAMS = { "rsi_buy": 30, "rsi_sell": 70, "ema_period": 21, "sma_short": 20, "sma_long": 100, "atr_period": 14, "atr_multiplier": 1.5, "stake_amount": None, # use exchange default "stoploss": -0.10, "minimal_roi": {"0": 0.10} } def __init__(self, config: dict) -> None: super().__init__(config) # self.parameters = self.load_params_tree("user_data/strategies/params/") def setTrends(self, dataframe: DataFrame): SMOOTH_WIN=10 df = dataframe.copy() # # --- charger les données --- # df['timestamp'] = pd.to_datetime(df['timestamp'], errors='coerce') # --- calcul SMA14 --- # df['sma'] = talib.SMA(df, timeperiod=20) # ta.trend.sma_indicator(df['close'], 14) # --- pente brute --- df['slope'] = df['sma12'].diff() # --- lissage EMA --- df['slope_smooth'] = df['slope'].ewm(span=SMOOTH_WIN, adjust=False).mean() # df["slope_smooth"] = savgol_filter(df["slope_smooth"], window_length=21, polyorder=3) # --- normalisation relative --- df['slope_norm'] = 10000 * df['slope_smooth'] / df['close'] # df['slope_norm'].fillna(0, inplace=True) df['slope_norm'] = df['slope_norm'].fillna(0) # --- classification dynamique via quantiles --- q = df['slope_norm'].quantile([0.125, 0.375, 0.625, 0.875]).values q1, q2, q3, q4 = q def classify_expanding(series): trend_class = [] for i in range(len(series)): past_values = series[:i + 1] # uniquement le passé q = past_values.quantile([0.125, 0.375, 0.625, 0.875]).values q1, q2, q3, q4 = q v = series.iloc[i] if v <= q1: trend_class.append(-2) elif v <= q2: trend_class.append(-1) elif v <= q3: trend_class.append(0) elif v <= q4: trend_class.append(1) else: trend_class.append(2) return trend_class dataframe['slope_norm'] = df['slope_norm'] # dataframe['trend_class'] = df['slope_norm'].apply(classify) dataframe['trend_class'] = None # Rolling sur la fenêtre passée dataframe['trend_class'] = classify_expanding(dataframe['slope_norm']) # # -------------------------- Trend detection (M2) -------------------------- # def getTrend(self, dataframe: DataFrame) -> str: # """ # M2: SMA50 / SMA200 golden/death cross # - bull: sma50 > sma200 # - bear: sma50 < sma200 # - range: sma50 ~= sma200 (within a small pct) # # Uses only past data (no future lookahead). # """ # if dataframe is None or len(dataframe) < max(self.DEFAULT_PARAMS['sma_short'], self.DEFAULT_PARAMS['sma_long']) + 2: # return 'RANGE' # # sma_short = dataframe['close'].rolling(window=self.DEFAULT_PARAMS['sma_short']).mean() # sma_long = dataframe['close'].rolling(window=self.DEFAULT_PARAMS['sma_long']).mean() # # cur_short = sma_short.iloc[-1] # cur_long = sma_long.iloc[-1] # # # small relative threshold to avoid constant flips # if cur_long == 0 or cur_short == 0: # return 'RANGE' # # rel = abs(cur_short - cur_long) / cur_long # threshold = 0.01 # 1% by default; tweak as needed # # if rel <= threshold: # return 'RANGE' # if cur_short > cur_long: # return 'BULL' # return 'BEAR' # # -------------------------- Parameter loading -------------------------- # def loadParamsFor(self, pair: str, trend: str) -> dict: # """Load JSON from params//.json with fallback to DEFAULT_PARAMS.""" # pair_safe = pair.replace('/', '-') # folder name convention: BTC-USDT # # cache key # cache_key = f"{pair_safe}:{trend}" # if cache_key in self._params_cache: # return self._params_cache[cache_key] # # path = os.path.join(self.PARAMS_DIR, pair_safe, f"{trend}.json") # if os.path.isfile(path): # try: # with open(path, 'r') as f: # params = json.load(f) # # merge with defaults so missing keys won't break # merged = {**self.DEFAULT_PARAMS, **params} # self._params_cache[cache_key] = merged # logger.info(f"Loaded params for {pair} {trend} from {path}") # return merged # except Exception as e: # logger.exception(f"Failed to load params {path}: {e}") # # # fallback # logger.info(f"Using DEFAULT_PARAMS for {pair} {trend}") # self._params_cache[cache_key] = dict(self.DEFAULT_PARAMS) # return self._params_cache[cache_key] def load_params_tree(self, base_path="user_data/strategies/params/"): base = Path(base_path) params_tree = {} if not base.exists(): raise FileNotFoundError(f"Base path '{base_path}' not found.") for pair_dir in base.iterdir(): if not pair_dir.is_dir(): continue pair = self.getShortName(pair_dir.name) # ex : BTC-USDT params_tree.setdefault(pair, {}) for trend_dir in pair_dir.iterdir(): if not trend_dir.is_dir(): continue trend = trend_dir.name # ex : bull / bear / range params_tree[pair].setdefault(trend, []) for file in trend_dir.glob("*-hyperopt_result.json"): filename = file.name # Extraire START et END try: prefix = filename.replace("-hyperopt_result.json", "") start, end = prefix.split("-", 1) # split en 2 except Exception: start = None end = None # Lire le JSON try: with open(file, "r") as f: content = json.load(f) except Exception as err: content = {"error": str(err)} params_tree[pair][trend].append({ "start": start, "end": end, "file": str(file), "content": content, }) for pair, trends in params_tree.items(): for trend, entries in trends.items(): if entries: # indic_5m = self.getParamValue(pair, trend, 'buy', 'indic_5m') # indic_deriv1_5m = self.getParamValue(pair, trend, 'buy', 'indic_deriv1_5m') # indic_deriv2_5m = self.getParamValue(pair, trend, 'buy', 'indic_deriv2_5m') # # indic_5m_sell = self.getParamValue(pair, trend, 'sell', 'indic_5m_sell') # indic_deriv1_5m_sell = self.getParamValue(pair, trend, 'sell', 'indic_deriv1_5m_sell') # indic_deriv2_5m_sell = self.getParamValue(pair, trend, 'sell', 'indic_deriv2_5m_sell') print(f"{pair} -> {trend}") # {indic_5m} {indic_deriv1_5m} {indic_deriv2_5m} {indic_5m_sell} {indic_deriv1_5m_sell} {indic_deriv2_5m_sell}") # for entry in entries: # print(entry) return params_tree def getParamValue(self, pair, trend, space, param): pair = self.getShortName(pair) return self.parameters[pair][trend][0]['content']['params'][space][param] def select_uncorrelated_features(self, df, target, top_n=20, corr_threshold=0.7): """ Sélectionne les features les plus corrélées avec target, tout en supprimant celles trop corrélées entre elles. """ # 1️⃣ Calcul des corrélations absolues avec la cible corr = df.corr(numeric_only=True) corr_target = corr[target].abs().sort_values(ascending=False) # 2️⃣ Prend les N features les plus corrélées avec la cible (hors target) features = corr_target.drop(target).head(top_n).index.tolist() # 3️⃣ Évite les features trop corrélées entre elles selected = [] for feat in features: too_correlated = False for sel in selected: if abs(corr.loc[feat, sel]) > corr_threshold: too_correlated = True break if not too_correlated: selected.append(feat) # 4️⃣ Retourne un DataFrame propre avec les valeurs de corrélation selected_corr = pd.DataFrame({ "feature": selected, "corr_with_target": [corr.loc[f, target] for f in selected] }).sort_values(by="corr_with_target", key=np.abs, ascending=False) return selected_corr def graphFonctionApprise(self, X_test, y_test, y_pred): # Exemple : trier les valeurs de X_test et les prédictions x_sorted = np.argsort(X_test.iloc[:, 0]) x = X_test.iloc[:, 0].iloc[x_sorted] y_true = y_test.iloc[x_sorted] y_pred = y_pred[x_sorted] plt.figure(figsize=(12, 6)) plt.plot(x, y_true, label="Réel", color="blue", alpha=0.7) plt.plot(x, y_pred, label="Prédit (LGBM)", color="red", alpha=0.7) plt.title("Fonction apprise par LGBMRegressor") plt.xlabel("Feature principale") plt.ylabel("Valeur prédite") plt.legend() plt.grid(True) out_path = "/home/souti/freqtrade/user_data/plots/lgbm_function.png" plt.savefig(out_path, bbox_inches="tight") plt.close() print(f"Graphique sauvegardé : {out_path}") def graphFonctionAppriseFeature(self, X_test, y_test, y_pred): plt.figure(figsize=(14, 8)) colors = sns.color_palette("coolwarm", n_colors=X_test.shape[1]) for i, col in enumerate(X_test.columns): plt.plot(X_test[col], y_pred, '.', color=colors[i], alpha=0.4, label=col) plt.title("Fonction apprise par LGBMRegressor (par feature)") plt.xlabel("Valeur feature") plt.ylabel("Valeur prédite") plt.legend(loc="best") plt.grid(True) out_path = "/home/souti/freqtrade/user_data/plots/lgbm_features.png" plt.savefig(out_path, bbox_inches="tight") plt.close() print(f"Graphique sauvegardé : {out_path}") def optuna(self, X_train, X_test, y_train, y_test): # Suppose que X_train, y_train sont déjà définis # ou sinon : # X_train, X_valid, y_train, y_valid = train_test_split(X, y, test_size=0.2, random_state=42) print("Description") print(X_train.describe().T.sort_values("std")) def objective(trial): params = { 'objective': 'regression', 'metric': 'rmse', 'n_estimators': trial.suggest_int('n_estimators', 100, 1000), 'learning_rate': trial.suggest_float('learning_rate', 0.005, 0.2, log=True), 'max_depth': trial.suggest_int('max_depth', 3, 15), 'num_leaves': trial.suggest_int('num_leaves', 20, 300), 'subsample': trial.suggest_float('subsample', 0.5, 1.0), 'colsample_bytree': trial.suggest_float('colsample_bytree', 0.5, 1.0), 'reg_alpha': trial.suggest_float('reg_alpha', 0.0, 10.0), 'reg_lambda': trial.suggest_float('reg_lambda', 0.0, 10.0), 'random_state': 42, } model = LGBMRegressor(**params) model.fit(X_train, y_train) # On peut aussi valider sur un split interne preds = model.predict(X_test) rmse = np.sqrt(mean_squared_error(y_test, preds)) return rmse # Crée une étude Optuna study = optuna.create_study(direction="minimize") # on veut minimiser l'erreur study.optimize(objective, n_trials=50, show_progress_bar=True) # 🔹 Afficher les meilleurs résultats print("✅ Meilleurs hyperparamètres trouvés :") print(study.best_params) print(f"Meilleur RMSE : {study.best_value:.4f}") # 🔹 Sauvegarder les résultats optuna_path = "/home/souti/freqtrade/user_data/plots/optuna_lgbm_results.txt" with open(optuna_path, "w") as f: f.write(f"Best params:\n{study.best_params}\n") f.write(f"Best RMSE: {study.best_value:.4f}\n") print(f"Résultats sauvegardés dans : {optuna_path}") # 🔹 Créer le modèle final avec les meilleurs paramètres print("🚀 Entraînement du modèle LightGBM...") # -- Appliquer le filtrage -- X_train_filtered = self.filter_features(X_train, y_train) best_model = LGBMRegressor(**study.best_params) best_model.fit(X_train_filtered, y_train) # fig1 = vis.plot_optimization_history(study) # fig1.write_image("/home/souti/freqtrade/user_data/plots/optuna_history.png") # # fig2 = vis.plot_param_importances(study) # fig2.write_image("/home/souti/freqtrade/user_data/plots/optuna_importance.png") return best_model, X_train_filtered def filter_features(self, X: pd.DataFrame, y: pd.Series, corr_threshold: float = 0.95): """Filtre les colonnes peu utiles ou redondantes""" print("🔍 Filtrage automatique des features...") # 1️⃣ Supprimer les colonnes constantes vt = VarianceThreshold(threshold=1e-5) X_var = pd.DataFrame(vt.fit_transform(X), columns=X.columns[vt.get_support()]) print(f" - {len(X.columns) - X_var.shape[1]} colonnes supprimées (variance faible)") # 2️⃣ Supprimer les colonnes très corrélées entre elles corr = X_var.corr().abs() upper = corr.where(np.triu(np.ones(corr.shape), k=1).astype(bool)) drop_cols = [column for column in upper.columns if any(upper[column] > corr_threshold)] X_corr = X_var.drop(columns=drop_cols, errors='ignore') print(f" - {len(drop_cols)} colonnes supprimées (corrélation > {corr_threshold})") # 3️⃣ Facultatif : supprimer les colonnes entièrement NaN X_clean = X_corr.dropna(axis=1, how='all') print(f"✅ {X_clean.shape[1]} colonnes conservées après filtrage.\n") return X_clean