1474 lines
61 KiB
Python
1474 lines
61 KiB
Python
import inspect
|
||
import os
|
||
|
||
import freqtrade.vendor.qtpylib.indicators as qtpylib
|
||
# Machine Learning
|
||
import joblib
|
||
import matplotlib.pyplot as plt
|
||
import numpy as np
|
||
import optuna
|
||
import pandas as pd
|
||
import seaborn as sns
|
||
import shap
|
||
import ta
|
||
import talib.abstract as talib
|
||
import tensorflow as tf
|
||
from catboost import CatBoostClassifier
|
||
from optuna.visualization import plot_optimization_history
|
||
from optuna.visualization import plot_parallel_coordinate
|
||
from optuna.visualization import plot_param_importances
|
||
from optuna.visualization import plot_slice
|
||
from sklearn.calibration import CalibratedClassifierCV
|
||
from sklearn.feature_selection import SelectFromModel
|
||
from sklearn.feature_selection import VarianceThreshold
|
||
from sklearn.inspection import PartialDependenceDisplay
|
||
from sklearn.inspection import permutation_importance
|
||
from sklearn.metrics import precision_recall_curve, ConfusionMatrixDisplay
|
||
|
||
from sklearn.metrics import (
|
||
classification_report,
|
||
confusion_matrix,
|
||
accuracy_score,
|
||
roc_curve,
|
||
precision_score, recall_score
|
||
)
|
||
from sklearn.metrics import (
|
||
f1_score
|
||
)
|
||
from sklearn.metrics import roc_auc_score
|
||
from sklearn.model_selection import train_test_split
|
||
from sklearn.preprocessing import StandardScaler
|
||
from sklearn.tree import export_text
|
||
from tensorflow.keras.callbacks import EarlyStopping
|
||
from tensorflow.keras.layers import Dense, Dropout
|
||
from tensorflow.keras.models import Sequential
|
||
from xgboost import XGBClassifier
|
||
|
||
|
||
class Crash:
|
||
timeframe = '1h'
|
||
dataframe = {}
|
||
train_model = None
|
||
short_pair = "BTC"
|
||
model_indicators = []
|
||
path = f"user_data/strategies/plots/{short_pair}/crash/"
|
||
|
||
def calculateDerivation(self, dataframe, window=12, suffixe='', timeframe='5m'):
    """Smooth the 'mid' price with a rolling mean, then derive its trend columns.

    Adds a ``mid_smooth<suffixe>`` column (simple rolling mean over *window*
    candles) and delegates to ``calculeDerivees`` to compute its first/second
    derivatives and the discrete trend-state column.

    NOTE(review): *timeframe* is currently unused by this method.
    """
    smoothed_col = f"mid_smooth{suffixe}"
    dataframe[smoothed_col] = dataframe['mid'].rolling(window).mean()
    return self.calculeDerivees(dataframe, smoothed_col, ema_period=window)
|
||
|
||
def calculeDerivees(
        self,
        dataframe: pd.DataFrame,
        name: str,
        suffixe: str = '',
        window: int = 100,
        coef: float = 0.15,
        ema_period: int = 10,
        verbose: bool = True,
) -> pd.DataFrame:
    """
    Compute deriv1/deriv2 (simple relative derivative) of column ``name``,
    apply EMA smoothing, and derive a discrete trend-state column using an
    adaptive epsilon based on rolling percentiles.

    Adds columns: ``<name><suffixe>_inv`` (local trough flag),
    ``<name><suffixe>_dist`` (relative distance of close to the series),
    ``<name>_deriv1`` / ``<name>_deriv2`` and ``<name><suffixe>_state``
    (integer trend code, see table below).
    """

    d1_col = f"{name}{suffixe}_deriv1"
    d2_col = f"{name}{suffixe}_deriv2"
    # d1s_col = f"{name}{suffixe}_deriv1_smooth"
    # d2s_col = f"{name}{suffixe}_deriv2_smooth"
    tendency_col = f"{name}{suffixe}_state"

    # Scaling factors proportional to the EMA period so derivative magnitudes
    # remain comparable across different smoothing periods.
    factor1 = 100 * (ema_period / 5)
    factor2 = 10 * (ema_period / 5)

    # True when the previous value was a local minimum over the last 3 candles
    # (series turned upward).
    dataframe[f"{name}{suffixe}_inv"] = (dataframe[f"{name}{suffixe}"].shift(2) >= dataframe[
        f"{name}{suffixe}"].shift(1)) \
        & (dataframe[f"{name}{suffixe}"].shift(1) <= dataframe[f"{name}{suffixe}"])
    # --- Relative distance of the close to the moving average ---
    dataframe[f"{name}{suffixe}_dist"] = (dataframe['close'] - dataframe[f"{name}{suffixe}"]) / dataframe[
        f"{name}{suffixe}"]

    # Simple relative first derivative.
    dataframe[d1_col] = (dataframe[name] - dataframe[name].shift(1)) / dataframe[name].shift(1)
    # EMA smoothing, rescaled by factor1.
    dataframe[d1_col] = factor1 * dataframe[d1_col].ewm(span=ema_period, adjust=False).mean()

    # dataframe[d1_col] = dataframe[d1_col].rolling(window=ema_period, center=True).median()

    # Second derivative = change of the smoothed first derivative, then EMA.
    dataframe[d2_col] = dataframe[d1_col] - dataframe[d1_col].shift(1)
    dataframe[d2_col] = factor2 * dataframe[d2_col].ewm(span=ema_period, adjust=False).mean()

    # Adaptive epsilon via rolling percentiles (5th/95th) of each derivative.
    p_low_d1 = dataframe[d1_col].rolling(window=window, min_periods=1).quantile(0.05)
    p_high_d1 = dataframe[d1_col].rolling(window=window, min_periods=1).quantile(0.95)
    p_low_d2 = dataframe[d2_col].rolling(window=window, min_periods=1).quantile(0.05)
    p_high_d2 = dataframe[d2_col].rolling(window=window, min_periods=1).quantile(0.95)

    eps_d1_series = ((p_low_d1.abs() + p_high_d1.abs()) / 2) * coef
    eps_d2_series = ((p_low_d2.abs() + p_high_d2.abs()) / 2) * coef

    # Global (whole-series) fallback eps used where the rolling eps is NaN or 0.
    global_eps_d1 = (abs(dataframe[d1_col].quantile(0.05)) + abs(dataframe[d1_col].quantile(0.95))) / 2 * coef
    global_eps_d2 = (abs(dataframe[d2_col].quantile(0.05)) + abs(dataframe[d2_col].quantile(0.95))) / 2 * coef

    eps_d1_series = eps_d1_series.fillna(global_eps_d1).replace(0, global_eps_d1)
    eps_d2_series = eps_d2_series.fillna(global_eps_d2).replace(0, global_eps_d2)

    # if verbose and self.dp.runmode.value in ('backtest'):
    #     stats = dataframe[[d1_col, d2_col]].agg(['min', 'max']).T
    #     stats['abs_max'] = dataframe[[d1_col, d2_col]].abs().max(axis=0)
    #     print(f"---- Derivatives stats ----")
    #     print(stats)
    #     print(f"rolling window = {window}, coef = {coef}, ema_period = {ema_period}")
    #     print("---------------------------")

    # Tendency mapping: classify each row from the sign/magnitude of d1 and d2.
    def tag_by_derivatives(row):
        # NOTE(review): int(row.name) is used as a POSITIONAL index into the
        # eps series via .iloc — this assumes a RangeIndex on the dataframe;
        # confirm upstream callers never pass a datetime-indexed frame.
        idx = int(row.name)
        d1v = float(row[d1_col])
        d2v = float(row[d2_col])
        eps1 = float(eps_d1_series.iloc[idx])
        eps2 = float(eps_d2_series.iloc[idx])

        # State codes (from the original design table):
        #   4 = accelerating rise     3 = decelerating rise   2 = steady rise
        #   1 = rise starting         0 = plateau / neutral  -1 = fall starting
        #  -2 = decelerating fall    -3 = steady fall        -4 = accelerating fall
        # (codes 2 and -3 are never produced by the branches below)

        # Strict plateau: both derivatives within their epsilon band.
        if abs(d1v) <= eps1 and abs(d2v) <= eps2:
            return 0
        # "Departure": d1 ~ 0 but d2 signals a direction.
        if abs(d1v) <= eps1:
            return 1 if d2v > eps2 else -1 if d2v < -eps2 else 0
        # Rising.
        if d1v > eps1:
            return 4 if d2v > eps2 else 3
        # Falling.
        if d1v < -eps1:
            return -4 if d2v < -eps2 else -2
        return 0

    # Row-wise apply (slow but simple); eps series are captured by closure.
    dataframe[tendency_col] = dataframe.apply(tag_by_derivatives, axis=1)

    # if timeframe == '1h' and verbose and self.dp.runmode.value in ('backtest'):
    #     print("##################")
    #     print(f"# STAT {name}{suffixe}")
    #     print("##################")
    #     self.calculateProbabilite2Index(dataframe, futur_cols=['futur_percent'], indic_1=f"{name}{suffixe}_deriv1", indic_2=f"{name}{suffixe}_deriv2")

    return dataframe
|
||
|
||
def calculateIndicators(self, df):
    """
    Enrich an OHLCV dataframe with every feature used by the crash model.

    Adds EMA-smoothed mid-price averages and their derivatives, momentum /
    volatility / volume indicators, Bollinger bands, MACD, and finally a
    composite ``crash_risk_index`` in [0, 1].

    NOTE(review): several columns ('atr', 'slope', 'drawdown', 'rsi_slope',
    'atr_norm') are assigned more than once below with DIFFERENT formulas;
    only the last assignment survives. Flagged inline — confirm which
    definition is intended.
    """

    # heikinashi = qtpylib.heikinashi(df)
    # df['haopen'] = heikinashi['open']
    # df['haclose'] = heikinashi['close']
    # df['hapercent'] = (df['haclose'] - df['haopen']) / df['haclose']

    # Mid price of the candle body (halfway between open and close).
    df['mid'] = df['open'] + (df['close'] - df['open']) / 2
    # Despite the 'sma' names, these are exponential moving averages.
    df['sma5'] = df['mid'].ewm(span=5, adjust=False).mean()  # df["mid"].rolling(window=5).mean()
    df['sma5_deriv1'] = 1000 * (df['sma5'] - df['sma5'].shift(1)) / df['sma5'].shift(1)

    df['sma12'] = df['mid'].ewm(span=12, adjust=False).mean()
    df['sma12_deriv1'] = 1000 * (df['sma12'] - df['sma12'].shift(1)) / df[
        'sma12'].shift(1)

    df['sma24'] = df['mid'].ewm(span=24, adjust=False).mean()
    df['sma24_deriv1'] = 1000 * (df['sma24'] - df['sma24'].shift(1)) / df['sma24'].shift(1)

    df['sma60'] = df['mid'].ewm(span=60, adjust=False).mean()
    df['sma60_deriv1'] = 1000 * (df['sma60'] - df['sma60'].shift(1)) / df['sma60'].shift(1)

    # df[f"sma5_inv"] = (df[f"sma5"].shift(2) >= df[f"sma5"].shift(1)) \
    #     & (df[f"sma5"].shift(1) <= df[f"sma5"])

    # Sqrt-compressed amplitude of the recent sma5 move (in price units).
    df["sma5_sqrt"] = (
        np.sqrt(np.abs(df["sma5"] - df["sma5"].shift(1)))
        + np.sqrt(np.abs(df["sma5"].shift(3) - df["sma5"].shift(1)))
    )
    # Local trough on sma5 with enough amplitude.
    # NOTE(review): the '> 5' threshold is in absolute price units — presumably
    # tuned for BTC-scale prices; verify for other pairs.
    df["sma5_inv"] = (
        (df["sma5"].shift(2) >= df["sma5"].shift(1))
        & (df["sma5"].shift(1) <= df["sma5"])
        & (df["sma5_sqrt"] > 5)
    )

    df["sma12_sqrt"] = (
        np.sqrt(np.abs(df["sma12"] - df["sma12"].shift(1)))
        + np.sqrt(np.abs(df["sma12"].shift(3) - df["sma12"].shift(1)))
    )
    df["sma12_inv"] = (
        (df["sma12"].shift(2) >= df["sma12"].shift(1))
        & (df["sma12"].shift(1) <= df["sma12"])
        & (df["sma12_sqrt"] > 5)
    )

    # Mid-price returns over several horizons (longer ones smoothed).
    df["percent"] = df['mid'].pct_change()
    df["percent3"] = df['mid'].pct_change(3).rolling(3).mean()
    df["percent12"] = df['mid'].pct_change(12).rolling(12).mean()
    df["percent24"] = df['mid'].pct_change(24).rolling(24).mean()

    # RSI on the mid price, plus its derivative/state columns.
    df['rsi'] = talib.RSI(df['mid'], timeperiod=14)
    self.calculeDerivees(df, 'rsi', ema_period=12)
    df['max_rsi_12'] = talib.MAX(df['rsi'], timeperiod=12)
    df['max_rsi_24'] = talib.MAX(df['rsi'], timeperiod=24)
    df['max5'] = talib.MAX(df['mid'], timeperiod=5)
    df['min180'] = talib.MIN(df['mid'], timeperiod=180)
    df['max180'] = talib.MAX(df['mid'], timeperiod=180)
    # Position of the mid price within its 180-candle range (0..1).
    df['pct180'] = ((df["mid"] - df['min180']) / (df['max180'] - df['min180']))
    # df = self.rsi_trend_probability(df, short=60, long=360)

    ###########################################################
    # Bollinger Bands
    bollinger = qtpylib.bollinger_bands(qtpylib.typical_price(df), window=20, stds=2)
    df['bb_lowerband'] = bollinger['lower']
    df['bb_middleband'] = bollinger['mid']
    df['bb_upperband'] = bollinger['upper']
    # Position of the close within the bands (0 = lower band, 1 = upper band).
    df["bb_percent"] = (
        (df["close"] - df["bb_lowerband"]) /
        (df["bb_upperband"] - df["bb_lowerband"])
    )
    # Band width normalised by the 24-period EMA.
    df["bb_width"] = (df["bb_upperband"] - df["bb_lowerband"]) / df["sma24"]

    # MACD (12/26 EMAs, 9-period signal line).
    macd, macdsignal, macdhist = talib.MACD(
        df['close'],
        fastperiod=12,
        slowperiod=26,
        signalperiod=9
    )

    # MACD      : EMA_fast - EMA_slow — positive = uptrend, negative = downtrend.
    # Signal    : EMA_9(MACD) — crossovers act as buy/sell triggers.
    # Histogram : MACD - Signal — strength/acceleration of the trend.
    df['macd'] = macd
    df['macdsignal'] = macdsignal
    df['macdhist'] = macdhist

    # --- Raw slope of sma24 ---
    # NOTE(review): 'slope' is overwritten twice further down with different
    # definitions; only 'slope_smooth' keeps this one.
    df['slope'] = df['sma24'].diff()

    # --- EMA smoothing of the slope ---
    df['slope_smooth'] = df['slope'].ewm(span=10, adjust=False).mean()

    ###########################
    # NOTE(review): local 'timeframe' is assigned but never used below.
    timeframe = self.timeframe

    # --- Normalised volatility (ATR / close) ---
    df['atr'] = ta.volatility.AverageTrueRange(
        high=df['high'], low=df['low'], close=df['close'], window=14
    ).average_true_range()
    df['atr_norm'] = df['atr'] / df['close']

    # --- Trend strength ---
    df['adx'] = ta.trend.ADXIndicator(
        high=df['high'], low=df['low'], close=df['close'], window=14
    ).adx()

    # --- Directional volume (On Balance Volume) ---
    df['obv'] = ta.volume.OnBalanceVolumeIndicator(
        close=df['close'], volume=df['volume']
    ).on_balance_volume()
    self.calculeDerivees(df, 'obv', ema_period=1)

    # OBV variants built on smoothed price / summed volume.
    df['obv12'] = ta.volume.OnBalanceVolumeIndicator(
        close=df['sma12'], volume=df['volume'].rolling(12).sum()
    ).on_balance_volume()

    df['obv24'] = ta.volume.OnBalanceVolumeIndicator(
        close=df['sma24'], volume=df['volume'].rolling(24).sum()
    ).on_balance_volume()

    # self.calculeDerivees(df, 'obv5', ema_period=5)

    # --- Recent volatility (std-dev of returns over 24 candles) ---
    df['vol_24'] = df['percent'].rolling(24).std()

    # Count consecutive rises / falls.
    # self.calculateDownAndUp(df, limit=0.0001)

    # --- Initial NaN filtering (disabled) ---
    # df = df.dropna()

    df['rsi_slope'] = df['rsi'].diff(3) / 3  # average RSI speed — NOTE(review): overwritten below
    df['adx_change'] = df['adx'] - df['adx'].shift(12)  # trend-strength evolution
    df['volatility_ratio'] = df['atr_norm'] / df['bb_width']

    df["rsi_diff"] = df["rsi"] - df["rsi"].shift(3)
    df["slope_ratio"] = df["sma5_deriv1"] / (df["sma60_deriv1"] + 1e-9)
    # True when RSI momentum and short-term price momentum disagree.
    df["divergence"] = (df["rsi_deriv1"] * df["sma5_deriv1"]) < 0

    # features
    df['returns'] = df['close'].pct_change()
    # NOTE(review): overwrites the true-range ATR above with a simpler
    # (high - low) rolling mean — confirm which is intended.
    df['atr'] = (df['high'] - df['low']).rolling(14).mean()
    df['slope'] = df['close'].rolling(20).mean().diff()

    df['drawdown'] = (df['close'] - df['close'].rolling(48).max()) / df['close'].rolling(48).max()

    df['atr_pct'] = df['atr'] / df['close']
    df['vol_z'] = (df['volume'] - df['volume'].rolling(48).mean()) / df['volume'].rolling(48).std()
    df['rsi_slope'] = df['rsi'].diff(3)

    # VOLATILITY (recomputed from the overwritten 'atr')
    df['atr_norm'] = df['atr'] / df['close']

    # DRAWDOWN (critical for crash detection) — same formula as above,
    # rewritten via an explicit rolling-max column.
    df['rolling_max'] = df['close'].rolling(48).max()
    df['drawdown'] = (df['close'] - df['rolling_max']) / df['rolling_max']
    # 0 at no drawdown, saturates at 1 for a -10% drawdown.
    df['dd_score'] = np.clip(-df['drawdown'] / 0.10, 0, 1)

    # TREND (slope) — third and final definition of 'slope'.
    df['MA7'] = df['close'].rolling(7).mean()
    df['MA14'] = df['close'].rolling(14).mean()
    df['slope'] = df['MA7'] - df['MA14']
    df['slope_score'] = np.clip(1 - (df['slope'] / df['close']), 0, 1)

    # NEGATIVE STREAK: sum of negative returns only, over 24 candles.
    df['neg_streak'] = df['close'].pct_change().apply(lambda x: min(x, 0)).rolling(24).sum()
    df['neg_score'] = np.clip(-df['neg_streak'] / 0.05, 0, 1)

    # SHORT-TERM COMPONENTS
    df['pct_change_3'] = df['close'].pct_change(3)
    df['pct_change_3_smooth'] = df['pct_change_3'].rolling(6).mean()
    # 1 when flat/rising, drops toward 0 as the 3-candle change approaches -5%.
    df['crash_score'] = np.clip(1 + (df['pct_change_3_smooth'] / 0.05), 0, 1)

    df['speed'] = df['close'].diff().rolling(6).mean()
    df['accel'] = df['speed'].diff().rolling(6).mean()
    df['STD20'] = df['close'].rolling(20).std()
    df['accel_score'] = np.clip(1 + (df['accel'] / (df['STD20'] + 1e-9)), 0, 1)

    # FINAL INDEX: weighted blend of the component scores.
    df['crash_raw'] = (
        0.35 * df['dd_score'] +  # most important for slow crashes
        0.25 * df['neg_score'] +
        0.20 * df['slope_score'] +
        0.10 * df['crash_score'] +
        0.10 * df['accel_score']
    )

    # SIMPLE SMOOTHING
    df['crash_risk_index'] = df['crash_raw'].ewm(span=24).mean()

    return df
|
||
|
||
def feature_auc_scores(self, X, y):
    """Score every feature column by its univariate ROC-AUC against *y*.

    Each column is forward-filled then zero-filled before scoring; columns
    that cannot be scored at all (e.g. constant input) get NaN instead of
    raising. Returns a Series sorted best-first.
    """
    def _column_auc(values):
        try:
            return roc_auc_score(y, values.ffill().fillna(0))
        except Exception:
            # Unscorable column — keep it in the output, marked NaN.
            return np.nan

    scores = {column: _column_auc(X[column]) for column in X.columns}
    return pd.Series(scores).sort_values(ascending=False)
|
||
|
||
def listUsableColumns(self, dataframe):
    """Return the numeric, non-constant columns usable as model features.

    Label/score/helper columns are excluded by suffix or prefix. As a side
    effect, the retained columns are sanitised IN PLACE: +/-inf and NaN
    are replaced by 0 (the model cannot ingest them).
    """
    banned_suffixes = ("_state", "_1d", "_count", "_class", "_price")
    banned_prefixes = ("stop_buying", "target", "lvl", "confidence_index")

    # Step 1: numeric columns only.
    numeric_cols = dataframe.select_dtypes(include=['int64', 'float64']).columns

    def _is_usable(col):
        # Step 2: drop constants and excluded name patterns.
        if dataframe[col].nunique() <= 1:
            return False
        if col.endswith(banned_suffixes):
            return False
        return not col.startswith(banned_prefixes)

    usable_cols = [col for col in numeric_cols if _is_usable(col)]

    # Step 3: replace inf and NaN by 0 (in place).
    dataframe[usable_cols] = dataframe[usable_cols].replace([np.inf, -np.inf], 0).fillna(0)
    print("Colonnes utilisables pour le modèle :")
    print(usable_cols)
    # self.model_indicators = usable_cols
    return usable_cols
|
||
|
||
def select_uncorrelated_features(self, df, target, top_n=20, corr_threshold=0.7):
    """Pick the features most correlated with *target* while pruning redundancy.

    Ranks numeric columns by |corr(feature, target)|, keeps the *top_n*
    candidates, then greedily drops any candidate whose |corr| with an
    already-kept feature exceeds *corr_threshold*. Returns a DataFrame with
    columns ``feature`` / ``corr_with_target``, sorted by absolute
    correlation (descending).
    """
    corr_matrix = df.corr(numeric_only=True)

    # Rank candidates by absolute correlation with the target (target itself
    # excluded from the candidate list).
    ranking = corr_matrix[target].abs().sort_values(ascending=False)
    candidates = ranking.drop(target).head(top_n).index.tolist()

    # Greedy pruning: keep a candidate only if it is not too correlated with
    # any feature already retained.
    kept = []
    for candidate in candidates:
        redundant = any(
            abs(corr_matrix.loc[candidate, previous]) > corr_threshold
            for previous in kept
        )
        if not redundant:
            kept.append(candidate)

    # Clean result with the (signed) correlation values.
    return pd.DataFrame({
        "feature": kept,
        "corr_with_target": [corr_matrix.loc[f, target] for f in kept],
    }).sort_values(by="corr_with_target", key=np.abs, ascending=False)
|
||
|
||
def drawPredictions(self, df, indicators, y_proba, threshold=0.5):
    """Plot indicator columns together with the model's predicted probability.

    Parameters
    ----------
    df : pd.DataFrame
        Frame holding the indicator columns (and the target).
    indicators : list[str]
        Column names of *df* to draw.
    y_proba : np.array
        Model probabilities (continuous values between 0 and 1).
    threshold : float
        Cut-off used to derive the binary signal overlay.

    The figure is written to ``<self.path>/indicators_vs_prediction.png``.
    """
    plt.figure(figsize=(18, 6))

    # One curve per requested indicator.
    for indicator in indicators:
        plt.plot(df.index, df[indicator], label=indicator, alpha=0.7)

    # Model probability drawn on top of the indicators.
    plt.plot(df.index, y_proba, label="Prediction prob.", color="black", linestyle="--")

    # Binary signal overlay: 1 wherever the probability clears the threshold.
    binary_signal = (y_proba > threshold).astype(int)
    plt.scatter(df.index, binary_signal, color='red', marker='o', label='Signal > threshold', s=20)

    plt.title("Indicateurs + prédiction MLP")
    plt.xlabel("Date")
    plt.ylabel("Valeur / Probabilité")
    plt.legend()
    plt.grid(True)
    plt.tight_layout()
    plt.savefig(f"{self.path}/indicators_vs_prediction.png")
    plt.close()
|
||
|
||
|
||
def drawSequentialGraphs(self, model, history, X_train_scaled, X_valid_scaled, y_train,
                         y_valid, thresholds=None, best_threshold=None
                         ):
    """
    Generate and save every useful diagnostic chart for an MLP (Sequential).

    Saves loss curves, probability distributions, precision-recall curve,
    F1-vs-threshold curve and confusion matrix under ``self.path``, plus the
    chosen decision threshold in ``best_threshold.txt``.

    Parameters
    ----------
    model : keras Sequential
    history : History (return value of model.fit)
    X_train_scaled, X_valid_scaled : np.array
    y_train, y_valid : np.array
    thresholds : list[float] | None
        Threshold grid for the F1 sweep; defaults to 60 points in [0.05, 0.95].
    best_threshold : float | None
        If None, the threshold maximising F1 on the grid is used.

    Returns
    -------
    dict with keys ``best_threshold`` and ``best_f1``.
    """
    # NOTE(review): feature_names is only needed by the commented-out
    # permutation-importance section below; listUsableColumns also mutates
    # self.dataframe in place (inf/NaN -> 0).
    feature_names = self.listUsableColumns(self.dataframe)

    # =========================
    # 1. Loss curves
    # =========================
    plt.figure()
    plt.plot(history.history['loss'], label='train')
    plt.plot(history.history['val_loss'], label='val')
    plt.legend()
    plt.title("MLP loss")
    plt.savefig(f"{self.path}/loss.png")
    plt.close()

    # =========================
    # 2. Predicted probabilities per class
    # =========================
    y_proba = model.predict(X_valid_scaled).ravel()

    plt.figure()
    plt.hist(y_proba[y_valid == 0], bins=50, alpha=0.6, label="No crash")
    plt.hist(y_proba[y_valid == 1], bins=50, alpha=0.6, label="Crash")
    plt.legend()
    plt.title("Predicted probability distribution")
    plt.savefig(f"{self.path}/proba_distribution.png")
    plt.close()

    # =========================
    # 3. Precision-Recall curve
    # =========================
    precision, recall, _ = precision_recall_curve(y_valid, y_proba)

    plt.figure()
    plt.plot(recall, precision)
    plt.xlabel("Recall")
    plt.ylabel("Precision")
    plt.title("Precision-Recall curve")
    plt.savefig(f"{self.path}/precision_recall.png")
    plt.close()

    # =========================
    # 4. F1 vs threshold
    # =========================
    if thresholds is None:
        thresholds = np.linspace(0.05, 0.95, 60)

    f1s = [
        f1_score(y_valid, (y_proba > t).astype(int))
        for t in thresholds
    ]

    plt.figure()
    plt.plot(thresholds, f1s)
    plt.xlabel("Threshold")
    plt.ylabel("F1 score")
    plt.title("F1 vs threshold")
    plt.savefig(f"{self.path}/f1_threshold.png")
    plt.close()

    # Automatic threshold choice when none supplied: argmax of the F1 sweep.
    if best_threshold is None:
        best_threshold = thresholds[int(np.argmax(f1s))]

    # =========================
    # 5. Confusion matrix at the chosen threshold
    # =========================
    ConfusionMatrixDisplay.from_predictions(
        y_valid,
        (y_proba > best_threshold).astype(int)
    )
    plt.title(f"Confusion matrix (threshold={best_threshold:.2f})")
    plt.savefig(f"{self.path}/confusion_matrix.png")
    plt.close()

    # # =========================
    # # 6. Permutation importance
    # # =========================
    # r = permutation_importance(
    #     model,
    #     X_valid_scaled,
    #     y_valid,
    #     scoring="f1",
    #     n_repeats=8,
    #     n_jobs=-1
    # )
    #
    # importances = pd.Series(
    #     r.importances_mean,
    #     index=feature_names
    # ).sort_values()
    #
    # plt.figure(figsize=(7, 4))
    # importances.plot(kind="barh")
    # plt.title("Permutation importance (MLP)")
    # plt.savefig(f"{self.path}/permutation_importance.png")
    # plt.close()

    # Example: plot a hand-picked indicator against the prediction.
    indicators = ['percent12']

    # y_proba = model.predict(X_valid_scaled).ravel()
    self.drawPredictions(
        df=self.dataframe.iloc[-len(y_proba):],  # select the period matching X_valid
        indicators=indicators,
        y_proba=y_proba,
        threshold=0.5
    )

    # =========================
    # 7. Persist the chosen threshold
    # =========================
    with open(f"{self.path}/best_threshold.txt", "w") as f:
        f.write(str(best_threshold))

    return {
        "best_threshold": best_threshold,
        "best_f1": max(f1s)
    }
|
||
|
||
def optimize_sequential(self, X_train, X_valid, y_train, y_valid, n_trials=20):
    """
    Hyper-parameter search (Optuna) for a small Keras MLP crash classifier.

    Each trial builds a 2-hidden-layer Sequential net, trains it with early
    stopping, and is scored by the best F1 obtained over a few decision
    thresholds on the validation set. The best configuration is then
    retrained from scratch and diagnostic charts are rendered.

    Returns
    -------
    (model, study) : the retrained best model and the Optuna study.
    """
    def objective(trial):
        # Drop any graph state left over from the previous trial.
        tf.keras.backend.clear_session()

        # Hyper-parameters to tune.
        n1 = trial.suggest_int("units_1", 32, 128)
        n2 = trial.suggest_int("units_2", 16, 64)
        dropout1 = trial.suggest_float("dropout_1", 0.1, 0.5)
        dropout2 = trial.suggest_float("dropout_2", 0.1, 0.5)
        lr = trial.suggest_float("lr", 1e-4, 1e-2, log=True)
        batch_size = trial.suggest_categorical("batch_size", [16, 32, 64])

        # Scaling fitted on train only (no leakage into validation).
        scaler = StandardScaler()
        X_tr = scaler.fit_transform(X_train)
        X_val = scaler.transform(X_valid)

        # Model: two hidden layers with dropout, sigmoid output.
        model = Sequential([
            Dense(n1, activation='relu', input_shape=(X_tr.shape[1],)),
            Dropout(dropout1),
            Dense(n2, activation='relu'),
            Dropout(dropout2),
            Dense(1, activation='sigmoid')
        ])

        model.compile(
            optimizer=tf.keras.optimizers.Adam(lr),
            loss='binary_crossentropy'
        )

        es = EarlyStopping(
            monitor='val_loss',
            patience=10,
            restore_best_weights=True
        )

        model.fit(
            X_tr, y_train,
            validation_data=(X_val, y_valid),
            epochs=150,
            batch_size=batch_size,
            callbacks=[es],
            verbose=0
        )

        proba = model.predict(X_val).ravel()

        # Optimise on the crash-class F1: best over a handful of thresholds.
        thresholds = [0.3, 0.4, 0.5, 0.6]
        best_f1 = max(
            f1_score(y_valid, (proba > t).astype(int))
            for t in thresholds
        )

        return best_f1

    study = optuna.create_study(direction="maximize")
    study.optimize(objective, n_trials=n_trials)

    print(study.best_trial)

    best = study.best_trial.params

    # Final refit from scratch with the best hyper-parameters.
    scaler = StandardScaler()
    X_train_scaled = scaler.fit_transform(X_train)
    X_valid_scaled = scaler.transform(X_valid)

    model = Sequential([
        Dense(best["units_1"], activation='relu', input_shape=(X_train_scaled.shape[1],)),
        Dropout(best["dropout_1"]),
        Dense(best["units_2"], activation='relu'),
        Dropout(best["dropout_2"]),
        Dense(1, activation='sigmoid')
    ])

    model.compile(
        optimizer=tf.keras.optimizers.Adam(best["lr"]),
        loss='binary_crossentropy'
    )

    history = model.fit(
        X_train_scaled, y_train,
        validation_data=(X_valid_scaled, y_valid),
        epochs=150,
        batch_size=best["batch_size"],
        callbacks=[EarlyStopping(patience=10, restore_best_weights=True)],
        verbose=0
    )

    # NOTE(review): the returned dict (best threshold / F1) is currently
    # discarded — confirm whether it should be returned to the caller.
    result = self.drawSequentialGraphs(
        model=model,
        history=history,
        X_train_scaled=X_train_scaled,
        X_valid_scaled=X_valid_scaled,
        y_train=y_train,
        y_valid=y_valid
    )

    return model, study
|
||
|
||
def optimize_xgbclassifier(self, X_train, y_train, X_valid, y_valid, n_trials=20):
    """
    Tune an XGBoost crash classifier with Optuna, then retrain and analyse it.

    Each trial fits an XGBClassifier on the training set and is scored by the
    best F1 obtained over a 50-point threshold sweep on the validation set
    (with rare positives, 0.5 is almost never the optimal cut-off). The best
    hyper-parameters are then refit on the training data and the model is
    passed through the SHAP / calibration / importance analysis helpers.

    Parameters
    ----------
    X_train, y_train : training features / labels
    X_valid, y_valid : validation features / labels
    n_trials : int
        Optuna trial budget.

    Returns
    -------
    (best_model, study) : the refit XGBClassifier and the Optuna study.
    """
    def objective(trial):
        local_model = XGBClassifier(
            n_estimators=trial.suggest_int("n_estimators", 300, 500),
            max_depth=trial.suggest_int("max_depth", 1, 6),
            learning_rate=trial.suggest_float("learning_rate", 0.005, 0.3, log=True),
            subsample=trial.suggest_float("subsample", 0.6, 1.0),
            colsample_bytree=trial.suggest_float("colsample_bytree", 0.6, 1.0),
            scale_pos_weight=1,
            objective="binary:logistic",
            eval_metric="logloss",
            n_jobs=-1
        )

        local_model.fit(
            X_train,
            y_train,
            eval_set=[(X_valid, y_valid)],
            # early_stopping_rounds=50,
            verbose=False
        )

        # Score the trial by the best F1 over a sweep of decision thresholds.
        proba = local_model.predict_proba(X_valid)[:, 1]
        thresholds = np.linspace(0.1, 0.9, 50)
        best_f1 = max(f1_score(y_valid, (proba > t)) for t in thresholds)

        return best_f1

    study = optuna.create_study(direction="maximize")
    # BUG FIX: the trial budget was hard-coded to 100 here, silently
    # ignoring the caller-supplied ``n_trials`` parameter.
    study.optimize(objective, n_trials=n_trials)

    # Rebuild the final model with the best hyper-parameters found.
    best_params = study.best_params
    best_model = XGBClassifier(**best_params)
    best_model.fit(X_train, y_train)

    # SHAP analysis of the feature contributions.
    self.analyseShap(X_train, X_valid)

    # Probability calibration; returns the retained feature subset.
    selected_features = self.calibrateModel(best_model, X_train, y_train)

    self.analyseImportances(selected_features, X_train, X_valid, y_valid)

    return best_model, study
|
||
|
||
def optimize_catboost(self, X_train, y_train, X_valid, y_valid, n_trials=20):
    """
    Optimise a CatBoost model for rare-crash detection.

    Uses an automatic ``scale_pos_weight`` to compensate class imbalance,
    scores each Optuna trial by the best F1 over a low-threshold sweep on
    the validation set, then refits the best configuration on the training
    data and runs the SHAP / calibration / importance analyses.

    Returns
    -------
    (best_model, study) : the refit CatBoostClassifier and the Optuna study.
    """

    # Automatic weighting of the minority (crash) class; max(..., 1) guards
    # against a division by zero when no positive sample exists.
    scale_pos_weight = len(y_train[y_train == 0]) / max(len(y_train[y_train == 1]), 1)

    def objective(trial):
        model = CatBoostClassifier(
            iterations=trial.suggest_int("iterations", 200, 500),
            depth=trial.suggest_int("depth", 3, 8),
            learning_rate=trial.suggest_float("learning_rate", 0.01, 0.3, log=True),
            l2_leaf_reg=trial.suggest_float("l2_leaf_reg", 1, 10),
            subsample=trial.suggest_float("subsample", 0.6, 1.0),
            scale_pos_weight=scale_pos_weight,
            eval_metric="F1",
            random_state=42,
            verbose=0
        )

        # Training with early stopping on the validation set.
        model.fit(X_train, y_train, eval_set=(X_valid, y_valid), early_stopping_rounds=50)

        # Probabilities for class 1 (crash).
        proba = model.predict_proba(X_valid)[:, 1]

        # Threshold search to maximise F1 — lower thresholds suit a rare class.
        thresholds = np.linspace(0.05, 0.5, 20)
        best_f1 = 0
        for t in thresholds:
            f1 = f1_score(y_valid, (proba > t).astype(int))
            if f1 > best_f1:
                best_f1 = f1

        return best_f1

    study = optuna.create_study(direction="maximize")
    study.optimize(objective, n_trials=n_trials)

    print("Meilleurs paramètres :", study.best_params)
    print("Meilleur F1 :", study.best_value)

    # Final training with the best parameters (no early stopping here).
    best_params = study.best_params
    best_model = CatBoostClassifier(
        iterations=best_params["iterations"],
        depth=best_params["depth"],
        learning_rate=best_params["learning_rate"],
        l2_leaf_reg=best_params["l2_leaf_reg"],
        subsample=best_params["subsample"],
        scale_pos_weight=scale_pos_weight,
        eval_metric="F1",
        random_state=42,
        verbose=0
    )

    best_model.fit(X_train, y_train)

    # SHAP analysis of the feature contributions.
    self.analyseShap(X_train, X_valid)

    # Probability calibration; returns the retained feature subset.
    selected_features = self.calibrateModel(best_model, X_train, y_train)
    self.analyseImportances(selected_features, X_train, X_valid, y_valid)

    return best_model, study
|
||
|
||
def trainModel(self, dataframe):
    """Train the crash-detection classifier end to end.

    Builds the binary target, prints correlation diagnostics, performs a
    time-ordered train/validation split, runs the Optuna search via
    ``optimize_sequential``, evaluates the best model (classification
    report, confusion matrix, decision-threshold sweep) and persists it
    to disk.  The fitted model is stored in ``self.train_model``.

    Parameters
    ----------
    dataframe : pd.DataFrame
        Must contain every column listed in ``self.model_indicators``
        plus whatever ``initTarget`` reads (the 'mid' column).
    """
    pair = self.short_pair
    # Show full frames when printing diagnostics below.
    pd.set_option('display.max_rows', None)
    pd.set_option('display.max_columns', None)
    pd.set_option("display.width", 200)
    path = self.path  # f"user_data/plots/{pair}/"
    os.makedirs(path, exist_ok=True)
    # Wipe plots from the previous run so the directory only holds fresh output.
    os.system(f"rm -rf {self.path}/*")

    # # Step 1: select numeric columns
    # numeric_cols = dataframe.select_dtypes(include=['int64', 'float64']).columns
    #
    # # Step 2: drop constant columns
    # usable_cols = [c for c in numeric_cols if dataframe[c].nunique() > 1
    #                and (not c.endswith("_state") and not c.endswith("_1h") and not c.endswith("_1d")
    #                     and not c.endswith("_class") and not c.endswith("_price")
    #                     and not c.startswith('stop_buying'))]
    #
    # # Step 3: replace inf and NaN with 0
    # dataframe[usable_cols] = dataframe[usable_cols].replace([np.inf, -np.inf], 0).fillna(0)
    #
    # print(usable_cols)
    #
    # self.model_indicators = usable_cols
    #
    df = dataframe[self.model_indicators].copy()

    # Pairwise correlation of the feature columns.
    corr = df.corr(numeric_only=True)
    print("Corrélation des colonnes")
    print(corr)

    # Build the target: 1 if price moves up within the next candles.
    # df['target'] = (df['sma24'].shift(-24) > df['sma24']).astype(int)
    # df['target'] = ((df["sma24"].shift(-13) - df["sma24"]) > 100).astype(int)
    # df['target'] = df['target'].fillna(0).astype(int)

    # Label: crash if -n% within the next p hours.
    # NOTE(review): initTarget currently labels upward moves (> +0.3%),
    # not drawdowns — confirm which behavior is intended.
    self.initTarget(df)

    self.calculateCorrelation(df)

    # # Example usage:
    # selected_corr = self.select_uncorrelated_features(df, target="target", top_n=30, corr_threshold=0.7)
    # print("===== 🎯 FEATURES SÉLECTIONNÉES =====")
    # print(selected_corr)
    #
    # # Cleanup
    # df = df.dropna()
    #
    # X = df[self.model_indicators]
    # y = df['target']  # binary or numeric target column
    # print("===== 🎯 FEATURES SCORES =====")
    # print(self.feature_auc_scores(X, y))

    # Train/validation split.
    X = df[self.model_indicators]
    y = df['target']

    # Temporal split (train = 80 %, valid = 20 %); shuffle=False keeps
    # chronological order so validation data is strictly in the future.
    X_train, X_valid, y_train, y_valid = train_test_split(X, y, test_size=0.2, shuffle=False)

    # Drop (near-)constant features.
    # NOTE(review): `selected` is only printed — the low-variance columns
    # are never actually removed from X_train/X_valid.  Confirm intent.
    selector = VarianceThreshold(threshold=0.0001)
    selector.fit(X_train)
    selected = X_train.columns[selector.get_support()]
    print("Colonnes conservées :", list(selected))

    # Model training.
    # self.train_model = RandomForestClassifier(n_estimators=200, random_state=42)

    assert len(X_train) == len(y_train)
    assert len(X_valid) == len(y_valid)

    self.train_model, study = self.optimize_sequential(X_train, X_valid, y_train, y_valid, n_trials=50)

    self.analyseStudy(study)

    y_pred = self.train_model.predict(X_valid)

    # Positive-class probability when available, raw predictions otherwise.
    if hasattr(self.train_model, "predict_proba"):
        y_proba = self.train_model.predict_proba(X_valid)[:, 1]
    else:
        y_proba = self.train_model.predict(X_valid).ravel()

    # print(classification_report(y_valid, y_pred))
    # print(confusion_matrix(y_valid, y_pred))
    print("\nRapport de classification :\n", classification_report(y_valid, y_pred))
    print("\nMatrice de confusion :\n", confusion_matrix(y_valid, y_pred))

    # # Importances
    # importances = pd.DataFrame({
    #     "feature": self.train_model.feature_name_,
    #     "importance": self.train_model.feature_importances_
    # }).sort_values("importance", ascending=False)
    # print("\n===== 🔍 IMPORTANCE DES FEATURES =====")
    # print(importances)

    # Sweep a few decision thresholds and keep the one with the best F1.
    best_f1 = 0
    best_t = 0.5
    for t in [0.3, 0.4, 0.5, 0.6, 0.7]:
        y_pred_thresh = (y_proba > t).astype(int)
        score = f1_score(y_valid, y_pred_thresh)
        print(f"Seuil {t:.1f} → F1: {score:.3f}")
        if score > best_f1:
            best_f1 = score
            best_t = t

    print(f"✅ Meilleur seuil trouvé: {best_t} avec F1={best_f1:.3f}")

    # Evaluate accuracy (optional).
    preds = self.train_model.predict(X_valid)
    acc = accuracy_score(y_valid, preds)
    print(f"Accuracy: {acc:.3f}")

    # Persist the trained model.
    joblib.dump(self.train_model, f"{self.path}/{pair}_rf_model.pkl")
    print(f"✅ Modèle sauvegardé sous {pair}_rf_model.pkl")

    # X = feature dataframe (after shift/rolling/indicators)
    # y = binary or decimal target
    # model = trained model (classifier or regressor)

    # # --- 1. Mutual Information (MI) ---
    # mi_scores = mutual_info_classif(X.fillna(0), y)
    # mi_series = pd.Series(mi_scores, index=X.columns, name='MI')
    #
    # # --- 2. Permutation Importance (PI) ---
    # pi_result = permutation_importance(self.train_model, X, y, n_repeats=10, random_state=42, n_jobs=-1)
    # pi_series = pd.Series(pi_result.importances_mean, index=X.columns, name='PI')
    #
    # # --- 3. Combine into a single dataframe ---
    # importance_df = pd.concat([mi_series, pi_series], axis=1)
    # importance_df = importance_df.sort_values(by='PI', ascending=False)  # sort by model importance
    # print(importance_df)
    #
    # importance_df.plot(kind='bar', figsize=(10, 5))
    # plt.title("Mutual Info vs Permutation Importance")
    # plt.ylabel("Score")
    # plt.show()

    self.analyze_model(self.train_model, X_train, X_valid, y_train, y_valid)
def analyseImportances(self, selected_features, X_train, X_valid, y_valid):
    """Plot importances and partial dependence for ``self.train_model``.

    Produces three figures under ``self.path``: impurity-based feature
    importances (tree models only), permutation importances scored by F1
    on the validation set, and average partial-dependence plots for
    ``selected_features``.

    Parameters
    ----------
    selected_features : sequence of column names to draw PDPs for.
    X_train : training features (supplies the importance index).
    X_valid, y_valid : validation data for permutation importance / PDP.
    """
    # Impurity-based importances are only exposed by tree models.
    if hasattr(self.train_model, "feature_importances_"):
        importances = self.train_model.feature_importances_
        feat_imp = pd.Series(importances, index=X_train.columns).sort_values(ascending=False)

        feat_imp.plot(kind='bar', figsize=(12, 6))
        plt.title("Feature importances")
        plt.savefig(f"{self.path}/Feature importances.png", bbox_inches='tight')
        # BUG FIX: close the figure after saving — otherwise the next
        # pandas bar plot draws into the same still-open figure and
        # figures leak across repeated calls.
        plt.close()

    # Model-agnostic importance: mean F1 drop when a column is shuffled.
    result = permutation_importance(self.train_model, X_valid, y_valid, scoring='f1', n_repeats=10,
                                    random_state=42)
    perm_imp = pd.Series(result.importances_mean, index=X_valid.columns).sort_values(ascending=False)
    perm_imp.plot(kind='bar', figsize=(12, 6))
    plt.title("Permutation feature importance")
    plt.savefig(f"{self.path}/Permutation feature importance.png", bbox_inches='tight')
    plt.close()  # BUG FIX: see above — do not leave the figure open.

    # Average partial dependence of the model on each selected feature.
    fig, ax = plt.subplots(figsize=(24, 48))
    PartialDependenceDisplay.from_estimator(
        self.train_model,
        X_valid,
        selected_features,
        kind="average",
        ax=ax
    )
    fig.savefig(f"{self.path}/PartialDependenceDisplay.png", bbox_inches="tight")
    plt.close(fig)
def calibrateModel(self, model, X_train, y_train):
    """Select features from a fitted model, then fit a calibrated copy.

    Feature selection (median-importance cut) is applied BEFORE the
    calibration step.  The calibrated classifier is only printed for
    inspection; the method returns the Index of kept feature names.
    """
    # Median-threshold feature selection on the already-fitted model.
    picker = SelectFromModel(model, threshold="median", prefit=True)
    kept = X_train.columns[picker.get_support()]
    print(kept)

    # Optional probability calibration (Platt scaling, 5-fold CV) fitted
    # on the reduced feature set.
    platt = CalibratedClassifierCV(model, method='sigmoid', cv=5)
    platt.fit(X_train[kept], y_train)
    print(platt)

    return kept
def calculateCorrelation(self, df):
    """Print correlation diagnostics for *df* and save a percentage
    heatmap to ``self.path``.

    Three views: correlation of each column with 'target', the top-20
    pairwise correlations by absolute value, and a lower-triangle
    seaborn heatmap expressed in percent.
    """
    # Correlation of every column with the target, strongest first.
    with_target = df.corr(numeric_only=True)["target"].sort_values(ascending=False)
    print("Corrélations triées par importance avec une colonne cible")
    print(with_target)

    # Pairwise correlations as a long table: one row per (col1, col2).
    matrix = df.corr(numeric_only=True)
    pairs = (
        matrix.unstack()
        .reset_index()
        .rename(columns={"level_0": "col1", "level_1": "col2", 0: "corr"})
    )
    # Keep each unordered pair once and drop self-correlations.
    pairs = pairs[pairs["col1"] < pairs["col2"]]
    # Rank by absolute correlation strength.
    ranked = pairs.reindex(pairs["corr"].abs().sort_values(ascending=False).index)
    print("Trier par valeur absolue de corrélation")
    print(ranked.head(20))

    # --- Heatmap, expressed in percent ---
    pct = df.corr(numeric_only=True) * 100  # percentage scale
    # Mask the upper triangle so each pair is shown only once.
    upper_mask = np.triu(np.ones_like(pct, dtype=bool))
    fig, ax = plt.subplots(figsize=(96, 36))
    sns.heatmap(
        pct,
        mask=upper_mask,
        cmap="coolwarm",                         # blue → red palette
        center=0,                                # zero at the center
        annot=True,                              # value in every cell
        fmt=".0f",                               # integers, no decimals
        cbar_kws={"label": "Corrélation (%)"},   # legend on the right
        linewidths=0.5,                          # thin cell separators
        ax=ax
    )
    ax.set_title("Matrice de corrélation (en %)", fontsize=20, pad=20)
    plt.xticks(rotation=45, ha="right")
    plt.yticks(rotation=0)

    output_path = f"{self.path}/Matrice_de_correlation_temperature.png"
    plt.savefig(output_path, bbox_inches="tight", dpi=150)
    plt.close(fig)
    print(f"✅ Matrice enregistrée : {output_path}")
def analyseStudy(self, study):
    """Dump an Optuna study: best trial, per-trial summary and HTML charts.

    Writes four interactive plotly reports under ``self.path``.
    """
    # Headline result of the search.
    print("Best value (F1):", study.best_value)
    print("Best params:", study.best_params)

    winner = study.best_trial
    print("\n=== BEST TRIAL ===")
    print("Number:", winner.number)
    print("Value:", winner.value)
    print("Params:")
    for name, value in winner.params.items():
        print(f" - {name}: {value}")

    # One line per trial.
    print("\n=== ALL TRIALS ===")
    for trial in study.trials:
        print(f"Trial {trial.number}: f1 = {trial.value}, params = {trial.params}")

    # Tabular view of the trials.
    df = study.trials_dataframe()
    print(df.head())

    # Interactive charts, one HTML file per plot.
    charts = [
        (plot_optimization_history, "optimization_history.html"),
        (plot_param_importances, "param_importances.html"),
        (plot_slice, "slice.html"),
        (plot_parallel_coordinate, "parallel_coordinates.html"),
    ]
    for make_chart, filename in charts:
        make_chart(study).write_html(f"{self.path}/{filename}")
def analyseShap(self, X_train, X_valid):
    """Compute SHAP values for ``self.train_model`` and save three views.

    Outputs under ``self.path``: a waterfall chart for one observation,
    a global summary plot, and an interactive force plot (HTML).

    SHAP values are computed on ``X_train``; every plot therefore uses
    ``X_train`` rows as well.  ``X_valid`` is kept in the signature for
    backward compatibility but is no longer mixed with train-based SHAP
    values (see BUG FIX below).
    """
    # TreeExplainer supports tree ensembles such as CatBoost.
    explainer = shap.TreeExplainer(self.train_model)
    shap_values = explainer(X_train)

    # Explain a single observation (first row of X_train) as a waterfall.
    i = 0
    shap_val = shap_values[i].values
    feature_names = X_train.columns
    feature_values = X_train.iloc[i]

    # Keep only the k features with the largest absolute contribution.
    k = 10
    order = np.argsort(np.abs(shap_val))[::-1][:k]

    plt.ioff()  # no interactive display; everything is saved to disk
    shap.plots.waterfall(
        shap.Explanation(
            values=shap_val[order],
            base_values=shap_values.base_values[i],
            data=feature_values.values[order],
            feature_names=feature_names[order]
        ),
        show=False  # IMPORTANT: do not display in Jupyter / console
    )
    output_path = f"{self.path}/shap_waterfall.png"
    plt.savefig(output_path, dpi=200, bbox_inches='tight')
    plt.close()  # release the figure
    print(f"Graphique SHAP enregistré : {output_path}")

    # Global summary.
    # BUG FIX: shap_values was computed on X_train, but this plot (and
    # the force plot below) was previously fed X_valid — a frame with a
    # different number of rows, which crashes or silently mislabels the
    # points.  Use X_train, the frame these SHAP values actually describe.
    shap.summary_plot(shap_values, X_train, show=False)
    plt.savefig(f"{self.path}/shap_summary.png", bbox_inches='tight')
    plt.close()

    # Force plot for the same observation as the waterfall.
    force_plot = shap.force_plot(explainer.expected_value, shap_values.values[0, :], X_train.iloc[0, :])
    shap.save_html(f"{self.path}/shap_force_plot.html", force_plot)
def initTarget(self, df):
    """Add the label columns to *df* in place.

    'future_dd' is the relative move of 'mid' twelve bars ahead;
    'target' is 1 when that move exceeds +0.3 % (rows whose future is
    unknown — the last 12 — end up labelled 0).

    NOTE(review): despite the surrounding "crash" wording, this labels
    UPWARD moves (> +0.003), not drawdowns — confirm intent.
    """
    horizon = 12
    mid = df['mid']
    move = mid.shift(-horizon).sub(mid).div(mid)
    df['future_dd'] = move
    df['target'] = move.gt(0.003).astype(int)
def inspect_model(self, model):
    """
    Print a report on an already-trained ML model: identity,
    hyperparameters, feature importances or coefficients, classes,
    fit-time scores and available methods.

    Compatible with scikit-learn, xgboost, lightgbm, catboost, ... —
    every section is guarded by hasattr, so unsupported models simply
    skip it.
    """
    print("===== 🔍 INFORMATIONS DU MODÈLE =====")

    # Model identity.
    print(f"Type : {type(model).__name__}")
    print(f"Module : {model.__class__.__module__}")

    # Hyperparameters.
    if hasattr(model, "get_params"):
        hyper = model.get_params()
        print(f"\n===== ⚙️ HYPERPARAMÈTRES ({len(hyper)}) =====")
        for name, value in hyper.items():
            print(f"{name}: {value}")

    # Ensemble size.
    if hasattr(model, "n_estimators"):
        print(f"\nNombre d’estimateurs : {model.n_estimators}")

    # Feature importances (tree-based models).
    if hasattr(model, "feature_importances_"):
        print("\n===== 📊 IMPORTANCE DES FEATURES =====")

        # Fall back to synthetic names when the model kept none.
        names = getattr(model, "feature_names_in_", None)
        if isinstance(names, np.ndarray):
            names = names.tolist()
        elif names is None:
            names = [f"feature_{i}" for i in range(len(model.feature_importances_))]

        ranking = pd.DataFrame({
            "feature": names,
            "importance": model.feature_importances_
        }).sort_values(by="importance", ascending=False)

        print(ranking)

    # Coefficients (linear models).
    if hasattr(model, "coef_"):
        print("\n===== ➗ COEFFICIENTS =====")
        weights = np.array(model.coef_)
        if weights.ndim == 1:
            for idx, weight in enumerate(weights):
                print(f"Feature {idx}: {weight:.6f}")
        else:
            print(weights)

    # Intercept.
    if hasattr(model, "intercept_"):
        print("\nIntercept :", model.intercept_)

    # Known classes.
    if hasattr(model, "classes_"):
        print("\n===== 🎯 CLASSES =====")
        print(model.classes_)

    # Fit-time scores exposed by some libraries.
    for attr in ["best_score_", "best_iteration_", "best_ntree_limit", "score_"]:
        if hasattr(model, attr):
            print(f"\n{attr} = {getattr(model, attr)}")

    # Bound methods exposed by the instance (first 15 only).
    print("\n===== 🧩 MÉTHODES DISPONIBLES =====")
    methods = [name for name, _ in inspect.getmembers(model, predicate=inspect.ismethod)]
    print(", ".join(methods[:15]) + ("..." if len(methods) > 15 else ""))

    print("\n===== ✅ FIN DE L’INSPECTION =====")
def analyze_model(self, model, X_train, X_valid, y_train, y_valid):
    """
    Full analysis of a supervised binary-classification model.

    Prints performance reports, feature importances, the confusion
    matrix and decision-threshold sweeps; saves the diagnostic plots
    under ``self.path``.  Returns nothing.

    Parameters
    ----------
    model : fitted classifier exposing ``predict`` (``predict_proba``
        is used when available).
    X_train, X_valid : feature frames (train / validation split).
    y_train, y_valid : matching label series.  ``y_train`` is unused
        here; it is kept for call-site symmetry.
    """
    os.makedirs(self.path, exist_ok=True)

    # ---- Predictions ----
    preds = model.predict(X_valid)
    # Positive-class probability when available, hard labels otherwise.
    probs = model.predict_proba(X_valid)[:, 1] if hasattr(model, "predict_proba") else preds

    # ---- Global performance ----
    print("===== 📊 ÉVALUATION DU MODÈLE =====")
    if hasattr(model, "feature_names_in_"):
        # Sanity check: the model's training columns vs. the validation frame.
        print("Colonnes du modèle :", model.feature_names_in_)
        print("Colonnes X_valid :", list(X_valid.columns))
    print(f"Accuracy: {accuracy_score(y_valid, preds):.3f}")
    print(f"ROC AUC : {roc_auc_score(y_valid, probs):.3f}")

    # Legend for the confusion matrix printed below.
    print("TN (True Negative) / FP (False Positive)")
    print("FN (False Negative) / TP (True Positive)")
    print("\nRapport de classification :\n", classification_report(y_valid, preds))

    # | Element             | Example | Meaning                                            |
    # | ------------------- | ------- | -------------------------------------------------- |
    # | TN (True Negative)  | 983     | Model correctly predicted 0 (no buy)               |
    # | FP (False Positive) | 43      | Model predicted 1 when it was 0 (false buy signal) |
    # | FN (False Negative) | 108     | Model predicted 0 when it was 1 (missed a buy)     |
    # | TP (True Positive)  | 19      | Model correctly predicted 1 (good buy signal)      |

    # ---- Confusion matrix ----
    cm = confusion_matrix(y_valid, preds)
    print("Matrice de confusion :\n", cm)

    plt.figure(figsize=(4, 4))
    plt.imshow(cm, cmap="Blues")
    plt.title("Matrice de confusion")
    plt.xlabel("Prédit")
    plt.ylabel("Réel")
    # Annotate each cell with its count (binary → fixed 2x2 grid).
    for i in range(2):
        for j in range(2):
            plt.text(j, i, cm[i, j], ha="center", va="center", color="black")
    # plt.show()
    plt.savefig(os.path.join(self.path, "Matrice de confusion.png"), bbox_inches="tight")
    plt.close()

    # ---- Feature importances (tree models only) ----
    if hasattr(model, "feature_importances_"):
        print("\n===== 🔍 IMPORTANCE DES FEATURES =====")
        importance = pd.DataFrame({
            "feature": X_train.columns,
            "importance": model.feature_importances_
        }).sort_values(by="importance", ascending=False)
        print(importance)

        # Wide figure so every feature label stays readable.
        fig, ax = plt.subplots(figsize=(24, 8))

        importance.plot.bar(x="feature", y="importance", legend=False, ax=ax)

        # Rotate the labels for readability.
        ax.set_xticklabels(ax.get_xticklabels(), rotation=45, ha='right')

        plt.title("Importance des features")
        # plt.show()
        plt.savefig(os.path.join(self.path, "Importance des features.png"), bbox_inches="tight")
        plt.close()

    # ---- Decision-tree excerpt (ensembles only) ----
    if hasattr(model, "estimators_"):
        print("\n===== 🌳 EXTRAIT D’UN ARBRE =====")
        # First 800 characters of the first estimator's textual rules.
        print(export_text(model.estimators_[0], feature_names=list(X_train.columns))[:800])

    # ---- Accuracy per decision threshold ----
    thresholds = np.linspace(0.1, 0.9, 9)
    print("\n===== ⚙️ PERFORMANCE SELON SEUIL =====")
    for t in thresholds:
        preds_t = (probs > t).astype(int)
        acc = accuracy_score(y_valid, preds_t)
        print(f"Seuil {t:.1f} → précision {acc:.3f}")

    # ---- ROC Curve ----
    fpr, tpr, _ = roc_curve(y_valid, probs)
    plt.figure(figsize=(5, 4))
    plt.plot(fpr, tpr, label="ROC curve")
    plt.plot([0, 1], [0, 1], linestyle="--", color="gray")  # chance diagonal
    plt.xlabel("Taux de faux positifs")
    plt.ylabel("Taux de vrais positifs")
    plt.title("Courbe ROC")
    plt.legend()
    # plt.show()
    plt.savefig(os.path.join(self.path, "Courbe ROC.png"), bbox_inches="tight")
    plt.close()

    if hasattr(model, "predict_proba"):
        # NOTE(review): recomputes the same probabilities as `probs` above.
        y_proba = model.predict_proba(X_valid)[:, 1]

        # Plot (or save) the precision/recall/F1-vs-threshold chart.
        self.plot_threshold_analysis(y_valid, y_proba, step=0.05, save_path=f"{self.path}/threshold_analysis.png")

        # y_valid : true classes (0 / 1)
        # y_proba : model-predicted probabilities of class 1

        seuils = np.arange(0.0, 1.01, 0.05)
        precisions, recalls, f1s = [], [], []

        # Score every threshold on the validation set.
        for seuil in seuils:
            y_pred = (y_proba >= seuil).astype(int)
            precisions.append(precision_score(y_valid, y_pred))
            recalls.append(recall_score(y_valid, y_pred))
            f1s.append(f1_score(y_valid, y_pred))

        plt.figure(figsize=(10, 6))
        plt.plot(seuils, precisions, label='Précision', marker='o')
        plt.plot(seuils, recalls, label='Rappel', marker='o')
        plt.plot(seuils, f1s, label='F1-score', marker='o')

        # Highlight the threshold with the best F1.
        best_idx = np.argmax(f1s)
        plt.scatter(seuils[best_idx], f1s[best_idx], color='red', s=80, label=f"Max F1 ({seuils[best_idx]:.2f})")

        plt.title("Performance du modèle selon le seuil de probabilité")
        plt.xlabel("Seuil de probabilité (classe 1)")
        plt.ylabel("Score")
        plt.grid(True, alpha=0.3)
        plt.legend()
        # NOTE(review): this figure is saved but never closed — consider plt.close().
        plt.savefig(f"{self.path}/seuil_de_probabilite.png", bbox_inches='tight')
        # plt.show()

        print(f"✅ Meilleur F1 : {f1s[best_idx]:.3f} au seuil {seuils[best_idx]:.2f}")

    print("\n===== ✅ FIN DE L’ANALYSE =====")
def plot_threshold_analysis(self, y_true, y_proba, step=0.05, save_path=None):
    """
    Plot precision, recall and F1-score against the decision threshold.

    Parameters
    ----------
    y_true : array-like of 0/1 — true labels.
    y_proba : array-like — predicted probabilities P(class 1).
    step : float — spacing between tested thresholds (0 .. 1 inclusive).
    save_path : str or None — if given, the figure is saved there and
        closed; otherwise it is shown interactively.
    """
    # The generated chart shows three curves:
    # 🔵 Precision — reliability of the positive signals.
    # 🟢 Recall — share of actual positives the model detects.
    # 🟣 F1-score — the trade-off between the two.

    thresholds = np.arange(0, 1.01, step)
    precisions, recalls, f1s = [], [], []

    for thr in thresholds:
        preds = (y_proba >= thr).astype(int)
        # zero_division=0 keeps the score at 0.0 (sklearn's default value
        # in the undefined case) without emitting UndefinedMetricWarning
        # at extreme thresholds where no positives are predicted.
        precisions.append(precision_score(y_true, preds, zero_division=0))
        recalls.append(recall_score(y_true, preds, zero_division=0))
        f1s.append(f1_score(y_true, preds, zero_division=0))

    plt.figure(figsize=(10, 6))
    plt.plot(thresholds, precisions, label="Precision", linewidth=2)
    plt.plot(thresholds, recalls, label="Recall", linewidth=2)
    plt.plot(thresholds, f1s, label="F1-score", linewidth=2, linestyle="--")
    plt.axvline(0.5, color='gray', linestyle=':', label="Seuil 0.5")
    plt.title("📊 Performance selon le seuil de probabilité", fontsize=14)
    plt.xlabel("Seuil de décision (threshold)")
    plt.ylabel("Score")
    plt.legend()
    plt.grid(True, alpha=0.3)

    if save_path:
        plt.savefig(save_path, bbox_inches='tight')
        # BUG FIX: close the figure after saving so repeated calls do not
        # leak matplotlib figures.
        plt.close()
        print(f"✅ Graphique enregistré : {save_path}")
    else:
        plt.show()
def run(self):
    """Load BTC/USDC candles, engineer features and train the crash model.

    Reads the feather file matching ``self.timeframe``, restricts the
    data to January 2025, cleans the OHLCV columns, then runs the
    indicator / training / inspection pipeline.
    """
    # ================================
    # 1. PREPARE DATA
    # ================================
    data = pd.read_feather(f"user_data/data/binance/BTC_USDC-{self.timeframe}.feather")
    data['date'] = pd.to_datetime(data['date'])
    data = data.set_index('date')

    # Optional: keep a single time window.
    data = data["2025-01-01":"2025-02-01"]
    data = data.reset_index('date')

    # Drop rows with missing candles, then force OHLCV to float.
    ohlcv = ['open', 'high', 'low', 'close', 'volume']
    data = data.dropna(subset=ohlcv)
    for column in ohlcv:
        data[column] = data[column].astype(float)

    print(data.head())
    print(data.tail())
    # print(data[['rsi', 'atr', 'target']].describe())

    self.dataframe = self.calculateIndicators(data)
    self.model_indicators = self.listUsableColumns(data)  # ['returns','atr','slope','drawdown', 'close']
    self.trainModel(data)
    self.inspect_model(self.train_model)
if __name__ == "__main__":
    # Guard the script entry point so importing this module (e.g. from a
    # strategy or a test) does not immediately kick off a full training run.
    crash = Crash()
    crash.run()