import inspect
import os

import freqtrade.vendor.qtpylib.indicators as qtpylib
# Machine Learning
import joblib
import matplotlib.pyplot as plt
import numpy as np
import optuna
import pandas as pd
import seaborn as sns
import shap
import ta
import talib.abstract as talib
import tensorflow as tf
from catboost import CatBoostClassifier
from optuna.visualization import plot_optimization_history
from optuna.visualization import plot_parallel_coordinate
from optuna.visualization import plot_param_importances
from optuna.visualization import plot_slice
from sklearn.calibration import CalibratedClassifierCV
from sklearn.feature_selection import SelectFromModel
from sklearn.feature_selection import VarianceThreshold
from sklearn.inspection import PartialDependenceDisplay
from sklearn.inspection import permutation_importance
from sklearn.metrics import precision_recall_curve, ConfusionMatrixDisplay
from sklearn.metrics import (
    classification_report,
    confusion_matrix,
    accuracy_score,
    roc_curve,
    precision_score,
    recall_score
)
from sklearn.metrics import (
    f1_score
)
from sklearn.metrics import roc_auc_score
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler
from sklearn.tree import export_text
from tensorflow.keras.callbacks import EarlyStopping
from tensorflow.keras.layers import Dense, Dropout
from tensorflow.keras.models import Sequential
from xgboost import XGBClassifier


class Crash:
    """Crash-detection helper: feature engineering, model training and analysis plots."""

    timeframe = '1h'       # candle timeframe used by the feature pipeline
    dataframe = {}         # working dataframe, set by the caller before training
    train_model = None     # last fitted model (set by trainModel / optimize_*)
    short_pair = "BTC"     # pair prefix used to build the plot output path
    model_indicators = []  # feature column names fed to the model
    path = f"user_data/strategies/plots/{short_pair}/crash/"

    def calculateDerivation(self, dataframe, window=12, suffixe='', timeframe='5m'):
        """Smooth 'mid' with a rolling mean, then derive deriv1/deriv2/state columns.

        NOTE(review): the `timeframe` parameter is currently unused here.
        """
        dataframe[f"mid_smooth{suffixe}"] = dataframe['mid'].rolling(window).mean()
        dataframe = self.calculeDerivees(dataframe, f"mid_smooth{suffixe}", ema_period=window)
        return dataframe

    def calculeDerivees(
            self,
            dataframe: pd.DataFrame,
            name: str,
            suffixe: str = '',
            window: int = 100,
            coef: float = 0.15,
            ema_period: int = 10,
            verbose: bool = True,
    ) -> pd.DataFrame:
        """
        Compute deriv1/deriv2 (simple relative derivative), apply an EMA, and
        tag the tendency with an adaptive epsilon based on rolling percentiles.

        Adds to `dataframe`: `{name}{suffixe}_inv`, `_dist`, `_deriv1`,
        `_deriv2` and `_state` columns; returns the same dataframe.
        """
        d1_col = f"{name}{suffixe}_deriv1"
        d2_col = f"{name}{suffixe}_deriv2"
        # d1s_col = f"{name}{suffixe}_deriv1_smooth"
        # d2s_col = f"{name}{suffixe}_deriv2_smooth"
        tendency_col = f"{name}{suffixe}_state"
        # Scaling factors grow with the EMA period (reference period = 5).
        factor1 = 100 * (ema_period / 5)
        factor2 = 10 * (ema_period / 5)
        # Trough flag: the previous bar is a local minimum of the series.
        dataframe[f"{name}{suffixe}_inv"] = (dataframe[f"{name}{suffixe}"].shift(2) >= dataframe[
            f"{name}{suffixe}"].shift(1)) \
            & (dataframe[f"{name}{suffixe}"].shift(1) <= dataframe[f"{name}{suffixe}"])
        # --- Distance to the moving average ---
        dataframe[f"{name}{suffixe}_dist"] = (dataframe['close'] - dataframe[f"{name}{suffixe}"]) / dataframe[
            f"{name}{suffixe}"]
        # simple relative derivative
        dataframe[d1_col] = (dataframe[name] - dataframe[name].shift(1)) / dataframe[name].shift(1)
        # EMA smoothing
        dataframe[d1_col] = factor1 * dataframe[d1_col].ewm(span=ema_period, adjust=False).mean()
        # dataframe[d1_col] = dataframe[d1_col].rolling(window=ema_period, center=True).median()
        dataframe[d2_col] = dataframe[d1_col] - dataframe[d1_col].shift(1)
        dataframe[d2_col] = factor2 * dataframe[d2_col].ewm(span=ema_period, adjust=False).mean()
        # adaptive epsilon via rolling percentiles
        p_low_d1 = dataframe[d1_col].rolling(window=window, min_periods=1).quantile(0.05)
        p_high_d1 = dataframe[d1_col].rolling(window=window, min_periods=1).quantile(0.95)
        p_low_d2 = dataframe[d2_col].rolling(window=window, min_periods=1).quantile(0.05)
        p_high_d2 = dataframe[d2_col].rolling(window=window, min_periods=1).quantile(0.95)
        eps_d1_series = ((p_low_d1.abs() + p_high_d1.abs()) / 2) * coef
        eps_d2_series = ((p_low_d2.abs() + p_high_d2.abs()) / 2) * coef
        # global fallback eps (whole-series percentiles) for NaN / zero windows
        global_eps_d1 = (abs(dataframe[d1_col].quantile(0.05)) + abs(dataframe[d1_col].quantile(0.95))) / 2 * coef
        global_eps_d2 = (abs(dataframe[d2_col].quantile(0.05)) + abs(dataframe[d2_col].quantile(0.95))) / 2 * coef
        eps_d1_series = eps_d1_series.fillna(global_eps_d1).replace(0, global_eps_d1)
        eps_d2_series = eps_d2_series.fillna(global_eps_d2).replace(0, global_eps_d2)

        # (disabled backtest-only derivative stats printout — see git history)

        # tendency mapping
        def tag_by_derivatives(row):
            # NOTE(review): int(row.name) is used as a POSITIONAL index into the
            # eps series — this assumes a RangeIndex; a DatetimeIndex would break.
            idx = int(row.name)
            d1v = float(row[d1_col])
            d2v = float(row[d2_col])
            eps1 = float(eps_d1_series.iloc[idx])
            eps2 = float(eps_d2_series.iloc[idx])
            # State codes: 4 accelerating rise, 3 slowing/steady rise,
            # 1 rise starting, 0 flat/neutral, -1 fall starting,
            # -2 slowing/steady fall, -4 accelerating fall.
            # NOTE(review): codes 2 (steady rise) and -3 (steady fall) from the
            # original legend are never returned by this implementation.
            # strict plateau
            if abs(d1v) <= eps1 and abs(d2v) <= eps2:
                return 0
            # start of a move: d1 ~ 0 but d2 signals a direction
            if abs(d1v) <= eps1:
                return 1 if d2v > eps2 else -1 if d2v < -eps2 else 0
            # rise
            if d1v > eps1:
                return 4 if d2v > eps2 else 3
            # fall
            if d1v < -eps1:
                return -4 if d2v < -eps2 else -2
            return 0

        dataframe[tendency_col] = dataframe.apply(tag_by_derivatives, axis=1)

        # (disabled backtest-only probability stats — see git history)
        return dataframe

    def calculateIndicators(self, df):
        # heikinashi = qtpylib.heikinashi(df)
        # df['haopen'] = heikinashi['open']
        # df['haclose'] = heikinashi['close']
        # df['hapercent'] = (df['haclose'] - df['haopen']) / df['haclose']
        # Bar midpoint: halfway between open and close.
        df['mid'] = df['open'] + (df['close'] - df['open']) / 2
        # NOTE(review): the "smaN" columns are actually EMAs of 'mid'
        # (naming kept for downstream compatibility).
        df['sma5'] = df['mid'].ewm(span=5, adjust=False).mean()  # df["mid"].rolling(window=5).mean()
        df['sma5_deriv1'] = 1000 * (df['sma5'] - df['sma5'].shift(1)) / df['sma5'].shift(1)
        df['sma12'] = df['mid'].ewm(span=12, adjust=False).mean()
        df['sma12_deriv1'] = 1000 * (df['sma12'] - df['sma12'].shift(1)) / df[
            'sma12'].shift(1)
        df['sma24'] = df['mid'].ewm(span=24, adjust=False).mean()
        df['sma24_deriv1'] = 1000 * (df['sma24'] - df['sma24'].shift(1)) / df['sma24'].shift(1)
        df['sma60'] = df['mid'].ewm(span=60, adjust=False).mean()
        df['sma60_deriv1'] = 1000 * (df['sma60'] - df['sma60'].shift(1)) / df['sma60'].shift(1)
        # df[f"sma5_inv"] = (df[f"sma5"].shift(2) >= df[f"sma5"].shift(1)) \
        #     & (df[f"sma5"].shift(1) <= df[f"sma5"])
        df["sma5_sqrt"] = (
            np.sqrt(np.abs(df["sma5"] - df["sma5"].shift(1)))
            + np.sqrt(np.abs(df["sma5"].shift(3) - df["sma5"].shift(1)))
        )
        # Trough flag: previous bar is a local minimum with enough amplitude.
        df["sma5_inv"] = (
            (df["sma5"].shift(2) >= df["sma5"].shift(1))
            & (df["sma5"].shift(1) <= df["sma5"])
            & (df["sma5_sqrt"] > 5)
        )
        df["sma12_sqrt"] = (
            np.sqrt(np.abs(df["sma12"] - df["sma12"].shift(1)))
            + np.sqrt(np.abs(df["sma12"].shift(3) - df["sma12"].shift(1)))
        )
        df["sma12_inv"] = (
            (df["sma12"].shift(2) >= df["sma12"].shift(1))
            & (df["sma12"].shift(1) <= df["sma12"])
            & (df["sma12_sqrt"] > 5)
        )
        df["percent"] = df['mid'].pct_change()
        df["percent3"] = df['mid'].pct_change(3).rolling(3).mean()
        df["percent12"] = df['mid'].pct_change(12).rolling(12).mean()
        df["percent24"] = df['mid'].pct_change(24).rolling(24).mean()
        df['rsi'] = talib.RSI(df['mid'], timeperiod=14)
        self.calculeDerivees(df, 'rsi', ema_period=12)
        df['max_rsi_12'] = talib.MAX(df['rsi'], timeperiod=12)
        df['max_rsi_24'] = talib.MAX(df['rsi'], timeperiod=24)
        df['max5'] = talib.MAX(df['mid'], timeperiod=5)
        df['min180'] = talib.MIN(df['mid'], timeperiod=180)
        df['max180'] = talib.MAX(df['mid'], timeperiod=180)
        # Position of 'mid' inside its 180-bar range (0 = low, 1 = high).
        df['pct180'] = ((df["mid"] - df['min180']) / (df['max180'] - df['min180']))
        # df = self.rsi_trend_probability(df, short=60, long=360)

        ###########################################################
        # Bollinger Bands
        bollinger = qtpylib.bollinger_bands(qtpylib.typical_price(df), window=20, stds=2)
        df['bb_lowerband'] = bollinger['lower']
        df['bb_middleband'] = bollinger['mid']
        df['bb_upperband'] = bollinger['upper']
        df["bb_percent"] = (
            (df["close"] - df["bb_lowerband"]) / (df["bb_upperband"] - df["bb_lowerband"])
        )
        df["bb_width"] = (df["bb_upperband"] - df["bb_lowerband"]) / df["sma24"]

        # MACD: macd = EMA12 - EMA26, macdsignal = EMA9(macd), macdhist = macd - signal.
        # Positive/negative macd ~ up/down trend; crossings of the signal line are
        # buy/sell triggers; the histogram sign and slope give the strength and
        # acceleration of the trend.
        macd, macdsignal, macdhist = talib.MACD(
            df['close'], fastperiod=12, slowperiod=26, signalperiod=9
        )
        df['macd'] = macd
        df['macdsignal'] = macdsignal
        df['macdhist'] = macdhist

        # ------------------------------------------------------------------
        # --- raw slope of the 24-period EMA ---
        df['slope'] = df['sma24'].diff()
        # --- EMA smoothing ---
        df['slope_smooth'] = df['slope'].ewm(span=10, adjust=False).mean()

        ###########################
        # df must be an OHLCV frame sorted by ascending date.
        # NOTE(review): assigned but never used below — TODO confirm before removing.
        timeframe = self.timeframe

        # --- Normalised volatility ---
        df['atr'] = ta.volatility.AverageTrueRange(
            high=df['high'], low=df['low'], close=df['close'], window=14
        ).average_true_range()
        df['atr_norm'] = df['atr'] / df['close']
        # --- Trend strength ---
        df['adx'] = ta.trend.ADXIndicator(
            high=df['high'], low=df['low'], close=df['close'], window=14
        ).adx()
        # --- Directional volume (On Balance Volume) ---
        df['obv'] = ta.volume.OnBalanceVolumeIndicator(
            close=df['close'], volume=df['volume']
        ).on_balance_volume()
        self.calculeDerivees(df, 'obv', ema_period=1)
        df['obv12'] = ta.volume.OnBalanceVolumeIndicator(
            close=df['sma12'], volume=df['volume'].rolling(12).sum()
        ).on_balance_volume()
        df['obv24'] = ta.volume.OnBalanceVolumeIndicator(
            close=df['sma24'], volume=df['volume'].rolling(24).sum()
        ).on_balance_volume()
        # self.calculeDerivees(df, 'obv5', ema_period=5)
        # --- Recent volatility (std of returns) ---
        df['vol_24'] = df['percent'].rolling(24).std()
        # count consecutive ups / downs
        # self.calculateDownAndUp(df, limit=0.0001)

        df['rsi_slope'] = df['rsi'].diff(3) / 3  # average RSI speed
        df['adx_change'] = df['adx'] - df['adx'].shift(12)  # trend evolution
        df['volatility_ratio'] = df['atr_norm'] / df['bb_width']
        df["rsi_diff"] = df["rsi"] - df["rsi"].shift(3)
        df["slope_ratio"] = df["sma5_deriv1"] / (df["sma60_deriv1"] + 1e-9)
        df["divergence"] = (df["rsi_deriv1"] * df["sma5_deriv1"]) < 0

        # features
        df['returns'] = df['close'].pct_change()
        # NOTE(review): the assignments below OVERWRITE 'atr', 'slope', 'drawdown',
        # 'rsi_slope' and 'atr_norm' computed above with different formulas; the
        # last assignment wins. Kept as-is — confirm which definition is intended.
        df['atr'] = (df['high'] - df['low']).rolling(14).mean()
        df['slope'] = df['close'].rolling(20).mean().diff()
        df['drawdown'] = (df['close'] - df['close'].rolling(48).max()) / df['close'].rolling(48).max()
        df['atr_pct'] = df['atr'] / df['close']
        df['vol_z'] = (df['volume'] - df['volume'].rolling(48).mean()) / df['volume'].rolling(48).std()
        df['rsi_slope'] = df['rsi'].diff(3)
        # VOLATILITY
        df['atr_norm'] = df['atr'] / df['close']
        # DRAWDOWN (critical)
        df['rolling_max'] = df['close'].rolling(48).max()
        df['drawdown'] = (df['close'] - df['rolling_max']) / df['rolling_max']
        df['dd_score'] = np.clip(-df['drawdown'] / 0.10, 0, 1)
        # TREND (slope)
        df['MA7'] = df['close'].rolling(7).mean()
        df['MA14'] = df['close'].rolling(14).mean()
        df['slope'] = df['MA7'] - df['MA14']
        df['slope_score'] = np.clip(1 - (df['slope'] / df['close']), 0, 1)
        # NEGATIVE STREAK
        df['neg_streak'] = df['close'].pct_change().apply(lambda x: min(x, 0)).rolling(24).sum()
        df['neg_score'] = np.clip(-df['neg_streak'] / 0.05, 0, 1)
        # SHORT-TERM COMPONENTS
        df['pct_change_3'] = df['close'].pct_change(3)
        df['pct_change_3_smooth'] = df['pct_change_3'].rolling(6).mean()
        df['crash_score'] = np.clip(1 + (df['pct_change_3_smooth'] / 0.05), 0, 1)
        df['speed'] = df['close'].diff().rolling(6).mean()
        df['accel'] = df['speed'].diff().rolling(6).mean()
        df['STD20'] = df['close'].rolling(20).std()
        df['accel_score'] = np.clip(1 + (df['accel'] / (df['STD20'] + 1e-9)), 0, 1)
        # FINAL INDEX
        df['crash_raw'] = (
            0.35 * df['dd_score'] +      # most important for a slow crash
            0.25 * df['neg_score'] +
            0.20 * df['slope_score'] +
            0.10 * df['crash_score'] +
            0.10 * df['accel_score']
        )
        # SIMPLE SMOOTHING
        df['crash_risk_index'] = df['crash_raw'].ewm(span=24).mean()
        return df

    def feature_auc_scores(self, X, y):
        """Rank each feature column by univariate ROC-AUC against target y.

        Columns where the AUC cannot be computed get NaN.
        """
        aucs = {}
        for col in X.columns:
            try:
                aucs[col] = roc_auc_score(y, X[col].ffill().fillna(0))
            except Exception:
                aucs[col] = np.nan
        return pd.Series(aucs).sort_values(ascending=False)

    def listUsableColumns(self, dataframe):
        """Return numeric, non-constant columns usable as model features.

        Side effect: replaces +/-inf and NaN by 0 in those columns, in place.
        """
        # Step 1: numeric columns only
        numeric_cols = dataframe.select_dtypes(include=['int64', 'float64']).columns
        # Step 2: drop constants and excluded name patterns
        usable_cols = [c for c in numeric_cols if dataframe[c].nunique() > 1
                       and not c.endswith("_state") and not c.endswith("_1d")
                       # and not c.endswith("_1h")
                       # and not c.startswith("open") and not c.startswith("close")
                       # and not c.startswith("low") and not c.startswith("high")
                       # and not c.startswith("haopen") and not c.startswith("haclose")
                       # and not c.startswith("bb_lower") and not c.startswith("bb_upper")
                       # and not c.startswith("bb_middle")
                       and not c.endswith("_count") and not c.endswith("_class")
                       and not c.endswith("_price")
                       and not c.startswith('stop_buying') and not c.startswith('target')
                       and not c.startswith('lvl')
                       and not c.startswith('confidence_index')
                       ]
        # Step 3: replace inf and NaN by 0
        dataframe[usable_cols] = dataframe[usable_cols].replace([np.inf, -np.inf], 0).fillna(0)
        print("Colonnes utilisables pour le modèle :")
        print(usable_cols)
        # self.model_indicators = usable_cols
        return usable_cols

    def select_uncorrelated_features(self, df, target, top_n=20, corr_threshold=0.7):
        """
        Select the features most correlated with `target`, while dropping those
        that are too strongly correlated with each other.
        """
""" # 1️⃣ Calcul des corrélations absolues avec la cible corr = df.corr(numeric_only=True) corr_target = corr[target].abs().sort_values(ascending=False) # 2️⃣ Prend les N features les plus corrélées avec la cible (hors target) features = corr_target.drop(target).head(top_n).index.tolist() # 3️⃣ Évite les features trop corrélées entre elles selected = [] for feat in features: too_correlated = False for sel in selected: if abs(corr.loc[feat, sel]) > corr_threshold: too_correlated = True break if not too_correlated: selected.append(feat) # 4️⃣ Retourne un DataFrame propre avec les valeurs de corrélation selected_corr = pd.DataFrame({ "feature": selected, "corr_with_target": [corr.loc[f, target] for f in selected] }).sort_values(by="corr_with_target", key=np.abs, ascending=False) return selected_corr def drawPredictions(self, df, indicators, y_proba, threshold=0.5): """ Trace simultanément plusieurs indicateurs et la prédiction du modèle. Parameters ---------- df : pd.DataFrame Dataframe avec les colonnes des indicateurs et la target. indicators : list[str] Liste des colonnes du dataframe à tracer. 
y_proba : np.array Probabilités prédites par le modèle (valeurs continues entre 0 et 1) threshold : float Seuil pour convertir probabilité en signal binaire output_file : str Fichier sur lequel sauvegarder le graphique """ plt.figure(figsize=(18, 6)) # Tracer les indicateurs for col in indicators: plt.plot(df.index, df[col], label=col, alpha=0.7) # Tracer la prédiction du modèle (probabilité) plt.plot(df.index, y_proba, label="Prediction prob.", color="black", linestyle="--") # Optionnel : signal binaire (1 si prob > threshold) y_signal = (y_proba > threshold).astype(int) plt.scatter(df.index, y_signal, color='red', marker='o', label='Signal > threshold', s=20) plt.title("Indicateurs + prédiction MLP") plt.xlabel("Date") plt.ylabel("Valeur / Probabilité") plt.legend() plt.grid(True) plt.tight_layout() plt.savefig(f"{self.path}/indicators_vs_prediction.png") plt.close() def drawSequentialGraphs(self, model, history, X_train_scaled, X_valid_scaled, y_train, y_valid, thresholds=None, best_threshold=None ): """ Génère et sauvegarde tous les graphes utiles pour un MLP (Sequential). 
Parameters ---------- model : keras Sequential history : History (retour de model.fit) X_train_scaled, X_valid_scaled : np.array y_train, y_valid : np.array thresholds : list[float] | None best_threshold : float | None """ feature_names = self.listUsableColumns(self.dataframe) # ========================= # 1️⃣ Courbes de loss # ========================= plt.figure() plt.plot(history.history['loss'], label='train') plt.plot(history.history['val_loss'], label='val') plt.legend() plt.title("MLP loss") plt.savefig(f"{self.path}/loss.png") plt.close() # ========================= # 2️⃣ Probabilités # ========================= y_proba = model.predict(X_valid_scaled).ravel() plt.figure() plt.hist(y_proba[y_valid == 0], bins=50, alpha=0.6, label="No crash") plt.hist(y_proba[y_valid == 1], bins=50, alpha=0.6, label="Crash") plt.legend() plt.title("Predicted probability distribution") plt.savefig(f"{self.path}/proba_distribution.png") plt.close() # ========================= # 3️⃣ Precision–Recall # ========================= precision, recall, _ = precision_recall_curve(y_valid, y_proba) plt.figure() plt.plot(recall, precision) plt.xlabel("Recall") plt.ylabel("Precision") plt.title("Precision-Recall curve") plt.savefig(f"{self.path}/precision_recall.png") plt.close() # ========================= # 4️⃣ F1 vs threshold # ========================= if thresholds is None: thresholds = np.linspace(0.05, 0.95, 60) f1s = [ f1_score(y_valid, (y_proba > t).astype(int)) for t in thresholds ] plt.figure() plt.plot(thresholds, f1s) plt.xlabel("Threshold") plt.ylabel("F1 score") plt.title("F1 vs threshold") plt.savefig(f"{self.path}/f1_threshold.png") plt.close() # Choix auto du seuil si absent if best_threshold is None: best_threshold = thresholds[int(np.argmax(f1s))] # ========================= # 5️⃣ Matrice de confusion # ========================= ConfusionMatrixDisplay.from_predictions( y_valid, (y_proba > best_threshold).astype(int) ) plt.title(f"Confusion matrix 
(threshold={best_threshold:.2f})") plt.savefig(f"{self.path}/confusion_matrix.png") plt.close() # # ========================= # # 6️⃣ Permutation importance # # ========================= # r = permutation_importance( # model, # X_valid_scaled, # y_valid, # scoring="f1", # n_repeats=8, # n_jobs=-1 # ) # # importances = pd.Series( # r.importances_mean, # index=feature_names # ).sort_values() # # plt.figure(figsize=(7, 4)) # importances.plot(kind="barh") # plt.title("Permutation importance (MLP)") # plt.savefig(f"{self.path}/permutation_importance.png") # plt.close() # Exemple : on choisit 3 indicateurs du dataframe indicators = ['percent12'] # y_proba = model.predict(X_valid_scaled).ravel() self.drawPredictions( df=self.dataframe.iloc[-len(y_proba):], # sélectionner la période correspondant à X_valid indicators=indicators, y_proba=y_proba, threshold=0.5 ) # ========================= # 7️⃣ Sauvegarde seuil # ========================= with open(f"{self.path}/best_threshold.txt", "w") as f: f.write(str(best_threshold)) return { "best_threshold": best_threshold, "best_f1": max(f1s) } def optimize_sequential(self, X_train, X_valid, y_train, y_valid , n_trials=20): def objective(trial): tf.keras.backend.clear_session() # 🔧 Hyperparams n1 = trial.suggest_int("units_1", 32, 128) n2 = trial.suggest_int("units_2", 16, 64) dropout1 = trial.suggest_float("dropout_1", 0.1, 0.5) dropout2 = trial.suggest_float("dropout_2", 0.1, 0.5) lr = trial.suggest_float("lr", 1e-4, 1e-2, log=True) batch_size = trial.suggest_categorical("batch_size", [16, 32, 64]) # 🔒 Scaling (train only) scaler = StandardScaler() X_tr = scaler.fit_transform(X_train) X_val = scaler.transform(X_valid) # 🧠 Model model = Sequential([ Dense(n1, activation='relu', input_shape=(X_tr.shape[1],)), Dropout(dropout1), Dense(n2, activation='relu'), Dropout(dropout2), Dense(1, activation='sigmoid') ]) model.compile( optimizer=tf.keras.optimizers.Adam(lr), loss='binary_crossentropy' ) es = EarlyStopping( monitor='val_loss', 
patience=10, restore_best_weights=True ) model.fit( X_tr, y_train, validation_data=(X_val, y_valid), epochs=150, batch_size=batch_size, callbacks=[es], verbose=0 ) proba = model.predict(X_val).ravel() # 🔥 Optimisation sur le F1 crash thresholds = [0.3, 0.4, 0.5, 0.6] best_f1 = max( f1_score(y_valid, (proba > t).astype(int)) for t in thresholds ) return best_f1 study = optuna.create_study(direction="maximize") study.optimize(objective, n_trials=n_trials) print(study.best_trial) best = study.best_trial.params scaler = StandardScaler() X_train_scaled = scaler.fit_transform(X_train) X_valid_scaled = scaler.transform(X_valid) model = Sequential([ Dense(best["units_1"], activation='relu', input_shape=(X_train_scaled.shape[1],)), Dropout(best["dropout_1"]), Dense(best["units_2"], activation='relu'), Dropout(best["dropout_2"]), Dense(1, activation='sigmoid') ]) model.compile( optimizer=tf.keras.optimizers.Adam(best["lr"]), loss='binary_crossentropy' ) history = model.fit( X_train_scaled, y_train, validation_data=(X_valid_scaled, y_valid), epochs=150, batch_size=best["batch_size"], callbacks=[EarlyStopping(patience=10, restore_best_weights=True)], verbose=0 ) result = self.drawSequentialGraphs( model=model, history=history, X_train_scaled=X_train_scaled, X_valid_scaled=X_valid_scaled, y_train=y_train, y_valid=y_valid ) return model, study def optimize_xgbclassifier(self, X_train, y_train, X_valid, y_valid, n_trials=20): def objective(trial): # local_model = XGBClassifier( # n_estimators=300, # nombre d'arbres plus raisonnable # learning_rate=0.01, # un peu plus rapide que 0.006, mais stable # max_depth=4, # capture plus de patterns que 3, sans overfitting excessif # subsample=0.7, # utilise 70% des lignes pour chaque arbre → réduit overfitting # colsample_bytree=0.8, # 80% des features par arbre # gamma=0.01, # gain minimal pour un split → régularisation # reg_alpha=0.01, # L1 régularisation des feuilles # reg_lambda=1, # L2 régularisation des feuilles # n_jobs=-1, # 
utilise tous les cœurs CPU pour accélérer # random_state=42, # reproductibilité # missing=float('nan'), # valeur manquante reconnue # eval_metric='logloss' # métrique pour classification binaire # ) local_model = XGBClassifier( n_estimators=trial.suggest_int("n_estimators", 300, 500), max_depth=trial.suggest_int("max_depth", 1, 6), learning_rate=trial.suggest_float("learning_rate", 0.005, 0.3, log=True), subsample=trial.suggest_float("subsample", 0.6, 1.0), colsample_bytree=trial.suggest_float("colsample_bytree", 0.6, 1.0), scale_pos_weight=1, objective="binary:logistic", eval_metric="logloss", n_jobs=-1 ) local_model.fit( X_train, y_train, eval_set=[(X_valid, y_valid)], # early_stopping_rounds=50, verbose=False ) proba = local_model.predict_proba(X_valid)[:, 1] thresholds = np.linspace(0.1, 0.9, 50) best_f1 = max(f1_score(y_valid, (proba > t)) for t in thresholds) return best_f1 study = optuna.create_study(direction="maximize") study.optimize(objective, n_trials=100) # SHAP # Reconstruction du modèle final avec les meilleurs hyperparamètres # Récupération des meilleurs paramètres trouvés best_params = study.best_params best_model = XGBClassifier(**best_params) best_model.fit(X_train, y_train) self.analyseShap(X_train, X_valid) selected_features = self.calibrateModel(best_model, X_train, y_train) self.analyseImportances(selected_features, X_train, X_valid, y_valid) return best_model, study def optimize_catboost(self, X_train, y_train, X_valid, y_valid, n_trials=20): """ Optimise un modèle CatBoost pour la détection de crashs rares. 
""" # Calcul automatique du poids pour la classe minoritaire scale_pos_weight = len(y_train[y_train == 0]) / max(len(y_train[y_train == 1]), 1) def objective(trial): model = CatBoostClassifier( iterations=trial.suggest_int("iterations", 200, 500), depth=trial.suggest_int("depth", 3, 8), learning_rate=trial.suggest_float("learning_rate", 0.01, 0.3, log=True), l2_leaf_reg=trial.suggest_float("l2_leaf_reg", 1, 10), subsample=trial.suggest_float("subsample", 0.6, 1.0), scale_pos_weight=scale_pos_weight, eval_metric="F1", random_state=42, verbose=0 ) # Entraînement model.fit(X_train, y_train, eval_set=(X_valid, y_valid), early_stopping_rounds=50) # Probabilités pour la classe 1 (crash) proba = model.predict_proba(X_valid)[:, 1] # Recherche du seuil optimal pour maximiser F1 thresholds = np.linspace(0.05, 0.5, 20) # seuil plus bas pour classe rare best_f1 = 0 for t in thresholds: f1 = f1_score(y_valid, (proba > t).astype(int)) if f1 > best_f1: best_f1 = f1 return best_f1 study = optuna.create_study(direction="maximize") study.optimize(objective, n_trials=n_trials) print("Meilleurs paramètres :", study.best_params) print("Meilleur F1 :", study.best_value) # Entraînement final avec les meilleurs paramètres best_params = study.best_params best_model = CatBoostClassifier( iterations=best_params["iterations"], depth=best_params["depth"], learning_rate=best_params["learning_rate"], l2_leaf_reg=best_params["l2_leaf_reg"], subsample=best_params["subsample"], scale_pos_weight=scale_pos_weight, eval_metric="F1", random_state=42, verbose=0 ) best_model.fit(X_train, y_train) self.analyseShap(X_train, X_valid) selected_features = self.calibrateModel(best_model, X_train, y_train) self.analyseImportances(selected_features, X_train, X_valid, y_valid) return best_model, study def trainModel(self, dataframe): pair = self.short_pair pd.set_option('display.max_rows', None) pd.set_option('display.max_columns', None) pd.set_option("display.width", 200) path = self.path # 
f"user_data/plots/{pair}/" os.makedirs(path, exist_ok=True) os.system(f"rm -rf {self.path}/*") # # Étape 1 : sélectionner numériques # numeric_cols = dataframe.select_dtypes(include=['int64', 'float64']).columns # # # Étape 2 : enlever constantes # usable_cols = [c for c in numeric_cols if dataframe[c].nunique() > 1 # and (not c.endswith("_state") and not c.endswith("_1h") and not c.endswith("_1d") # and not c.endswith("_class") and not c.endswith("_price") # and not c.startswith('stop_buying'))] # # # Étape 3 : remplacer inf et NaN par 0 # dataframe[usable_cols] = dataframe[usable_cols].replace([np.inf, -np.inf], 0).fillna(0) # # print("Colonnes utilisables pour le modèle :") # print(usable_cols) # # self.model_indicators = usable_cols # df = dataframe[self.model_indicators].copy() # Corrélations des colonnes corr = df.corr(numeric_only=True) print("Corrélation des colonnes") print(corr) # 3️⃣ Créer la cible : 1 si le prix monte dans les prochaines bougies # df['target'] = (df['sma24'].shift(-24) > df['sma24']).astype(int) # df['target'] = ((df["sma24"].shift(-13) - df["sma24"]) > 100).astype(int) # df['target'] = df['target'].fillna(0).astype(int) # label : crash si -n% dans les p heures self.initTarget(df) self.calculateCorrelation(df) # # Exemple d'utilisation : # selected_corr = self.select_uncorrelated_features(df, target="target", top_n=30, corr_threshold=0.7) # print("===== 🎯 FEATURES SÉLECTIONNÉES =====") # print(selected_corr) # # # Nettoyage # df = df.dropna() # # X = df[self.model_indicators] # y = df['target'] # ta colonne cible binaire ou numérique # print("===== 🎯 FEATURES SCORES =====") # print(self.feature_auc_scores(X, y)) # 4️⃣ Split train/test X = df[self.model_indicators] y = df['target'] # Séparation temporelle (train = 80 %, valid = 20 %) X_train, X_valid, y_train, y_valid = train_test_split(X, y, test_size=0.2, shuffle=False) # Nettoyage des valeurs invalides selector = VarianceThreshold(threshold=0.0001) selector.fit(X_train) selected = 
X_train.columns[selector.get_support()]
        print("Colonnes conservées :", list(selected))

        # 5) Train the model (hyper-parameters tuned sequentially with Optuna).
        # self.train_model = RandomForestClassifier(n_estimators=200, random_state=42)
        assert len(X_train) == len(y_train)
        assert len(X_valid) == len(y_valid)
        self.train_model, study = self.optimize_sequential(X_train, X_valid, y_train, y_valid, n_trials=50)
        self.analyseStudy(study)
        y_pred = self.train_model.predict(X_valid)
        # Positive-class probability when the model exposes predict_proba;
        # otherwise fall back to the raw prediction (e.g. a Keras model).
        if hasattr(self.train_model, "predict_proba"):
            y_proba = self.train_model.predict_proba(X_valid)[:, 1]
        else:
            y_proba = self.train_model.predict(X_valid).ravel()
        # print(classification_report(y_valid, y_pred))
        # print(confusion_matrix(y_valid, y_pred))
        print("\nRapport de classification :\n", classification_report(y_valid, y_pred))
        print("\nMatrice de confusion :\n", confusion_matrix(y_valid, y_pred))

        # # Importances
        # importances = pd.DataFrame({
        #     "feature": self.train_model.feature_name_,
        #     "importance": self.train_model.feature_importances_
        # }).sort_values("importance", ascending=False)
        # print("\n===== 🔍 IMPORTANCE DES FEATURES =====")
        # print(importances)

        # Sweep a handful of decision thresholds and keep the one that
        # maximises F1 on the validation split.
        best_f1 = 0
        best_t = 0.5
        for t in [0.3, 0.4, 0.5, 0.6, 0.7]:
            y_pred_thresh = (y_proba > t).astype(int)
            score = f1_score(y_valid, y_pred_thresh)
            print(f"Seuil {t:.1f} → F1: {score:.3f}")
            if score > best_f1:
                best_f1 = score
                best_t = t
        print(f"✅ Meilleur seuil trouvé: {best_t} avec F1={best_f1:.3f}")

        # 6) Validation accuracy (optional sanity check).
        preds = self.train_model.predict(X_valid)
        acc = accuracy_score(y_valid, preds)
        print(f"Accuracy: {acc:.3f}")

        # 7) Persist the trained model.
        # NOTE(review): `pair` is not visible in this chunk — presumably a
        # parameter of this method; confirm against the full definition.
        joblib.dump(self.train_model, f"{self.path}/{pair}_rf_model.pkl")
        print(f"✅ Modèle sauvegardé sous {pair}_rf_model.pkl")

        # X = feature dataframe (after shift/rolling/indicators)
        # y = binary or decimal target
        # model = your trained model (RandomForestClassifier or Regressor)
        # # --- 1) Mutual Information (MI) ---
        # mi_scores = mutual_info_classif(X.fillna(0), y)
        # mi_series = pd.Series(mi_scores, index=X.columns, name='MI')
        #
        # # --- 2) Permutation Importance (PI) ---
        # pi_result = permutation_importance(self.train_model, X, y, n_repeats=10, random_state=42, n_jobs=-1)
        # pi_series = pd.Series(pi_result.importances_mean, index=X.columns, name='PI')
        #
        # # --- 3) Combine into a single dataframe ---
        # importance_df = pd.concat([mi_series, pi_series], axis=1)
        # importance_df = importance_df.sort_values(by='PI', ascending=False)  # sort by model importance
        # print(importance_df)
        #
        # importance_df.plot(kind='bar', figsize=(10, 5))
        # plt.title("Mutual Info vs Permutation Importance")
        # plt.ylabel("Score")
        # plt.show()
        self.analyze_model(self.train_model, X_train, X_valid, y_train, y_valid)

    def analyseImportances(self, selected_features, X_train, X_valid, y_valid):
        """Save impurity-based, permutation and partial-dependence views of
        the trained model's features as PNG files under ``self.path``."""
        # Impurity-based feature importance (tree-based models only).
        if hasattr(self.train_model, "feature_importances_"):
            importances = self.train_model.feature_importances_
            feat_imp = pd.Series(importances, index=X_train.columns).sort_values(ascending=False)
            # Display
            feat_imp.plot(kind='bar', figsize=(12, 6))
            plt.title("Feature importances")
            # plt.show()
            plt.savefig(f"{self.path}/Feature importances.png", bbox_inches='tight')
        # Model-agnostic permutation importance, scored with F1 on the
        # validation split.
        result = permutation_importance(self.train_model, X_valid, y_valid, scoring='f1', n_repeats=10, random_state=42)
        perm_imp = pd.Series(result.importances_mean, index=X_valid.columns).sort_values(ascending=False)
        perm_imp.plot(kind='bar', figsize=(12, 6))
        plt.title("Permutation feature importance")
        # plt.show()
        plt.savefig(f"{self.path}/Permutation feature importance.png", bbox_inches='tight')
        # Average partial dependence for each selected feature.
        fig, ax = plt.subplots(figsize=(24, 48))
        PartialDependenceDisplay.from_estimator(
            self.train_model,
            X_valid,
            selected_features,
            kind="average",
            ax=ax
        )
        fig.savefig(f"{self.path}/PartialDependenceDisplay.png", bbox_inches="tight")
        plt.close(fig)

    def calibrateModel(self, model, X_train, y_train):
        """Select features from an already-fitted model, then fit a
        probability-calibrated copy on the selected columns.

        Returns the selected feature names only; the calibrated classifier
        itself is printed but not returned.
        """
        # 2) Feature selection BEFORE calibration (model is already fitted,
        #    hence prefit=True).
        sfm = SelectFromModel(model, threshold="median", prefit=True)
        selected_features = X_train.columns[sfm.get_support()]
        print(selected_features)
        # 3) Calibration afterwards (optional).
        calibrated = CalibratedClassifierCV(model, method='sigmoid', cv=5)
        calibrated.fit(X_train[selected_features], y_train)
        print(calibrated)
        # # # calibration
        # model = CalibratedClassifierCV(model, method='sigmoid', cv=5)
        # # Selection
        # sfm = SelectFromModel(model, threshold="median")
        # sfm.fit(X_train, y_train)
        # selected_features = X_train.columns[sfm.get_support()]
        # print(selected_features)
        return selected_features

    def calculateCorrelation(self, df):
        """Print target correlations and the strongest pairwise correlations,
        then save a heatmap of the full correlation matrix (in %)."""
        # Correlations with the target column, sorted by strength.
        target_corr = df.corr(numeric_only=True)["target"].sort_values(ascending=False)
        print("Corrélations triées par importance avec une colonne cible")
        print(target_corr)
        # Pairwise correlations flattened to (col1, col2, corr) rows.
        corr = df.corr(numeric_only=True)
        corr_unstacked = (
            corr.unstack()
            .reset_index()
            .rename(columns={"level_0": "col1", "level_1": "col2", 0: "corr"})
        )
        # Drop self-pairs and mirrored duplicates (keep only col1 < col2).
        corr_unstacked = corr_unstacked[corr_unstacked["col1"] < corr_unstacked["col2"]]
        # Sort by absolute correlation value.
        corr_sorted = corr_unstacked.reindex(corr_unstacked["corr"].abs().sort_values(ascending=False).index)
        print("Trier par valeur absolue de corrélation")
        print(corr_sorted.head(20))
        # --- Correlation matrix as a heatmap ---
        corr = df.corr(numeric_only=True)  # skips non-numeric columns
        corr = corr * 100  # express as a percentage
        # --- Mask the upper triangle (optional) ---
        mask = np.triu(np.ones_like(corr, dtype=bool))
        # --- Build the figure ---
        fig, ax = plt.subplots(figsize=(96, 36))
        # --- "Temperature"-style heatmap ---
        sns.heatmap(
            corr,
            mask=mask,
            cmap="coolwarm",    # blue → red palette
            center=0,           # centre the colour scale on 0
            annot=True,         # print the value in each cell
            fmt=".0f",          # integer format (no decimals)
            cbar_kws={"label": "Corrélation (%)"},  # colourbar label
            linewidths=0.5,     # thin grid lines between cells
            ax=ax
        )
        # --- Cosmetics ---
        ax.set_title("Matrice de corrélation (en %)", fontsize=20, pad=20)
        plt.xticks(rotation=45, ha="right")
        plt.yticks(rotation=0)
        # --- Save to disk ---
        output_path = f"{self.path}/Matrice_de_correlation_temperature.png"
        plt.savefig(output_path, bbox_inches="tight", dpi=150)
        plt.close(fig)
        print(f"✅ Matrice enregistrée : {output_path}")

    def analyseStudy(self, study):
        """Print a summary of an Optuna study and export its diagnostic
        plots as HTML files under ``self.path``."""
        # ---- after the study has been executed ----
        print("Best value (F1):", study.best_value)
        print("Best params:", study.best_params)
        best_trial = study.best_trial
        print("\n=== BEST TRIAL ===")
        print("Number:", best_trial.number)
        print("Value:", best_trial.value)
        print("Params:")
        for k, v in best_trial.params.items():
            print(f" - {k}: {v}")
        # All trials summary
        print("\n=== ALL TRIALS ===")
        for t in study.trials:
            print(f"Trial {t.number}: f1 = {t.value}, params = {t.params}")
        # DataFrame of trials
        df = study.trials_dataframe()
        print(df.head())
        # Graphs (interactive Plotly figures saved as HTML).
        fig = plot_optimization_history(study)
        fig.write_html(f"{self.path}/optimization_history.html")
        fig = plot_param_importances(study)
        fig.write_html(f"{self.path}/param_importances.html")
        fig = plot_slice(study)
        fig.write_html(f"{self.path}/slice.html")
        fig = plot_parallel_coordinate(study)
        fig.write_html(f"{self.path}/parallel_coordinates.html")

    def analyseShap(self, X_train, X_valid):
        """Compute SHAP values for the trained model and save waterfall,
        summary and force plots."""
        # === SHAP plots ===
        # SHAP computation over the training features.
        explainer = shap.TreeExplainer(self.train_model)
        shap_values = explainer(X_train)
        # Pick one observation for the waterfall chart: explanation of the
        # model's prediction for the first row.
        i = 0
        # Extract the values for that observation.
        shap_val = shap_values[i].values
        feature_names = X_train.columns
        feature_values = X_train.iloc[i]
        # Keep only the k features with the largest absolute contribution.
        # order = np.argsort(np.abs(shap_val))[::-1]
        k = 10
        order = np.argsort(np.abs(shap_val))[::-1][:k]
        # ---- Build the figure without displaying it ----
        plt.ioff()  # disable interactive display
        shap.plots.waterfall(
            shap.Explanation(
                values=shap_val[order],
                base_values=shap_values.base_values[i],
                data=feature_values.values[order],
                feature_names=feature_names[order]
            ),
            show=False  # IMPORTANT: do not display in Jupyter / console
        )
        # Save the chart to disk.
        output_path = f"{self.path}/shap_waterfall.png"
        plt.savefig(output_path, dpi=200, bbox_inches='tight')
        plt.close()  # close the figure cleanly
        print(f"Graphique SHAP enregistré : {output_path}")
        # Global summary.
        # NOTE(review): shap_values were computed on X_train but are plotted
        # against X_valid here — row counts will differ; confirm intent.
        shap.summary_plot(shap_values, X_valid)
        # Force plot for a single observation.
        force_plot = shap.force_plot(explainer.expected_value, shap_values[0, :], X_valid.iloc[0, :])
        shap.save_html(f"{self.path}/shap_force_plot.html", force_plot)
        # # ---- Optional SHAP interpretation ----
        # try:
        #     # print("\n===== 💡 ANALYSE SHAP =====")
        #     explainer = shap.TreeExplainer(model)
        #     shap_values = explainer.shap_values(X_valid)
        #     # shap.summary_plot(shap_values[1], X_valid)
        #     # Check the output type of shap_values
        #     if isinstance(shap_values, list):
        #         # Classification models (several classes)
        #         shap_values_to_plot = shap_values[0] if len(shap_values) == 1 else shap_values[1]
        #     else:
        #         shap_values_to_plot = shap_values
        #
        #     # Adjust the dimensions if needed
        #     if shap_values_to_plot.shape[1] != X_valid.shape[1]:
        #         print(f"⚠️ Mismatch dimensions SHAP ({shap_values_to_plot.shape[1]}) vs X_valid ({X_valid.shape[1]})")
        #         min_dim = min(shap_values_to_plot.shape[1], X_valid.shape[1])
        #         shap_values_to_plot = shap_values_to_plot[:, :min_dim]
        #         X_to_plot = X_valid.iloc[:, :min_dim]
        #     else:
        #         X_to_plot = X_valid
        #
        #     plt.figure(figsize=(12, 4))
        #     shap.summary_plot(shap_values_to_plot, X_to_plot, show=False)
        #     plt.savefig(os.path.join(self.path, "shap_summary.png"), bbox_inches="tight")
        #     plt.close()
        # except ImportError:
        #     print("\n(SHAP non installé — `pip install shap` pour activer l’analyse SHAP.)")
        # END SHAP

    def initTarget(self, df):
        """Build the binary target in place: 1 when the 12-bar-ahead return
        of 'mid' exceeds 0.3%, else 0."""
        future = df['mid'].shift(-12)
        # Despite the name, 'future_dd' is the forward return, not a drawdown.
        df['future_dd'] = (future - df['mid']) / df['mid']
        df['target'] = (df['future_dd'] > 0.003).astype(int)

    def inspect_model(self, model):
        """
        Print information about an already-trained ML model.
        Compatible with scikit-learn, xgboost, lightgbm, catboost...
        """
        print("===== 🔍 INFORMATIONS DU MODÈLE =====")
        # Model type
        print(f"Type : {type(model).__name__}")
        print(f"Module : {model.__class__.__module__}")
        # Hyper-parameters
        if hasattr(model, "get_params"):
            params = model.get_params()
            print(f"\n===== ⚙️ HYPERPARAMÈTRES ({len(params)}) =====")
            for k, v in params.items():
                print(f"{k}: {v}")
        # Number of estimators
        if hasattr(model, "n_estimators"):
            print(f"\nNombre d’estimateurs : {model.n_estimators}")
        # Feature importances
        if hasattr(model, "feature_importances_"):
            print("\n===== 📊 IMPORTANCE DES FEATURES =====")
            # Resolve feature names, falling back to generic names when the
            # model does not record them.
            feature_names = getattr(model, "feature_names_in_", None)
            if isinstance(feature_names, np.ndarray):
                feature_names = feature_names.tolist()
            elif feature_names is None:
                feature_names = [f"feature_{i}" for i in range(len(model.feature_importances_))]
            fi = pd.DataFrame({
                "feature": feature_names,
                "importance": model.feature_importances_
            }).sort_values(by="importance", ascending=False)
            print(fi)
        # Coefficients (linear models)
        if hasattr(model, "coef_"):
            print("\n===== ➗ COEFFICIENTS =====")
            coef = np.array(model.coef_)
            if coef.ndim == 1:
                for i, c in enumerate(coef):
                    print(f"Feature {i}: {c:.6f}")
            else:
                print(coef)
        # Intercept
        if hasattr(model, "intercept_"):
            print("\nIntercept :", model.intercept_)
        # Known classes
        if hasattr(model, "classes_"):
            print("\n===== 🎯 CLASSES =====")
            print(model.classes_)
        # Internal scores
        for attr in ["best_score_", "best_iteration_", "best_ntree_limit", "score_"]:
            if hasattr(model, attr):
                print(f"\n{attr} = {getattr(model, attr)}")
        # Available methods
        print("\n===== 🧩 MÉTHODES DISPONIBLES =====")
        methods = [m for m, _ in inspect.getmembers(model, predicate=inspect.ismethod)]
        print(", ".join(methods[:15]) + ("..." if len(methods) > 15 else ""))
        print("\n===== ✅ FIN DE L’INSPECTION =====")

    def analyze_model(self, model, X_train, X_valid, y_train, y_valid):
        """
        Full analysis of a supervised binary-classification model.
        Prints performance metrics, feature importances, confusion matrix,
        threshold sweeps, etc., and saves the charts under ``self.path``.
        """
        os.makedirs(self.path, exist_ok=True)
        # ---- Predictions ----
        preds = model.predict(X_valid)
        probs = model.predict_proba(X_valid)[:, 1] if hasattr(model, "predict_proba") else preds
        # ---- Global performance ----
        print("===== 📊 ÉVALUATION DU MODÈLE =====")
        if hasattr(model, "feature_names_in_"):
            print("Colonnes du modèle :", model.feature_names_in_)
            print("Colonnes X_valid :", list(X_valid.columns))
        print(f"Accuracy: {accuracy_score(y_valid, preds):.3f}")
        print(f"ROC AUC : {roc_auc_score(y_valid, probs):.3f}")
        print("TN (True Negative) / FP (False Positive)")
        print("FN (False Negative) / TP (True Positive)")
        print("\nRapport de classification :\n", classification_report(y_valid, preds))
        # | Element             | Value | Meaning                                              |
        # | ------------------- | ----- | ---------------------------------------------------- |
        # | TN (True Negative)  | 983   | Model correctly predicted 0 (no buy)                 |
        # | FP (False Positive) | 43    | Model predicted 1 when it was 0 (false buy signal)   |
        # | FN (False Negative) | 108   | Model predicted 0 when it was 1 (missed a buy)       |
        # | TP (True Positive)  | 19    | Model correctly predicted 1 (good buy signal)        |
        # ---- Confusion matrix ----
        cm = confusion_matrix(y_valid, preds)
        print("Matrice de confusion :\n", cm)
        plt.figure(figsize=(4, 4))
        plt.imshow(cm, cmap="Blues")
        plt.title("Matrice de confusion")
        plt.xlabel("Prédit")
        plt.ylabel("Réel")
        for i in range(2):
            for j in range(2):
                plt.text(j, i, cm[i, j], ha="center", va="center", color="black")
        # plt.show()
        plt.savefig(os.path.join(self.path, "Matrice de confusion.png"), bbox_inches="tight")
        plt.close()
        # ---- Feature importances ----
        if hasattr(model, "feature_importances_"):
            print("\n===== 🔍 IMPORTANCE DES FEATURES =====")
            importance = pd.DataFrame({
                "feature": X_train.columns,
                "importance": model.feature_importances_
            }).sort_values(by="importance", ascending=False)
            print(importance)
            # Larger figure for readability.
            fig, ax = plt.subplots(figsize=(24, 8))  # width = 24 in, height = 8 in
            # Draw the bar plot on this axis.
            importance.plot.bar(x="feature", y="importance", legend=False, ax=ax)
            # Rotate the labels for readability.
            ax.set_xticklabels(ax.get_xticklabels(), rotation=45, ha='right')
            plt.title("Importance des features")
            # plt.show()
            plt.savefig(os.path.join(self.path, "Importance des features.png"), bbox_inches="tight")
            plt.close()
        # ---- Decision-tree excerpt ----
        if hasattr(model, "estimators_"):
            print("\n===== 🌳 EXTRAIT D’UN ARBRE =====")
            print(export_text(model.estimators_[0], feature_names=list(X_train.columns))[:800])
        # ---- Accuracy per threshold ----
        thresholds = np.linspace(0.1, 0.9, 9)
        print("\n===== ⚙️ PERFORMANCE SELON SEUIL =====")
        for t in thresholds:
            preds_t = (probs > t).astype(int)
            acc = accuracy_score(y_valid, preds_t)
            print(f"Seuil {t:.1f} → précision {acc:.3f}")
        # ---- ROC curve ----
        fpr, tpr, _ = roc_curve(y_valid, probs)
        plt.figure(figsize=(5, 4))
        plt.plot(fpr, tpr, label="ROC curve")
        plt.plot([0, 1], [0, 1], linestyle="--", color="gray")
        plt.xlabel("Taux de faux positifs")
        plt.ylabel("Taux de vrais positifs")
        plt.title("Courbe ROC")
        plt.legend()
        # plt.show()
        plt.savefig(os.path.join(self.path, "Courbe ROC.png"), bbox_inches="tight")
        plt.close()
        if hasattr(model, "predict_proba"):
            y_proba = model.predict_proba(X_valid)[:, 1]
            # Plot or save the threshold-analysis chart.
            self.plot_threshold_analysis(y_valid, y_proba, step=0.05, save_path=f"{self.path}/threshold_analysis.png")
            # y_valid : true classes (0 / 1)
            # y_proba : class-1 probabilities predicted by the model
            # e.g. y_proba = model.predict_proba(X_valid)[:, 1]
            seuils = np.arange(0.0, 1.01, 0.05)
            precisions, recalls, f1s = [], [], []
            for seuil in seuils:
                y_pred = (y_proba >= seuil).astype(int)
                precisions.append(precision_score(y_valid, y_pred))
                recalls.append(recall_score(y_valid, y_pred))
                f1s.append(f1_score(y_valid, y_pred))
            plt.figure(figsize=(10, 6))
            plt.plot(seuils, precisions, label='Précision', marker='o')
            plt.plot(seuils, recalls, label='Rappel', marker='o')
            plt.plot(seuils, f1s, label='F1-score', marker='o')
            # Mark the best-F1 point.
            best_idx = np.argmax(f1s)
            plt.scatter(seuils[best_idx], f1s[best_idx], color='red', s=80, label=f"Max F1 ({seuils[best_idx]:.2f})")
            plt.title("Performance du modèle selon le seuil de probabilité")
            plt.xlabel("Seuil de probabilité (classe 1)")
            plt.ylabel("Score")
            plt.grid(True, alpha=0.3)
            plt.legend()
            plt.savefig(f"{self.path}/seuil_de_probabilite.png", bbox_inches='tight')
            # plt.show()
            print(f"✅ Meilleur F1 : {f1s[best_idx]:.3f} au seuil {seuils[best_idx]:.2f}")
        print("\n===== ✅ FIN DE L’ANALYSE =====")

    def plot_threshold_analysis(self, y_true, y_proba, step=0.05, save_path=None):
        """
        Plot precision, recall and F1-score as a function of the decision
        threshold.

        y_true    : true labels (0 or 1)
        y_proba   : predicted probabilities (P(up))
        step      : spacing between tested thresholds
        save_path : if set, save the image instead of showing it
        """
        # The generated chart shows three curves:
        # - Precision — reliability of the bullish signals.
        # - Recall — share of the actual rises the model detects.
        # - F1-score — the optimal compromise between the two.
        thresholds = np.arange(0, 1.01, step)
        precisions, recalls, f1s = [], [], []
        for thr in thresholds:
            preds = (y_proba >= thr).astype(int)
            precisions.append(precision_score(y_true, preds))
            recalls.append(recall_score(y_true, preds))
            f1s.append(f1_score(y_true, preds))
        plt.figure(figsize=(10, 6))
        plt.plot(thresholds, precisions, label="Precision", linewidth=2)
        plt.plot(thresholds, recalls, label="Recall", linewidth=2)
        plt.plot(thresholds, f1s, label="F1-score", linewidth=2, linestyle="--")
        plt.axvline(0.5, color='gray', linestyle=':', label="Seuil 0.5")
        plt.title("📊 Performance selon le seuil de probabilité", fontsize=14)
        plt.xlabel("Seuil de décision (threshold)")
        plt.ylabel("Score")
        plt.legend()
        plt.grid(True, alpha=0.3)
        if save_path:
            plt.savefig(save_path, bbox_inches='tight')
            print(f"✅ Graphique enregistré : {save_path}")
        else:
            plt.show()

    def run(self):
        """Entry point: load OHLCV data from a feather file, compute
        indicators, train the model and print its inspection report."""
        # ================================
        # 1. PREPARE DATA
        # ================================
        df = pd.read_feather(f"user_data/data/binance/BTC_USDC-{self.timeframe}.feather")
        df['date'] = pd.to_datetime(df['date'])
        df = df.set_index('date')
        # Optional: keep only a time window.
        df = df["2025-01-01":"2025-02-01"]
        df = df.reset_index('date')
        # Drop NaN rows.
        df = df.dropna(subset=['open', 'high', 'low', 'close', 'volume'])
        # Make sure everything is float.
        for col in ['open', 'high', 'low', 'close', 'volume']:
            df[col] = df[col].astype(float)
        print(df.head())
        print(df.tail())
        # print(df[['rsi', 'atr', 'target']].describe())
        self.dataframe = self.calculateIndicators(df)
        self.model_indicators = self.listUsableColumns(df)
        # ['returns','atr','slope','drawdown', 'close']
        self.trainModel(df)
        self.inspect_model(self.train_model)


crash = Crash()
crash.run()