diff --git a/Zeus_TensorFlow_1h.py b/Zeus_TensorFlow_1h.py index 7bfe2cf..ec17f31 100644 --- a/Zeus_TensorFlow_1h.py +++ b/Zeus_TensorFlow_1h.py @@ -79,6 +79,8 @@ from sklearn.preprocessing import MinMaxScaler from tensorflow.keras.models import Sequential from tensorflow.keras.layers import LSTM, Dense from tensorflow.keras.optimizers import Adam +from sklearn.metrics import mean_absolute_error, mean_squared_error + os.environ["CUDA_VISIBLE_DEVICES"] = "-1" # désactive complètement le GPU os.environ["TF_XLA_FLAGS"] = "--tf_xla_enable_xla_devices=false" @@ -120,6 +122,8 @@ class Zeus_TensorFlow_1h(IStrategy): future_steps = 12 y_no_scale = False epochs = 120 + scaler_X = None + scaler_y = None path = f"user_data/plots/" @@ -1011,7 +1015,7 @@ class Zeus_TensorFlow_1h(IStrategy): def populate_buy_trend(self, dataframe: DataFrame, metadata: dict) -> DataFrame: dataframe.loc[ ( - (dataframe['lstm_pred'] > dataframe['mid']) + qtpylib.crossed_above(dataframe['lstm_pred'], dataframe['mid']) ), ['enter_long', 'enter_tag']] = (1, f"future") dataframe['test'] = np.where(dataframe['enter_long'] == 1, dataframe['close'] * 1.01, np.nan) @@ -1025,7 +1029,7 @@ class Zeus_TensorFlow_1h(IStrategy): # dataframe.loc[ # ( - # (dataframe['lstm_pred'] < 0) & (dataframe['hapercent'] < 0) + # qtpylib.crossed_below(dataframe['lstm_pred'], dataframe['mid']) # ), ['exit_long', 'exit_tag']] = (1, f"sma60_future") # dataframe.loc[ @@ -1223,7 +1227,7 @@ class Zeus_TensorFlow_1h(IStrategy): # factors = [1, 1.2, 1.3, 1.4] if self.pairs[pair]['count_of_buys'] == 0: factor = 1 #65 / min(65, last_candle['rsi_1d']) - if last_candle['open'] < last_candle['sma5'] and last_candle['mid_smooth_12_deriv1'] > 0: + if last_candle['min_max_60'] > 0.04: factor = 2 adjusted_stake_amount = max(base_stake_amount / 5, base_stake_amount * factor) @@ -1739,33 +1743,39 @@ class Zeus_TensorFlow_1h(IStrategy): # 7) Sauvegarde self.model.save(f"{self.path}/lstm_model.keras") - # joblib.dump(self.scaler_X, f"{self.path}/lstm_scaler_X.pkl") - # joblib.dump(self.scaler_y, f"{self.path}/lstm_scaler_y.pkl") + joblib.dump(self.scaler_X, f"{self.path}/lstm_scaler_X.pkl") + joblib.dump(self.scaler_y, f"{self.path}/lstm_scaler_y.pkl") def tensorFlowPrepareDataFrame(self, dataframe, future_steps, lookback): target = self.indicator_target + # 1) Détecter NaN / Inf et nettoyer feature_columns = self.model_indicators # [col for col in dataframe.columns if col != target] df = dataframe.copy() df.replace([np.inf, -np.inf], np.nan, inplace=True) df.dropna(subset=feature_columns + [target], inplace=True) + # 2) Séparer features et cible X_values = df[feature_columns].values y_values = df[target].values.reshape(-1, 1) + # 3) Gestion colonnes constantes (éviter division par zéro) for i in range(X_values.shape[1]): if X_values[:, i].max() == X_values[:, i].min(): X_values[:, i] = 0.0 if y_values.max() == y_values.min(): y_values[:] = 0.0 + # 4) Normalisation - self.scaler_X = MinMaxScaler() + if self.scaler_X is None: + self.scaler_X = MinMaxScaler() X_scaled = self.scaler_X.fit_transform(X_values) if self.y_no_scale: y_scaled = y_values else: - self.scaler_y = MinMaxScaler() + if self.scaler_y is None: + self.scaler_y = MinMaxScaler() y_scaled = self.scaler_y.fit_transform(y_values) # 5) Création des fenêtres glissantes @@ -1791,8 +1801,8 @@ class Zeus_TensorFlow_1h(IStrategy): # charger le modèle si pas déjà chargé if self.model is None: self.model = load_model(f"{self.path}/lstm_model.keras", compile=False) - # self.scaler_X = joblib.load(f"{self.path}/lstm_scaler_X.pkl") - # self.scaler_y = joblib.load(f"{self.path}/lstm_scaler_y.pkl") + self.scaler_X = joblib.load(f"{self.path}/lstm_scaler_X.pkl") + self.scaler_y = joblib.load(f"{self.path}/lstm_scaler_y.pkl") X_seq, y_seq = self.tensorFlowPrepareDataFrame(dataframe, future_steps, lookback) @@ -1827,4 +1837,100 @@ class Zeus_TensorFlow_1h(IStrategy): end = start + len(y_pred) # preds[start:end] = y_pred[:end - start] preds[start:start + len(y_pred)] = y_pred - return preds \ No newline at end of file + + # Décaler le dataframe pour ne garder que les lignes avec prédictions + y_true = dataframe[self.indicator_target][start:] + mae, rmse, mape, hit_ratio = self.reliability_report(y_true, y_pred) + + # 6) Graphiques + # 4) Prédictions avec MC Dropout + self.plot_lstm_predictions(dataframe, preds) + self.plot_error_histogram(y_true, y_pred) + + # 7) Rapport texte + rapport = self.generate_text_report(mae, rmse, mape, hit_ratio, self.future_steps) + print(rapport) + + return preds + + def generate_text_report(self, mae, rmse, mape, hit_ratio, n): + txt = f""" + Fiabilité du modèle à horizon {n} bougies + ----------------------------------------- + MAE: {mae:.4f} + RMSE: {rmse:.4f} + MAPE: {mape:.2f} % + + Hit-ratio (direction): {hit_ratio*100:.2f} % + + Interprétation : + - MAE faible = bonne précision absolue. + - MAPE faible = bonne précision relative au prix. + - Hit-ratio > 55% = exploitable pour un système de trading directionnel. + - 50% ≈ hasard. + """ + return txt + + def plot_lstm_predictions(self, dataframe, preds): + """ + Génère un graphique des prédictions LSTM vs la vraie valeur de l'indicateur. + + Args: + dataframe: pd.DataFrame contenant l'indicateur cible. + preds: liste ou np.array des prédictions, alignée sur le dataframe + avec des NaN en début à cause du lookback. + """ + # Convertir preds en np.array + preds_array = np.array(preds) + + # Récupérer la vraie valeur de l'indicateur + y_true = dataframe[self.indicator_target].values + + # Masque pour ne garder que les positions avec prédiction + mask_valid = ~np.isnan(preds_array) + y_true_valid = y_true[mask_valid] + y_pred_valid = preds_array[mask_valid] + + # Créer le graphique + plt.figure(figsize=(15, 5)) + plt.plot(y_true_valid, label="Vraie valeur", color="blue") + plt.plot(y_pred_valid, label="Prédiction LSTM", color="orange") + plt.title(f"Prédictions LSTM vs vrai {self.indicator_target}") + plt.xlabel("Index") + plt.ylabel(self.indicator_target) + plt.legend() + plt.grid(True) + plt.savefig(f"{self.path}/Prédictions LSTM vs vrai {self.indicator_target}.png") + plt.close() + + def plot_error_histogram(self, y_true, y_pred): + errors = y_pred - y_true + plt.figure(figsize=(8,5)) + plt.hist(errors, bins=30) + plt.title("Distribution des erreurs de prédiction") + # plt.show() + plt.savefig(f"{self.path}/Distribution des erreurs de prédiction.png") + plt.close() + + def reliability_report(self, y_true, y_pred): + # moyenne des différences absolues entre les valeurs prédites et les valeurs réelles + # | Métrique | Ce qu’elle mesure | Sensibilité | + # | --------- | ----------------------- | ---------------------------- | + # | MAE | Écart moyen absolu | Moyenne des erreurs | + # | RMSE | Écart quadratique moyen | Sensible aux grosses erreurs | + # | MAPE | % d’erreur moyenne | Interprétation facile | + # | Hit ratio | Direction correcte | Pour trading / signaux | + + mae = mean_absolute_error(y_true, y_pred) + rmse = np.sqrt(mean_squared_error(y_true, y_pred)) + mape = np.mean(np.abs((y_true - y_pred) / y_true)) * 100 + + # hit-ratio directionnel + real_dir = np.sign(np.diff(y_true)) + pred_dir = np.sign(np.diff(y_pred)) + hit_ratio = (real_dir == pred_dir).mean() + + return mae, rmse, mape, hit_ratio + + +