# Freqtrade/tools/tensorFlow/Crash.py
# NOTE: the original paste carried web file-viewer chrome ("Raw Permalink Blame
# History", line/size counts) and warnings about invisible / confusable Unicode
# characters. Those viewer lines were not Python and have been converted into
# this comment; audit the file for stray Unicode before running it.
import inspect
import os
import freqtrade.vendor.qtpylib.indicators as qtpylib
# Machine Learning
import joblib
import matplotlib.pyplot as plt
import numpy as np
import optuna
import pandas as pd
import seaborn as sns
import shap
import ta
import talib.abstract as talib
import tensorflow as tf
from catboost import CatBoostClassifier
from optuna.visualization import plot_optimization_history
from optuna.visualization import plot_parallel_coordinate
from optuna.visualization import plot_param_importances
from optuna.visualization import plot_slice
from sklearn.calibration import CalibratedClassifierCV
from sklearn.feature_selection import SelectFromModel
from sklearn.feature_selection import VarianceThreshold
from sklearn.inspection import PartialDependenceDisplay
from sklearn.inspection import permutation_importance
from sklearn.metrics import precision_recall_curve, ConfusionMatrixDisplay
from sklearn.metrics import (
classification_report,
confusion_matrix,
accuracy_score,
roc_curve,
precision_score, recall_score
)
from sklearn.metrics import (
f1_score
)
from sklearn.metrics import roc_auc_score
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler
from sklearn.tree import export_text
from tensorflow.keras.callbacks import EarlyStopping
from tensorflow.keras.layers import Dense, Dropout
from tensorflow.keras.models import Sequential
from xgboost import XGBClassifier
class Crash:
    """Crash-detection research tool.

    Builds indicator features from OHLCV data, trains and tunes classifiers
    (Keras MLP, XGBoost, CatBoost) to predict crash events, and writes
    diagnostic plots / the trained model under ``path``.
    """

    # Candle timeframe used when computing indicators.
    timeframe = '1h'
    # NOTE(review): mutable class attribute shared across ALL instances —
    # assumes single-instance usage; confirm before instantiating twice.
    dataframe = {}
    # Last trained estimator (set by trainModel / the optimize_* helpers).
    train_model = None
    # Asset symbol used to build the output path below.
    short_pair = "BTC"
    # Feature column names fed to the model (populated elsewhere — TODO confirm).
    model_indicators = []
    # Output directory for plots, thresholds and the serialized model.
    path = f"user_data/strategies/plots/{short_pair}/crash/"
def calculateDerivation(self, dataframe, window=12, suffixe='', timeframe='5m'):
    """Smooth the 'mid' price with a rolling mean and derive slope features.

    Adds a ``mid_smooth<suffixe>`` column (rolling mean over ``window``
    candles), then delegates to :meth:`calculeDerivees` for the first/second
    derivatives and the discrete trend-state column.

    ``timeframe`` is currently unused (kept for interface compatibility).
    Returns the mutated dataframe.
    """
    smooth_col = f"mid_smooth{suffixe}"
    dataframe[smooth_col] = dataframe['mid'].rolling(window).mean()
    return self.calculeDerivees(dataframe, smooth_col, ema_period=window)
def calculeDerivees(
        self,
        dataframe: pd.DataFrame,
        name: str,
        suffixe: str = '',
        window: int = 100,
        coef: float = 0.15,
        ema_period: int = 10,
        verbose: bool = True,
) -> pd.DataFrame:
    """
    Compute smoothed first/second derivatives of ``dataframe[name]`` and a
    discrete trend-state column, using an adaptive epsilon based on rolling
    percentiles.

    deriv1 is the one-step relative change, deriv2 its difference; both are
    EMA-smoothed (span ``ema_period``) and rescaled.  A per-row epsilon
    (rolling 5th/95th percentile spread times ``coef``) decides whether a
    derivative counts as "flat"; the (d1, d2) pair is mapped to an integer
    state in {-4, -2, -1, 0, 1, 3, 4}.

    Parameters
    ----------
    dataframe : pd.DataFrame
        Must contain columns ``name`` and ``close``.
    name : str
        Base column to differentiate.
    suffixe : str
        Suffix appended to the generated column names.
    window : int
        Rolling window for the adaptive-epsilon percentiles.
    coef : float
        Scale factor applied to the percentile spread to obtain epsilon.
    ema_period : int
        EMA span used to smooth both derivatives.
    verbose : bool
        Currently unused (only referenced by debug code that was disabled).

    Returns
    -------
    pd.DataFrame
        The same dataframe, mutated in place.

    NOTE(review): the row tagger uses ``int(row.name)`` as a POSITIONAL index
    into the epsilon series — this assumes a default RangeIndex (0..n-1) and
    would misbehave on a DatetimeIndex; confirm callers reset the index.
    NOTE(review): '_inv' and '_dist' read ``name + suffixe`` but the deriv
    columns are computed from the un-suffixed ``name`` while being NAMED with
    the suffix — possibly a bug when ``suffixe != ''``; confirm.
    """
    d1_col = f"{name}{suffixe}_deriv1"
    d2_col = f"{name}{suffixe}_deriv2"
    tendency_col = f"{name}{suffixe}_state"
    # Rescale factors keep derivative magnitudes comparable across EMA periods.
    factor1 = 100 * (ema_period / 5)
    factor2 = 10 * (ema_period / 5)
    # Local-minimum marker: value dipped at t-1 and recovered at t.
    dataframe[f"{name}{suffixe}_inv"] = (dataframe[f"{name}{suffixe}"].shift(2) >= dataframe[
        f"{name}{suffixe}"].shift(1)) \
        & (dataframe[f"{name}{suffixe}"].shift(1) <= dataframe[f"{name}{suffixe}"])
    # --- Relative distance of close to the (smoothed) reference column ---
    dataframe[f"{name}{suffixe}_dist"] = (dataframe['close'] - dataframe[f"{name}{suffixe}"]) / dataframe[
        f"{name}{suffixe}"]
    # Simple relative first derivative (one-step percentage change).
    dataframe[d1_col] = (dataframe[name] - dataframe[name].shift(1)) / dataframe[name].shift(1)
    # EMA smoothing + rescale.
    dataframe[d1_col] = factor1 * dataframe[d1_col].ewm(span=ema_period, adjust=False).mean()
    # Second derivative = difference of the smoothed first derivative.
    dataframe[d2_col] = dataframe[d1_col] - dataframe[d1_col].shift(1)
    dataframe[d2_col] = factor2 * dataframe[d2_col].ewm(span=ema_period, adjust=False).mean()
    # Adaptive epsilon from rolling 5th/95th percentiles of each derivative.
    p_low_d1 = dataframe[d1_col].rolling(window=window, min_periods=1).quantile(0.05)
    p_high_d1 = dataframe[d1_col].rolling(window=window, min_periods=1).quantile(0.95)
    p_low_d2 = dataframe[d2_col].rolling(window=window, min_periods=1).quantile(0.05)
    p_high_d2 = dataframe[d2_col].rolling(window=window, min_periods=1).quantile(0.95)
    eps_d1_series = ((p_low_d1.abs() + p_high_d1.abs()) / 2) * coef
    eps_d2_series = ((p_low_d2.abs() + p_high_d2.abs()) / 2) * coef
    # Global fallback epsilon for rows where the rolling value is NaN or 0.
    global_eps_d1 = (abs(dataframe[d1_col].quantile(0.05)) + abs(dataframe[d1_col].quantile(0.95))) / 2 * coef
    global_eps_d2 = (abs(dataframe[d2_col].quantile(0.05)) + abs(dataframe[d2_col].quantile(0.95))) / 2 * coef
    eps_d1_series = eps_d1_series.fillna(global_eps_d1).replace(0, global_eps_d1)
    eps_d2_series = eps_d2_series.fillna(global_eps_d2).replace(0, global_eps_d2)

    # Map each row's (d1, d2) pair to a discrete trend state.
    # Legacy 3-letter code mapping kept for reference:
    #   4 HAU accelerating rise | 3 HSR slowing rise | 1 DHB rise starting
    #   0 PAL plateau / neutral | -1 DBD fall starting
    #  -2 BSR slowing fall | -4 BAS accelerating fall
    # NOTE(review): the fall branch never returns -3 while the rise branch
    # returns 3 — asymmetric mapping; confirm this is intentional.
    def tag_by_derivatives(row):
        # Positional lookup into the epsilon series: assumes RangeIndex.
        idx = int(row.name)
        d1v = float(row[d1_col])
        d2v = float(row[d2_col])
        eps1 = float(eps_d1_series.iloc[idx])
        eps2 = float(eps_d2_series.iloc[idx])
        # Strict plateau: both derivatives within tolerance.
        if abs(d1v) <= eps1 and abs(d2v) <= eps2:
            return 0
        # Breakout: d1 ~ 0 but d2 signals a direction.
        if abs(d1v) <= eps1:
            return 1 if d2v > eps2 else -1 if d2v < -eps2 else 0
        # Rising.
        if d1v > eps1:
            return 4 if d2v > eps2 else 3
        # Falling.
        if d1v < -eps1:
            return -4 if d2v < -eps2 else -2
        return 0

    # Row-wise apply: Python-level loop, O(n) — acceptable for backtest sizes.
    dataframe[tendency_col] = dataframe.apply(tag_by_derivatives, axis=1)
    return dataframe
def calculateIndicators(self, df):
    """Append the full feature set used by the crash model to ``df``.

    Feature groups:
    * smoothed prices ("sma*" columns — NOTE: despite the name these are
      EMAs, not simple moving averages) plus scaled first derivatives;
    * momentum (RSI + derivatives, MACD), Bollinger bands, ATR, ADX, OBV;
    * short-term crash components (drawdown, negative streak, slope,
      acceleration) blended into a composite ``crash_risk_index`` in [0, 1].

    NOTE(review): several columns are assigned more than once with DIFFERENT
    formulas ('atr', 'atr_norm', 'slope', 'drawdown', 'rsi_slope'), and the
    intermediate values ARE consumed in between (e.g. ``volatility_ratio``
    uses the ta ATR before 'atr' is overwritten with a (high-low) mean).
    Statement order is therefore load-bearing — do not reorder or
    deduplicate without retesting downstream columns.

    Parameters
    ----------
    df : pd.DataFrame
        OHLCV frame (open/high/low/close/volume), sorted by ascending date.

    Returns
    -------
    pd.DataFrame : the same frame, mutated in place.
    """
    # Candle midpoint == (open + close) / 2.
    df['mid'] = df['open'] + (df['close'] - df['open']) / 2
    # EMAs of the midpoint at several spans, each with a x1000-scaled
    # one-step relative derivative.
    df['sma5'] = df['mid'].ewm(span=5, adjust=False).mean()  # alt: df["mid"].rolling(window=5).mean()
    df['sma5_deriv1'] = 1000 * (df['sma5'] - df['sma5'].shift(1)) / df['sma5'].shift(1)
    df['sma12'] = df['mid'].ewm(span=12, adjust=False).mean()
    df['sma12_deriv1'] = 1000 * (df['sma12'] - df['sma12'].shift(1)) / df[
        'sma12'].shift(1)
    df['sma24'] = df['mid'].ewm(span=24, adjust=False).mean()
    df['sma24_deriv1'] = 1000 * (df['sma24'] - df['sma24'].shift(1)) / df['sma24'].shift(1)
    df['sma60'] = df['mid'].ewm(span=60, adjust=False).mean()
    df['sma60_deriv1'] = 1000 * (df['sma60'] - df['sma60'].shift(1)) / df['sma60'].shift(1)
    # Local-minimum ("inversion") detectors: dip at t-1, recovery at t, with
    # a minimum amplitude measured in sqrt-price units.
    # NOTE(review): the "> 5" amplitude threshold is in absolute-price units,
    # i.e. asset-scale dependent (tuned for BTC?) — confirm per pair.
    df["sma5_sqrt"] = (
        np.sqrt(np.abs(df["sma5"] - df["sma5"].shift(1)))
        + np.sqrt(np.abs(df["sma5"].shift(3) - df["sma5"].shift(1)))
    )
    df["sma5_inv"] = (
        (df["sma5"].shift(2) >= df["sma5"].shift(1))
        & (df["sma5"].shift(1) <= df["sma5"])
        & (df["sma5_sqrt"] > 5)
    )
    df["sma12_sqrt"] = (
        np.sqrt(np.abs(df["sma12"] - df["sma12"].shift(1)))
        + np.sqrt(np.abs(df["sma12"].shift(3) - df["sma12"].shift(1)))
    )
    df["sma12_inv"] = (
        (df["sma12"].shift(2) >= df["sma12"].shift(1))
        & (df["sma12"].shift(1) <= df["sma12"])
        & (df["sma12_sqrt"] > 5)
    )
    # Returns over several horizons (longer ones smoothed).
    df["percent"] = df['mid'].pct_change()
    df["percent3"] = df['mid'].pct_change(3).rolling(3).mean()
    df["percent12"] = df['mid'].pct_change(12).rolling(12).mean()
    df["percent24"] = df['mid'].pct_change(24).rolling(24).mean()
    # RSI on the midpoint + its derivative/state columns.
    df['rsi'] = talib.RSI(df['mid'], timeperiod=14)
    self.calculeDerivees(df, 'rsi', ema_period=12)
    df['max_rsi_12'] = talib.MAX(df['rsi'], timeperiod=12)
    df['max_rsi_24'] = talib.MAX(df['rsi'], timeperiod=24)
    df['max5'] = talib.MAX(df['mid'], timeperiod=5)
    # Position of the midpoint within its 180-candle range, in [0, 1].
    df['min180'] = talib.MIN(df['mid'], timeperiod=180)
    df['max180'] = talib.MAX(df['mid'], timeperiod=180)
    df['pct180'] = ((df["mid"] - df['min180']) / (df['max180'] - df['min180']))
    ###########################################################
    # Bollinger Bands (typical price, 20 periods, 2 std devs).
    bollinger = qtpylib.bollinger_bands(qtpylib.typical_price(df), window=20, stds=2)
    df['bb_lowerband'] = bollinger['lower']
    df['bb_middleband'] = bollinger['mid']
    df['bb_upperband'] = bollinger['upper']
    # Close position within the bands, and band width normalized by sma24.
    df["bb_percent"] = (
        (df["close"] - df["bb_lowerband"]) /
        (df["bb_upperband"] - df["bb_lowerband"])
    )
    df["bb_width"] = (df["bb_upperband"] - df["bb_lowerband"]) / df["sma24"]
    # MACD (12/26 EMAs, 9-period signal):
    #   macd       = EMA_fast - EMA_slow  -> >0 uptrend, <0 downtrend
    #   macdsignal = EMA_9(macd)          -> crossover trigger line
    #   macdhist   = macd - macdsignal    -> trend strength/acceleration:
    #     positive & rising -> accelerating rise; positive & falling ->
    #     slowing rise; negative & falling -> accelerating drop;
    #     negative & rising -> slowing drop.
    macd, macdsignal, macdhist = talib.MACD(
        df['close'],
        fastperiod=12,
        slowperiod=26,
        signalperiod=9
    )
    df['macd'] = macd
    df['macdsignal'] = macdsignal
    df['macdhist'] = macdhist
    # ------------------------------------------------------------------------------------
    # Raw slope of the 24-EMA and its EMA-smoothed version.  'slope' itself
    # is overwritten twice further down; 'slope_smooth' keeps THIS definition.
    df['slope'] = df['sma24'].diff()
    df['slope_smooth'] = df['slope'].ewm(span=10, adjust=False).mean()
    ###########################
    # NOTE(review): local 'timeframe' is assigned but never used below.
    timeframe = self.timeframe
    # --- Normalized volatility (ta ATR / close) ---
    df['atr'] = ta.volatility.AverageTrueRange(
        high=df['high'], low=df['low'], close=df['close'], window=14
    ).average_true_range()
    df['atr_norm'] = df['atr'] / df['close']
    # --- Trend strength ---
    df['adx'] = ta.trend.ADXIndicator(
        high=df['high'], low=df['low'], close=df['close'], window=14
    ).adx()
    # --- Directional volume (On Balance Volume) + derivatives ---
    df['obv'] = ta.volume.OnBalanceVolumeIndicator(
        close=df['close'], volume=df['volume']
    ).on_balance_volume()
    self.calculeDerivees(df, 'obv', ema_period=1)
    # OBV variants on smoothed price / aggregated volume.
    df['obv12'] = ta.volume.OnBalanceVolumeIndicator(
        close=df['sma12'], volume=df['volume'].rolling(12).sum()
    ).on_balance_volume()
    df['obv24'] = ta.volume.OnBalanceVolumeIndicator(
        close=df['sma24'], volume=df['volume'].rolling(24).sum()
    ).on_balance_volume()
    # --- Recent volatility (std of 1-candle returns over 24 candles) ---
    df['vol_24'] = df['percent'].rolling(24).std()
    # Momentum/trend interaction features.
    df['rsi_slope'] = df['rsi'].diff(3) / 3  # mean RSI speed (overwritten below)
    df['adx_change'] = df['adx'] - df['adx'].shift(12)  # trend evolution
    df['volatility_ratio'] = df['atr_norm'] / df['bb_width']  # uses the ta ATR
    df["rsi_diff"] = df["rsi"] - df["rsi"].shift(3)
    df["slope_ratio"] = df["sma5_deriv1"] / (df["sma60_deriv1"] + 1e-9)
    # Price/RSI divergence: derivative signs disagree.
    df["divergence"] = (df["rsi_deriv1"] * df["sma5_deriv1"]) < 0
    # ---- crash-model features (these redefine atr/slope/rsi_slope) ----
    df['returns'] = df['close'].pct_change()
    df['atr'] = (df['high'] - df['low']).rolling(14).mean()  # overwrites ta ATR
    df['slope'] = df['close'].rolling(20).mean().diff()  # overwrites EMA slope
    df['drawdown'] = (df['close'] - df['close'].rolling(48).max()) / df['close'].rolling(48).max()
    df['atr_pct'] = df['atr'] / df['close']
    df['vol_z'] = (df['volume'] - df['volume'].rolling(48).mean()) / df['volume'].rolling(48).std()
    df['rsi_slope'] = df['rsi'].diff(3)  # overwrites the "/3" version above
    # Volatility, re-normalized with the simple-range ATR.
    df['atr_norm'] = df['atr'] / df['close']
    # Drawdown vs the 48-candle high (critical crash signal); a 10% drawdown
    # maps to a full score of 1.
    df['rolling_max'] = df['close'].rolling(48).max()
    df['drawdown'] = (df['close'] - df['rolling_max']) / df['rolling_max']
    df['dd_score'] = np.clip(-df['drawdown'] / 0.10, 0, 1)
    # Trend score from the MA7-MA14 spread (overwrites 'slope' again).
    df['MA7'] = df['close'].rolling(7).mean()
    df['MA14'] = df['close'].rolling(14).mean()
    df['slope'] = df['MA7'] - df['MA14']
    df['slope_score'] = np.clip(1 - (df['slope'] / df['close']), 0, 1)
    # Sum of negative returns over 24 candles; 5% cumulative loss = full score.
    df['neg_streak'] = df['close'].pct_change().apply(lambda x: min(x, 0)).rolling(24).sum()
    df['neg_score'] = np.clip(-df['neg_streak'] / 0.05, 0, 1)
    # Short-term crash components.
    df['pct_change_3'] = df['close'].pct_change(3)
    df['pct_change_3_smooth'] = df['pct_change_3'].rolling(6).mean()
    df['crash_score'] = np.clip(1 + (df['pct_change_3_smooth'] / 0.05), 0, 1)
    df['speed'] = df['close'].diff().rolling(6).mean()
    df['accel'] = df['speed'].diff().rolling(6).mean()
    df['STD20'] = df['close'].rolling(20).std()
    df['accel_score'] = np.clip(1 + (df['accel'] / (df['STD20'] + 1e-9)), 0, 1)
    # Composite crash index: weighted blend of the component scores.
    df['crash_raw'] = (
        0.35 * df['dd_score'] +  # slow-crash drawdown dominates
        0.25 * df['neg_score'] +
        0.20 * df['slope_score'] +
        0.10 * df['crash_score'] +
        0.10 * df['accel_score']
    )
    # Final smoothing.
    df['crash_risk_index'] = df['crash_raw'].ewm(span=24).mean()
    return df
def feature_auc_scores(self, X, y):
    """Rank features by univariate ROC-AUC against the binary target ``y``.

    Each column is forward-filled then zero-filled before scoring; columns
    whose AUC cannot be computed (constant, non-numeric, ...) score NaN.
    Returns a pd.Series indexed by feature name, best AUC first.
    """
    scores = {}
    for feature in X.columns:
        try:
            scores[feature] = roc_auc_score(y, X[feature].ffill().fillna(0))
        except Exception:
            # Best-effort: unusable columns are kept, flagged with NaN.
            scores[feature] = np.nan
    return pd.Series(scores).sort_values(ascending=False)
def listUsableColumns(self, dataframe):
    """Return the numeric, non-constant columns usable as model features.

    Columns are rejected when constant (single unique value) or when their
    name matches a bookkeeping/target pattern.  The retained columns are
    sanitized IN PLACE: +/-inf and NaN are replaced by 0.
    """
    banned_suffixes = ("_state", "_1d", "_count", "_class", "_price")
    banned_prefixes = ('stop_buying', 'target', 'lvl', 'confidence_index')
    numeric_cols = dataframe.select_dtypes(include=['int64', 'float64']).columns
    usable_cols = []
    for col in numeric_cols:
        if dataframe[col].nunique() <= 1:
            continue  # constant column carries no signal
        if col.endswith(banned_suffixes) or col.startswith(banned_prefixes):
            continue  # target-like / bookkeeping column
        usable_cols.append(col)
    # Sanitize the selected columns in place.
    dataframe[usable_cols] = dataframe[usable_cols].replace([np.inf, -np.inf], 0).fillna(0)
    print("Colonnes utilisables pour le modèle :")
    print(usable_cols)
    return usable_cols
def select_uncorrelated_features(self, df, target, top_n=20, corr_threshold=0.7):
    """
    Pick the features most correlated with ``target`` while greedily dropping
    any candidate too correlated (absolute value > ``corr_threshold``) with a
    feature already selected.

    Returns a DataFrame with columns ``feature`` and ``corr_with_target``,
    sorted by absolute correlation with the target, strongest first.
    """
    corr_matrix = df.corr(numeric_only=True)
    # Rank candidates by |corr with target|, strongest first, target excluded.
    ranked = corr_matrix[target].abs().sort_values(ascending=False)
    candidates = ranked.drop(target).head(top_n).index.tolist()
    # Greedy redundancy filter against the already-kept features.
    kept = []
    for candidate in candidates:
        if all(abs(corr_matrix.loc[candidate, prev]) <= corr_threshold for prev in kept):
            kept.append(candidate)
    # Report the signed correlation values for the surviving features.
    return pd.DataFrame({
        "feature": kept,
        "corr_with_target": [corr_matrix.loc[f, target] for f in kept],
    }).sort_values(by="corr_with_target", key=np.abs, ascending=False)
def drawPredictions(self, df, indicators, y_proba, threshold=0.5):
    """Plot indicator columns together with the model's predicted probability
    and a binary signal (probability > threshold).

    The figure is written to ``<self.path>/indicators_vs_prediction.png``.

    Parameters
    ----------
    df : pd.DataFrame
        Frame holding the indicator columns; its index is the x-axis.
    indicators : list[str]
        Columns of ``df`` to draw.
    y_proba : np.array
        Predicted probabilities (continuous, in [0, 1], same length as df).
    threshold : float
        Cut-off used to derive the binary signal markers.
    """
    plt.figure(figsize=(18, 6))
    # One curve per requested indicator.
    for column in indicators:
        plt.plot(df.index, df[column], label=column, alpha=0.7)
    # Continuous model output on the same axes.
    plt.plot(df.index, y_proba, label="Prediction prob.", color="black", linestyle="--")
    # Binary markers where the probability clears the threshold.
    binary_signal = (y_proba > threshold).astype(int)
    plt.scatter(df.index, binary_signal, color='red', marker='o', label='Signal > threshold', s=20)
    plt.title("Indicateurs + prédiction MLP")
    plt.xlabel("Date")
    plt.ylabel("Valeur / Probabilité")
    plt.legend()
    plt.grid(True)
    plt.tight_layout()
    plt.savefig(f"{self.path}/indicators_vs_prediction.png")
    plt.close()
def drawSequentialGraphs(self, model, history, X_train_scaled, X_valid_scaled, y_train,
                         y_valid, thresholds=None, best_threshold=None
                         ):
    """
    Generate and save the standard diagnostic plots for a Keras MLP.

    Writes to ``self.path``: loss curves, predicted-probability histogram,
    precision-recall curve, F1-vs-threshold curve, confusion matrix, an
    indicator-vs-prediction overlay, and the chosen decision threshold.

    Parameters
    ----------
    model : keras Sequential
        Trained binary classifier (sigmoid output assumed).
    history : History
        Return value of ``model.fit`` (must contain 'loss' and 'val_loss').
    X_train_scaled, X_valid_scaled : np.array
        Scaled design matrices.  NOTE(review): ``X_train_scaled`` and
        ``y_train`` are currently unused (kept for interface symmetry).
    y_train, y_valid : np.array
    thresholds : list[float] | None
        Candidate decision thresholds; defaults to 60 points in [0.05, 0.95].
    best_threshold : float | None
        If None, the threshold maximizing F1 on the validation set is chosen.

    Returns
    -------
    dict with keys 'best_threshold' and 'best_f1'.
    """
    # NOTE(review): assumes self.dataframe is a populated pd.DataFrame here,
    # although the class default is {} — it must be set upstream; confirm.
    # feature_names itself is currently unused (was needed by a disabled
    # permutation-importance block).
    feature_names = self.listUsableColumns(self.dataframe)
    # =========================
    # 1) Loss curves
    # =========================
    plt.figure()
    plt.plot(history.history['loss'], label='train')
    plt.plot(history.history['val_loss'], label='val')
    plt.legend()
    plt.title("MLP loss")
    plt.savefig(f"{self.path}/loss.png")
    plt.close()
    # =========================
    # 2) Probability distribution, split per true class
    # =========================
    y_proba = model.predict(X_valid_scaled).ravel()
    plt.figure()
    plt.hist(y_proba[y_valid == 0], bins=50, alpha=0.6, label="No crash")
    plt.hist(y_proba[y_valid == 1], bins=50, alpha=0.6, label="Crash")
    plt.legend()
    plt.title("Predicted probability distribution")
    plt.savefig(f"{self.path}/proba_distribution.png")
    plt.close()
    # =========================
    # 3) Precision-Recall curve
    # =========================
    precision, recall, _ = precision_recall_curve(y_valid, y_proba)
    plt.figure()
    plt.plot(recall, precision)
    plt.xlabel("Recall")
    plt.ylabel("Precision")
    plt.title("Precision-Recall curve")
    plt.savefig(f"{self.path}/precision_recall.png")
    plt.close()
    # =========================
    # 4) F1 vs threshold
    # =========================
    if thresholds is None:
        thresholds = np.linspace(0.05, 0.95, 60)
    f1s = [
        f1_score(y_valid, (y_proba > t).astype(int))
        for t in thresholds
    ]
    plt.figure()
    plt.plot(thresholds, f1s)
    plt.xlabel("Threshold")
    plt.ylabel("F1 score")
    plt.title("F1 vs threshold")
    plt.savefig(f"{self.path}/f1_threshold.png")
    plt.close()
    # Auto-pick the threshold when the caller did not supply one.
    if best_threshold is None:
        best_threshold = thresholds[int(np.argmax(f1s))]
    # =========================
    # 5) Confusion matrix at the chosen threshold
    # =========================
    ConfusionMatrixDisplay.from_predictions(
        y_valid,
        (y_proba > best_threshold).astype(int)
    )
    plt.title(f"Confusion matrix (threshold={best_threshold:.2f})")
    plt.savefig(f"{self.path}/confusion_matrix.png")
    plt.close()
    # NOTE(review): a permutation-importance section was disabled here —
    # presumably because sklearn's permutation_importance expects an
    # sklearn-compatible estimator, which a raw Keras model is not.
    # Overlay a few indicators with the prediction over the valid window.
    indicators = ['percent12']
    self.drawPredictions(
        df=self.dataframe.iloc[-len(y_proba):],  # rows matching X_valid
        indicators=indicators,
        y_proba=y_proba,
        threshold=0.5
    )
    # =========================
    # 6) Persist the chosen threshold
    # =========================
    with open(f"{self.path}/best_threshold.txt", "w") as f:
        f.write(str(best_threshold))
    return {
        "best_threshold": best_threshold,
        "best_f1": max(f1s)
    }
def optimize_sequential(self, X_train, X_valid, y_train, y_valid, n_trials=20):
    """
    Hyperparameter-tune a small Keras MLP with Optuna, then retrain the best
    configuration and produce diagnostic plots.

    Search space: two hidden-layer widths, two dropout rates, learning rate
    (log scale) and batch size.  Each trial scales the data with a
    StandardScaler fitted on the train split only (no validation leakage),
    trains with early stopping, and is scored by the best F1 over thresholds
    {0.3, 0.4, 0.5, 0.6} on the validation split.

    Returns
    -------
    (keras Sequential, optuna.Study)
        The final model retrained with the best parameters, and the study.
    """
    def objective(trial):
        # Fresh TF graph per trial to avoid accumulating state.
        tf.keras.backend.clear_session()
        # Hyperparameters drawn for this trial.
        n1 = trial.suggest_int("units_1", 32, 128)
        n2 = trial.suggest_int("units_2", 16, 64)
        dropout1 = trial.suggest_float("dropout_1", 0.1, 0.5)
        dropout2 = trial.suggest_float("dropout_2", 0.1, 0.5)
        lr = trial.suggest_float("lr", 1e-4, 1e-2, log=True)
        batch_size = trial.suggest_categorical("batch_size", [16, 32, 64])
        # Scaling fitted on the train split only.
        scaler = StandardScaler()
        X_tr = scaler.fit_transform(X_train)
        X_val = scaler.transform(X_valid)
        # Two-hidden-layer MLP, sigmoid output for the binary crash label.
        model = Sequential([
            Dense(n1, activation='relu', input_shape=(X_tr.shape[1],)),
            Dropout(dropout1),
            Dense(n2, activation='relu'),
            Dropout(dropout2),
            Dense(1, activation='sigmoid')
        ])
        model.compile(
            optimizer=tf.keras.optimizers.Adam(lr),
            loss='binary_crossentropy'
        )
        es = EarlyStopping(
            monitor='val_loss',
            patience=10,
            restore_best_weights=True
        )
        model.fit(
            X_tr, y_train,
            validation_data=(X_val, y_valid),
            epochs=150,
            batch_size=batch_size,
            callbacks=[es],
            verbose=0
        )
        proba = model.predict(X_val).ravel()
        # Objective value: best F1 over a small threshold grid.
        thresholds = [0.3, 0.4, 0.5, 0.6]
        best_f1 = max(
            f1_score(y_valid, (proba > t).astype(int))
            for t in thresholds
        )
        return best_f1

    study = optuna.create_study(direction="maximize")
    study.optimize(objective, n_trials=n_trials)
    print(study.best_trial)
    best = study.best_trial.params
    # Retrain from scratch with the winning hyperparameters.
    scaler = StandardScaler()
    X_train_scaled = scaler.fit_transform(X_train)
    X_valid_scaled = scaler.transform(X_valid)
    model = Sequential([
        Dense(best["units_1"], activation='relu', input_shape=(X_train_scaled.shape[1],)),
        Dropout(best["dropout_1"]),
        Dense(best["units_2"], activation='relu'),
        Dropout(best["dropout_2"]),
        Dense(1, activation='sigmoid')
    ])
    model.compile(
        optimizer=tf.keras.optimizers.Adam(best["lr"]),
        loss='binary_crossentropy'
    )
    history = model.fit(
        X_train_scaled, y_train,
        validation_data=(X_valid_scaled, y_valid),
        epochs=150,
        batch_size=best["batch_size"],
        callbacks=[EarlyStopping(patience=10, restore_best_weights=True)],
        verbose=0
    )
    # NOTE(review): the dict returned here (best threshold / best F1) is
    # discarded — consider propagating it to the caller.
    result = self.drawSequentialGraphs(
        model=model,
        history=history,
        X_train_scaled=X_train_scaled,
        X_valid_scaled=X_valid_scaled,
        y_train=y_train,
        y_valid=y_valid
    )
    return model, study
def optimize_xgbclassifier(self, X_train, y_train, X_valid, y_valid, n_trials=20):
    """Tune an XGBClassifier with Optuna, then retrain on the best params.

    Each trial fits a candidate and is scored by the best F1 obtained over a
    sweep of 50 decision thresholds in [0.1, 0.9] on the validation split.
    After the search, the best configuration is refit on the train split and
    SHAP analysis, calibration-based feature selection and importance plots
    are produced as side effects.

    Parameters
    ----------
    X_train, y_train, X_valid, y_valid : design matrices / binary targets.
    n_trials : int
        Optuna trial budget.

    Returns
    -------
    (XGBClassifier, optuna.Study)
    """
    def objective(trial):
        candidate = XGBClassifier(
            n_estimators=trial.suggest_int("n_estimators", 300, 500),
            max_depth=trial.suggest_int("max_depth", 1, 6),
            learning_rate=trial.suggest_float("learning_rate", 0.005, 0.3, log=True),
            subsample=trial.suggest_float("subsample", 0.6, 1.0),
            colsample_bytree=trial.suggest_float("colsample_bytree", 0.6, 1.0),
            scale_pos_weight=1,
            objective="binary:logistic",
            eval_metric="logloss",
            n_jobs=-1
        )
        candidate.fit(
            X_train,
            y_train,
            eval_set=[(X_valid, y_valid)],
            verbose=False
        )
        proba = candidate.predict_proba(X_valid)[:, 1]
        # Trial score = best F1 over a sweep of decision thresholds.
        thresholds = np.linspace(0.1, 0.9, 50)
        return max(f1_score(y_valid, (proba > t)) for t in thresholds)

    study = optuna.create_study(direction="maximize")
    # BUG FIX: the trial budget was hard-coded to 100, silently ignoring the
    # ``n_trials`` parameter (unlike optimize_catboost, which honors it).
    study.optimize(objective, n_trials=n_trials)
    # Refit the final model with the best hyperparameters found.
    best_model = XGBClassifier(**study.best_params)
    best_model.fit(X_train, y_train)
    # Post-hoc diagnostics (side effects: plots under self.path).
    self.analyseShap(X_train, X_valid)
    selected_features = self.calibrateModel(best_model, X_train, y_train)
    self.analyseImportances(selected_features, X_train, X_valid, y_valid)
    return best_model, study
def optimize_catboost(self, X_train, y_train, X_valid, y_valid, n_trials=20):
    """
    Tune a CatBoost classifier with Optuna for rare-crash detection.

    The positive (crash) class is up-weighted by the class-imbalance ratio.
    Each trial is scored by the best F1 over a sweep of LOW decision
    thresholds (the positive class is rare, so the optimum sits below 0.5).
    The best configuration is refit on the train split; SHAP, calibration
    and importance analyses run as side effects.

    Returns
    -------
    (CatBoostClassifier, optuna.Study)
    """
    # Positive-class weight = #negatives / #positives (guarded against /0).
    positives = max(len(y_train[y_train == 1]), 1)
    scale_pos_weight = len(y_train[y_train == 0]) / positives

    def objective(trial):
        candidate = CatBoostClassifier(
            iterations=trial.suggest_int("iterations", 200, 500),
            depth=trial.suggest_int("depth", 3, 8),
            learning_rate=trial.suggest_float("learning_rate", 0.01, 0.3, log=True),
            l2_leaf_reg=trial.suggest_float("l2_leaf_reg", 1, 10),
            subsample=trial.suggest_float("subsample", 0.6, 1.0),
            scale_pos_weight=scale_pos_weight,
            eval_metric="F1",
            random_state=42,
            verbose=0
        )
        candidate.fit(X_train, y_train, eval_set=(X_valid, y_valid), early_stopping_rounds=50)
        crash_proba = candidate.predict_proba(X_valid)[:, 1]
        # Best F1 over low thresholds, suited to a rare positive class.
        best_f1 = 0
        for cutoff in np.linspace(0.05, 0.5, 20):
            best_f1 = max(best_f1, f1_score(y_valid, (crash_proba > cutoff).astype(int)))
        return best_f1

    study = optuna.create_study(direction="maximize")
    study.optimize(objective, n_trials=n_trials)
    print("Meilleurs paramètres :", study.best_params)
    print("Meilleur F1 :", study.best_value)
    # Final fit with the winning hyperparameters.
    best = study.best_params
    best_model = CatBoostClassifier(
        iterations=best["iterations"],
        depth=best["depth"],
        learning_rate=best["learning_rate"],
        l2_leaf_reg=best["l2_leaf_reg"],
        subsample=best["subsample"],
        scale_pos_weight=scale_pos_weight,
        eval_metric="F1",
        random_state=42,
        verbose=0
    )
    best_model.fit(X_train, y_train)
    # Post-hoc diagnostics (side effects: plots under self.path).
    self.analyseShap(X_train, X_valid)
    selected_features = self.calibrateModel(best_model, X_train, y_train)
    self.analyseImportances(selected_features, X_train, X_valid, y_valid)
    return best_model, study
def trainModel(self, dataframe):
    """End-to-end training pipeline.

    Steps: correlation report, target creation, temporal train/valid split,
    Optuna-tuned MLP training, decision-threshold search, evaluation, and
    model persistence under ``self.path``.

    Parameters
    ----------
    dataframe : pd.DataFrame
        Must contain every column in ``self.model_indicators``.  The target
        column is added by ``self.initTarget`` (defined outside this chunk —
        TODO confirm its contract: "crash = -n% within p hours" per the
        original comment).

    Side effects: wipes and repopulates ``self.path``, sets
    ``self.train_model``, prints extensive diagnostics.
    """
    pair = self.short_pair
    # Full-width pandas printing for the correlation dumps below.
    pd.set_option('display.max_rows', None)
    pd.set_option('display.max_columns', None)
    pd.set_option("display.width", 200)
    path = self.path
    os.makedirs(path, exist_ok=True)
    # NOTE(review): shell glob wipe of the plot directory — non-portable and
    # dangerous if self.path were ever empty or user-controlled; consider
    # shutil.rmtree + recreate instead.
    os.system(f"rm -rf {self.path}/*")
    df = dataframe[self.model_indicators].copy()
    # Pairwise feature correlation dump (can be very large).
    corr = df.corr(numeric_only=True)
    print("Corrélation des colonnes")
    print(corr)
    # Build the binary target: crash if -n% within the next p hours
    # (delegated to initTarget, defined elsewhere in this class).
    self.initTarget(df)
    self.calculateCorrelation(df)
    # Temporal split (no shuffle): train = first 80 %, valid = last 20 %.
    X = df[self.model_indicators]
    y = df['target']
    X_train, X_valid, y_train, y_valid = train_test_split(X, y, test_size=0.2, shuffle=False)
    # Drop near-constant features.
    # NOTE(review): 'selected' is only printed, never used to filter X — the
    # model still trains on ALL columns. Confirm whether that is intended.
    selector = VarianceThreshold(threshold=0.0001)
    selector.fit(X_train)
    selected = X_train.columns[selector.get_support()]
    print("Colonnes conservées :", list(selected))
    # Sanity checks before the (expensive) hyperparameter search.
    assert len(X_train) == len(y_train)
    assert len(X_valid) == len(y_valid)
    # Hyperparameter search + final fit of the Keras MLP.
    self.train_model, study = self.optimize_sequential(X_train, X_valid, y_train, y_valid, n_trials=50)
    self.analyseStudy(study)
    # NOTE(review): Sequential.predict returns probabilities, not class
    # labels — classification_report/confusion_matrix below therefore
    # receive continuous values; a threshold should probably be applied
    # first. Confirm.
    y_pred = self.train_model.predict(X_valid)
    if hasattr(self.train_model, "predict_proba"):
        y_proba = self.train_model.predict_proba(X_valid)[:, 1]
    else:
        # Keras models expose predict() only; flatten to a 1-D proba vector.
        y_proba = self.train_model.predict(X_valid).ravel()
    print("\nRapport de classification :\n", classification_report(y_valid, y_pred))
    print("\nMatrice de confusion :\n", confusion_matrix(y_valid, y_pred))
    # Decision-threshold sweep maximizing F1 on the validation split.
    best_f1 = 0
    best_t = 0.5
    for t in [0.3, 0.4, 0.5, 0.6, 0.7]:
        y_pred_thresh = (y_proba > t).astype(int)
        score = f1_score(y_valid, y_pred_thresh)
        print(f"Seuil {t:.1f} → F1: {score:.3f}")
        if score > best_f1:
            best_f1 = score
            best_t = t
    print(f"✅ Meilleur seuil trouvé: {best_t} avec F1={best_f1:.3f}")
    # Accuracy (same probability-vs-label caveat as above).
    preds = self.train_model.predict(X_valid)
    acc = accuracy_score(y_valid, preds)
    print(f"Accuracy: {acc:.3f}")
    # Persist the model.
    # NOTE(review): the filename says "rf_model" but this is a Keras MLP, and
    # joblib is not the recommended serializer for Keras models — confirm
    # this round-trips before relying on it.
    joblib.dump(self.train_model, f"{self.path}/{pair}_rf_model.pkl")
    print(f"✅ Modèle sauvegardé sous {pair}_rf_model.pkl")
    # Post-hoc diagnostics (defined elsewhere in this class).
    self.analyze_model(self.train_model, X_train, X_valid, y_train, y_valid)
def analyseImportances(self, selected_features, X_train, X_valid, y_valid):
    """Plot and save three feature-analysis figures under ``self.path``.

    - impurity-based importances (only if the model exposes
      ``feature_importances_``),
    - permutation importance (F1 scoring) on the validation split,
    - partial-dependence plots for ``selected_features``.

    Fix vs. original: each chart now gets its own explicit figure and is
    closed after saving — previously both bar charts shared the implicit
    pyplot figure, so the second PNG had the first chart overlaid and the
    figures were never released.
    """
    # --- Impurity-based feature importances (tree models only) ---
    if hasattr(self.train_model, "feature_importances_"):
        importances = self.train_model.feature_importances_
        feat_imp = pd.Series(importances, index=X_train.columns).sort_values(ascending=False)
        fig, ax = plt.subplots(figsize=(12, 6))
        feat_imp.plot(kind='bar', ax=ax)
        ax.set_title("Feature importances")
        fig.savefig(f"{self.path}/Feature importances.png", bbox_inches='tight')
        plt.close(fig)  # release the figure (avoid matplotlib memory build-up)
    # --- Permutation importance (model-agnostic, on validation data) ---
    result = permutation_importance(self.train_model, X_valid, y_valid, scoring='f1', n_repeats=10,
                                    random_state=42)
    perm_imp = pd.Series(result.importances_mean, index=X_valid.columns).sort_values(ascending=False)
    fig, ax = plt.subplots(figsize=(12, 6))
    perm_imp.plot(kind='bar', ax=ax)
    ax.set_title("Permutation feature importance")
    fig.savefig(f"{self.path}/Permutation feature importance.png", bbox_inches='tight')
    plt.close(fig)
    # --- Partial dependence of the selected features ---
    fig, ax = plt.subplots(figsize=(24, 48))
    PartialDependenceDisplay.from_estimator(
        self.train_model,
        X_valid,
        selected_features,
        kind="average",
        ax=ax
    )
    fig.savefig(f"{self.path}/PartialDependenceDisplay.png", bbox_inches="tight")
    plt.close(fig)
def calibrateModel(self, model, X_train, y_train):
    """Select the strongest features of a fitted model, then fit a
    probability-calibrated copy on that subset.

    Returns only the selected feature names; the calibrated estimator is
    printed for inspection but intentionally not returned (unchanged
    contract from the original implementation).
    """
    # Feature selection first: keep features whose importance >= the median.
    selector = SelectFromModel(model, threshold="median", prefit=True)
    support_mask = selector.get_support()
    selected_features = X_train.columns[support_mask]
    print(selected_features)
    # Then sigmoid (Platt) calibration with 5-fold CV on the reduced set.
    calibrated_clf = CalibratedClassifierCV(model, method='sigmoid', cv=5)
    calibrated_clf.fit(X_train[selected_features], y_train)
    print(calibrated_clf)
    return selected_features
def calculateCorrelation(self, df):
    """Print correlation diagnostics for *df* and save a heatmap PNG.

    Reports three views of the numeric correlations:
    - each column's correlation with the 'target' column (descending),
    - the 20 strongest pairwise correlations by absolute value,
    - the full matrix rendered as a percentage heatmap (lower triangle).
    """
    corr_matrix = df.corr(numeric_only=True)
    # Correlations with the target column, strongest positive first.
    target_corr = corr_matrix["target"].sort_values(ascending=False)
    print("Corrélations triées par importance avec une colonne cible")
    print(target_corr)
    # Flatten the matrix into (col1, col2, corr) rows.
    pairs = (
        corr_matrix.unstack()
        .reset_index()
        .rename(columns={"level_0": "col1", "level_1": "col2", 0: "corr"})
    )
    # Keep each unordered pair once and drop self-correlations.
    pairs = pairs[pairs["col1"] < pairs["col2"]]
    # Order by absolute correlation strength.
    strongest = pairs.reindex(pairs["corr"].abs().sort_values(ascending=False).index)
    print("Trier par valeur absolue de corrélation")
    print(strongest.head(20))
    # --- Heatmap (values shown in percent) ---
    pct = corr_matrix * 100
    # Hide the upper triangle so each pair appears only once.
    upper_mask = np.triu(np.ones_like(pct, dtype=bool))
    fig, ax = plt.subplots(figsize=(96, 36))
    sns.heatmap(
        pct,
        mask=upper_mask,
        cmap="coolwarm",  # blue -> red "temperature" palette
        center=0,  # zero correlation sits at the palette center
        annot=True,  # value printed in every cell
        fmt=".0f",  # integer percentages
        cbar_kws={"label": "Corrélation (%)"},
        linewidths=0.5,
        ax=ax
    )
    ax.set_title("Matrice de corrélation (en %)", fontsize=20, pad=20)
    plt.xticks(rotation=45, ha="right")
    plt.yticks(rotation=0)
    output_path = f"{self.path}/Matrice_de_correlation_temperature.png"
    plt.savefig(output_path, bbox_inches="tight", dpi=150)
    plt.close(fig)
    print(f"✅ Matrice enregistrée : {output_path}")
def analyseStudy(self, study):
    """Print a summary of an Optuna study and export its standard
    diagnostic plots as HTML files under ``self.path``."""
    print("Best value (F1):", study.best_value)
    print("Best params:", study.best_params)
    best = study.best_trial
    print("\n=== BEST TRIAL ===")
    print("Number:", best.number)
    print("Value:", best.value)
    print("Params:")
    for k, v in best.params.items():
        print(f" - {k}: {v}")
    # One line per trial for quick scanning.
    print("\n=== ALL TRIALS ===")
    for trial in study.trials:
        print(f"Trial {trial.number}: f1 = {trial.value}, params = {trial.params}")
    # Tabular view of all trials.
    trials_df = study.trials_dataframe()
    print(trials_df.head())
    # Export each standard Optuna visualisation as a standalone HTML report.
    reports = (
        (plot_optimization_history, "optimization_history"),
        (plot_param_importances, "param_importances"),
        (plot_slice, "slice"),
        (plot_parallel_coordinate, "parallel_coordinates"),
    )
    for plot_fn, stem in reports:
        plot_fn(study).write_html(f"{self.path}/{stem}.html")
def analyseShap(self, X_train, X_valid):
    """Compute SHAP explanations for the trained tree model and produce:

    - a waterfall plot of the top-10 features for the first training row
      (saved as ``shap_waterfall.png``),
    - a global summary plot (displayed, not saved — see NOTE below),
    - an HTML force plot for the first validation row
      (saved as ``shap_force_plot.html``).
    """
    # TreeExplainer targets tree ensembles (RandomForest / XGBoost / CatBoost...).
    explainer = shap.TreeExplainer(self.train_model)
    shap_values = explainer(X_train)
    # Explain a single observation: the first row of the training set.
    i = 0
    # Raw SHAP contributions for that row, plus names/values for labelling.
    shap_val = shap_values[i].values
    feature_names = X_train.columns
    feature_values = X_train.iloc[i]
    # Keep only the k features with the largest absolute contribution.
    k = 10
    order = np.argsort(np.abs(shap_val))[::-1][:k]
    # ---- Build the waterfall figure without displaying it ----
    plt.ioff()  # disable interactive display; we only want the saved file
    shap.plots.waterfall(
        shap.Explanation(
            values=shap_val[order],
            base_values=shap_values.base_values[i],
            data=feature_values.values[order],
            feature_names=feature_names[order]
        ),
        show=False  # IMPORTANT: do not render in Jupyter / console
    )
    # Persist the waterfall chart to disk.
    output_path = f"{self.path}/shap_waterfall.png"
    plt.savefig(output_path, dpi=200, bbox_inches='tight')
    plt.close()  # close the figure cleanly
    print(f"Graphique SHAP enregistré : {output_path}")
    # Global summary.
    # NOTE(review): shap_values were computed on X_train but are plotted
    # against X_valid here — row counts likely differ; confirm intent.
    # Also note this plot is displayed, not saved to self.path.
    shap.summary_plot(shap_values, X_valid)
    # Force plot for one observation, exported as standalone HTML.
    # NOTE(review): expected_value/shap_values shapes are shap-version
    # dependent for binary classifiers — verify against the installed shap.
    force_plot = shap.force_plot(explainer.expected_value, shap_values[0, :], X_valid.iloc[0, :])
    shap.save_html(f"{self.path}/shap_force_plot.html", force_plot)
    # (A commented-out alternative that saved a summary PNG with dimension
    # checks was removed here; see version-control history if needed.)
def initTarget(self, df, horizon=12, threshold=0.003):
    """Build the binary classification target in-place on *df*.

    Looks *horizon* bars ahead on the 'mid' column and labels a row 1 when
    the forward relative move exceeds *threshold* (0.3% by default).
    Defaults reproduce the previous hard-coded behavior (12 bars / 0.003),
    so existing callers are unaffected.

    Adds two columns to *df*:
      future_dd : (mid[t+horizon] - mid[t]) / mid[t]
      target    : 1 if future_dd > threshold else 0. The last *horizon*
                  rows have NaN future_dd; NaN > threshold is False, so
                  they are labelled 0.
    """
    future = df['mid'].shift(-horizon)
    df['future_dd'] = (future - df['mid']) / df['mid']
    df['target'] = (df['future_dd'] > threshold).astype(int)
def inspect_model(self, model):
    """Print a structured report about an already-fitted ML model.

    Compatible with scikit-learn, xgboost, lightgbm, catboost... Every
    section is guarded by ``hasattr`` so missing attributes are skipped.
    """
    print("===== 🔍 INFORMATIONS DU MODÈLE =====")
    # Model identity.
    print(f"Type : {type(model).__name__}")
    print(f"Module : {model.__class__.__module__}")
    # Hyper-parameters (scikit-learn style API).
    if hasattr(model, "get_params"):
        hyper = model.get_params()
        print(f"\n===== ⚙️ HYPERPARAMÈTRES ({len(hyper)}) =====")
        for name, value in hyper.items():
            print(f"{name}: {value}")
    # Ensemble size.
    if hasattr(model, "n_estimators"):
        print(f"\nNombre destimateurs : {model.n_estimators}")
    # Feature importances (tree / boosting models).
    if hasattr(model, "feature_importances_"):
        print("\n===== 📊 IMPORTANCE DES FEATURES =====")
        names = getattr(model, "feature_names_in_", None)
        if isinstance(names, np.ndarray):
            names = names.tolist()
        elif names is None:
            # Fall back to positional names when the model kept none.
            names = [f"feature_{i}" for i in range(len(model.feature_importances_))]
        report = pd.DataFrame({
            "feature": names,
            "importance": model.feature_importances_
        }).sort_values(by="importance", ascending=False)
        print(report)
    # Linear-model coefficients.
    if hasattr(model, "coef_"):
        print("\n===== ➗ COEFFICIENTS =====")
        weights = np.array(model.coef_)
        if weights.ndim == 1:
            for i, c in enumerate(weights):
                print(f"Feature {i}: {c:.6f}")
        else:
            print(weights)
    if hasattr(model, "intercept_"):
        print("\nIntercept :", model.intercept_)
    # Known class labels.
    if hasattr(model, "classes_"):
        print("\n===== 🎯 CLASSES =====")
        print(model.classes_)
    # Common best-score attributes exposed by boosting libraries.
    for attr in ("best_score_", "best_iteration_", "best_ntree_limit", "score_"):
        if hasattr(model, attr):
            print(f"\n{attr} = {getattr(model, attr)}")
    # First 15 public callable methods, as a quick API overview.
    print("\n===== 🧩 MÉTHODES DISPONIBLES =====")
    methods = [m for m, _ in inspect.getmembers(model, predicate=inspect.ismethod)]
    suffix = "..." if len(methods) > 15 else ""
    print(", ".join(methods[:15]) + suffix)
    print("\n===== ✅ FIN DE LINSPECTION =====")
def analyze_model(self, model, X_train, X_valid, y_train, y_valid):
    """
    Full evaluation of a fitted binary classifier on the validation split.

    Prints accuracy / ROC-AUC / classification report, saves confusion
    matrix, feature-importance, ROC-curve and threshold-analysis figures
    under ``self.path``, and reports the probability threshold maximising
    the F1 score.

    NOTE(review): ``y_train`` is never used here, and ``X_train`` only
    supplies feature names — confirm before relying on them.
    """
    os.makedirs(self.path, exist_ok=True)
    # ---- Predictions (fall back to hard labels when no predict_proba) ----
    preds = model.predict(X_valid)
    probs = model.predict_proba(X_valid)[:, 1] if hasattr(model, "predict_proba") else preds
    # ---- Global performance ----
    print("===== 📊 ÉVALUATION DU MODÈLE =====")
    if hasattr(model, "feature_names_in_"):
        print("Colonnes du modèle :", model.feature_names_in_)
        print("Colonnes X_valid :", list(X_valid.columns))
    print(f"Accuracy: {accuracy_score(y_valid, preds):.3f}")
    print(f"ROC AUC : {roc_auc_score(y_valid, probs):.3f}")
    print("TN (True Negative) / FP (False Positive)")
    print("FN (False Negative) / TP (True Positive)")
    print("\nRapport de classification :\n", classification_report(y_valid, preds))
    # Confusion-matrix cell meanings in this trading context:
    #   TN — correctly predicted 0 (no buy)
    #   FP — predicted 1 but truth was 0 (false buy signal)
    #   FN — predicted 0 but truth was 1 (missed buy)
    #   TP — correctly predicted 1 (good buy signal)
    # ---- Confusion matrix (printed + saved as a small heatmap) ----
    cm = confusion_matrix(y_valid, preds)
    print("Matrice de confusion :\n", cm)
    plt.figure(figsize=(4, 4))
    plt.imshow(cm, cmap="Blues")
    plt.title("Matrice de confusion")
    plt.xlabel("Prédit")
    plt.ylabel("Réel")
    for i in range(2):
        for j in range(2):
            # Write each count at the centre of its cell.
            plt.text(j, i, cm[i, j], ha="center", va="center", color="black")
    plt.savefig(os.path.join(self.path, "Matrice de confusion.png"), bbox_inches="tight")
    plt.close()
    # ---- Feature importances (tree / boosting models only) ----
    if hasattr(model, "feature_importances_"):
        print("\n===== 🔍 IMPORTANCE DES FEATURES =====")
        importance = pd.DataFrame({
            "feature": X_train.columns,
            "importance": model.feature_importances_
        }).sort_values(by="importance", ascending=False)
        print(importance)
        # Wide figure so every feature label stays readable.
        fig, ax = plt.subplots(figsize=(24, 8))
        importance.plot.bar(x="feature", y="importance", legend=False, ax=ax)
        # Rotate the tick labels for readability.
        ax.set_xticklabels(ax.get_xticklabels(), rotation=45, ha='right')
        plt.title("Importance des features")
        plt.savefig(os.path.join(self.path, "Importance des features.png"), bbox_inches="tight")
        plt.close()
    # ---- Excerpt of one decision tree (ensemble models) ----
    if hasattr(model, "estimators_"):
        print("\n===== 🌳 EXTRAIT DUN ARBRE =====")
        # Only the first 800 characters of the first tree, as a sanity peek.
        print(export_text(model.estimators_[0], feature_names=list(X_train.columns))[:800])
    # ---- Accuracy at each decision threshold ----
    thresholds = np.linspace(0.1, 0.9, 9)
    print("\n===== ⚙️ PERFORMANCE SELON SEUIL =====")
    for t in thresholds:
        preds_t = (probs > t).astype(int)
        acc = accuracy_score(y_valid, preds_t)
        print(f"Seuil {t:.1f} → précision {acc:.3f}")
    # ---- ROC curve ----
    fpr, tpr, _ = roc_curve(y_valid, probs)
    plt.figure(figsize=(5, 4))
    plt.plot(fpr, tpr, label="ROC curve")
    plt.plot([0, 1], [0, 1], linestyle="--", color="gray")
    plt.xlabel("Taux de faux positifs")
    plt.ylabel("Taux de vrais positifs")
    plt.title("Courbe ROC")
    plt.legend()
    plt.savefig(os.path.join(self.path, "Courbe ROC.png"), bbox_inches="tight")
    plt.close()
    if hasattr(model, "predict_proba"):
        # Probabilities of the positive class (class 1).
        y_proba = model.predict_proba(X_valid)[:, 1]
        # Saved precision/recall/F1-vs-threshold curves.
        self.plot_threshold_analysis(y_valid, y_proba, step=0.05, save_path=f"{self.path}/threshold_analysis.png")
        # NOTE(review): the block below re-does the same threshold sweep as
        # plot_threshold_analysis, only with a different output file and a
        # best-F1 marker — consider consolidating.
        seuils = np.arange(0.0, 1.01, 0.05)
        precisions, recalls, f1s = [], [], []
        for seuil in seuils:
            y_pred = (y_proba >= seuil).astype(int)
            precisions.append(precision_score(y_valid, y_pred))
            recalls.append(recall_score(y_valid, y_pred))
            f1s.append(f1_score(y_valid, y_pred))
        plt.figure(figsize=(10, 6))
        plt.plot(seuils, precisions, label='Précision', marker='o')
        plt.plot(seuils, recalls, label='Rappel', marker='o')
        plt.plot(seuils, f1s, label='F1-score', marker='o')
        # Highlight the threshold achieving the best F1.
        best_idx = np.argmax(f1s)
        plt.scatter(seuils[best_idx], f1s[best_idx], color='red', s=80, label=f"Max F1 ({seuils[best_idx]:.2f})")
        plt.title("Performance du modèle selon le seuil de probabilité")
        plt.xlabel("Seuil de probabilité (classe 1)")
        plt.ylabel("Score")
        plt.grid(True, alpha=0.3)
        plt.legend()
        plt.savefig(f"{self.path}/seuil_de_probabilite.png", bbox_inches='tight')
        print(f"✅ Meilleur F1 : {f1s[best_idx]:.3f} au seuil {seuils[best_idx]:.2f}")
    print("\n===== ✅ FIN DE LANALYSE =====")
def plot_threshold_analysis(self, y_true, y_proba, step=0.05, save_path=None):
    """
    Plot precision, recall and F1-score as a function of the decision threshold.

    Parameters
    ----------
    y_true : array-like of 0/1 ground-truth labels.
    y_proba : predicted probabilities of the positive class (P(up)).
    step : spacing between tested thresholds over [0, 1].
    save_path : when given, the figure is written to this file instead of
        being shown interactively.

    The chart contains three curves:
      - Precision : reliability of the bullish signals,
      - Recall    : share of actual rises the model catches,
      - F1-score  : the compromise between the two.
    """
    thresholds = np.arange(0, 1.01, step)
    precisions, recalls, f1s = [], [], []
    for thr in thresholds:
        preds = (y_proba >= thr).astype(int)
        # zero_division=0 keeps the original numeric result (0.0) at extreme
        # thresholds (all-0 or all-1 predictions) while silencing sklearn's
        # UndefinedMetricWarning.
        precisions.append(precision_score(y_true, preds, zero_division=0))
        recalls.append(recall_score(y_true, preds, zero_division=0))
        f1s.append(f1_score(y_true, preds, zero_division=0))
    fig = plt.figure(figsize=(10, 6))
    plt.plot(thresholds, precisions, label="Precision", linewidth=2)
    plt.plot(thresholds, recalls, label="Recall", linewidth=2)
    plt.plot(thresholds, f1s, label="F1-score", linewidth=2, linestyle="--")
    plt.axvline(0.5, color='gray', linestyle=':', label="Seuil 0.5")
    plt.title("📊 Performance selon le seuil de probabilité", fontsize=14)
    plt.xlabel("Seuil de décision (threshold)")
    plt.ylabel("Score")
    plt.legend()
    plt.grid(True, alpha=0.3)
    if save_path:
        plt.savefig(save_path, bbox_inches='tight')
        print(f"✅ Graphique enregistré : {save_path}")
        # Fix: release the figure after saving to avoid memory build-up
        # when called repeatedly (e.g. from analyze_model).
        plt.close(fig)
    else:
        plt.show()
def run(self):
    """Load BTC/USDC candles, restrict them to January 2025, clean them,
    then train and inspect the model."""
    # ---- 1. Data preparation ----
    candles = pd.read_feather(f"user_data/data/binance/BTC_USDC-{self.timeframe}.feather")
    candles['date'] = pd.to_datetime(candles['date'])
    # Slice one time window via the DatetimeIndex, then restore 'date' as a column.
    candles = candles.set_index('date')
    candles = candles["2025-01-01":"2025-02-01"]
    candles = candles.reset_index('date')
    # Drop incomplete candles and force numeric dtypes on the OHLCV columns.
    ohlcv_cols = ['open', 'high', 'low', 'close', 'volume']
    candles = candles.dropna(subset=ohlcv_cols)
    for column in ohlcv_cols:
        candles[column] = candles[column].astype(float)
    print(candles.head())
    print(candles.tail())
    # ---- 2. Feature computation and training ----
    self.dataframe = self.calculateIndicators(candles)
    # e.g. ['returns', 'atr', 'slope', 'drawdown', 'close']
    self.model_indicators = self.listUsableColumns(candles)
    self.trainModel(candles)
    self.inspect_model(self.train_model)
# Run the full pipeline only when this file is executed directly — wrapping
# the call in the standard guard prevents the (slow, I/O-heavy) run() from
# firing as a side effect of merely importing this module.
if __name__ == "__main__":
    crash = Crash()
    crash.run()