Statistique TensorFlow

This commit is contained in:
Jérôme Delacotte
2025-11-19 23:37:59 +01:00
parent e35ee0ed23
commit 30a45d592a

View File

@@ -79,6 +79,8 @@ from sklearn.preprocessing import MinMaxScaler
from tensorflow.keras.models import Sequential from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import LSTM, Dense from tensorflow.keras.layers import LSTM, Dense
from tensorflow.keras.optimizers import Adam from tensorflow.keras.optimizers import Adam
from sklearn.metrics import mean_absolute_error, mean_squared_error
os.environ["CUDA_VISIBLE_DEVICES"] = "-1" # désactive complètement le GPU os.environ["CUDA_VISIBLE_DEVICES"] = "-1" # désactive complètement le GPU
os.environ["TF_XLA_FLAGS"] = "--tf_xla_enable_xla_devices=false" os.environ["TF_XLA_FLAGS"] = "--tf_xla_enable_xla_devices=false"
@@ -120,6 +122,8 @@ class Zeus_TensorFlow_1h(IStrategy):
future_steps = 12 future_steps = 12
y_no_scale = False y_no_scale = False
epochs = 120 epochs = 120
scaler_X = None
scaler_y = None
path = f"user_data/plots/" path = f"user_data/plots/"
@@ -1011,7 +1015,7 @@ class Zeus_TensorFlow_1h(IStrategy):
def populate_buy_trend(self, dataframe: DataFrame, metadata: dict) -> DataFrame: def populate_buy_trend(self, dataframe: DataFrame, metadata: dict) -> DataFrame:
dataframe.loc[ dataframe.loc[
( (
(dataframe['lstm_pred'] > dataframe['mid']) qtpylib.crossed_above(dataframe['lstm_pred'], dataframe['mid'])
), ['enter_long', 'enter_tag']] = (1, f"future") ), ['enter_long', 'enter_tag']] = (1, f"future")
dataframe['test'] = np.where(dataframe['enter_long'] == 1, dataframe['close'] * 1.01, np.nan) dataframe['test'] = np.where(dataframe['enter_long'] == 1, dataframe['close'] * 1.01, np.nan)
@@ -1025,7 +1029,7 @@ class Zeus_TensorFlow_1h(IStrategy):
# dataframe.loc[ # dataframe.loc[
# ( # (
# (dataframe['lstm_pred'] < 0) & (dataframe['hapercent'] < 0) # qtpylib.crossed_below(dataframe['lstm_pred'], dataframe['mid'])
# ), ['exit_long', 'exit_tag']] = (1, f"sma60_future") # ), ['exit_long', 'exit_tag']] = (1, f"sma60_future")
# dataframe.loc[ # dataframe.loc[
@@ -1223,7 +1227,7 @@ class Zeus_TensorFlow_1h(IStrategy):
# factors = [1, 1.2, 1.3, 1.4] # factors = [1, 1.2, 1.3, 1.4]
if self.pairs[pair]['count_of_buys'] == 0: if self.pairs[pair]['count_of_buys'] == 0:
factor = 1 #65 / min(65, last_candle['rsi_1d']) factor = 1 #65 / min(65, last_candle['rsi_1d'])
if last_candle['open'] < last_candle['sma5'] and last_candle['mid_smooth_12_deriv1'] > 0: if last_candle['min_max_60'] > 0.04:
factor = 2 factor = 2
adjusted_stake_amount = max(base_stake_amount / 5, base_stake_amount * factor) adjusted_stake_amount = max(base_stake_amount / 5, base_stake_amount * factor)
@@ -1739,32 +1743,38 @@ class Zeus_TensorFlow_1h(IStrategy):
# 7) Sauvegarde # 7) Sauvegarde
self.model.save(f"{self.path}/lstm_model.keras") self.model.save(f"{self.path}/lstm_model.keras")
# joblib.dump(self.scaler_X, f"{self.path}/lstm_scaler_X.pkl") joblib.dump(self.scaler_X, f"{self.path}/lstm_scaler_X.pkl")
# joblib.dump(self.scaler_y, f"{self.path}/lstm_scaler_y.pkl") joblib.dump(self.scaler_y, f"{self.path}/lstm_scaler_y.pkl")
def tensorFlowPrepareDataFrame(self, dataframe, future_steps, lookback): def tensorFlowPrepareDataFrame(self, dataframe, future_steps, lookback):
target = self.indicator_target target = self.indicator_target
# 1) Détecter NaN / Inf et nettoyer # 1) Détecter NaN / Inf et nettoyer
feature_columns = self.model_indicators # [col for col in dataframe.columns if col != target] feature_columns = self.model_indicators # [col for col in dataframe.columns if col != target]
df = dataframe.copy() df = dataframe.copy()
df.replace([np.inf, -np.inf], np.nan, inplace=True) df.replace([np.inf, -np.inf], np.nan, inplace=True)
df.dropna(subset=feature_columns + [target], inplace=True) df.dropna(subset=feature_columns + [target], inplace=True)
# 2) Séparer features et cible # 2) Séparer features et cible
X_values = df[feature_columns].values X_values = df[feature_columns].values
y_values = df[target].values.reshape(-1, 1) y_values = df[target].values.reshape(-1, 1)
# 3) Gestion colonnes constantes (éviter division par zéro) # 3) Gestion colonnes constantes (éviter division par zéro)
for i in range(X_values.shape[1]): for i in range(X_values.shape[1]):
if X_values[:, i].max() == X_values[:, i].min(): if X_values[:, i].max() == X_values[:, i].min():
X_values[:, i] = 0.0 X_values[:, i] = 0.0
if y_values.max() == y_values.min(): if y_values.max() == y_values.min():
y_values[:] = 0.0 y_values[:] = 0.0
# 4) Normalisation # 4) Normalisation
if self.scaler_X is None:
self.scaler_X = MinMaxScaler() self.scaler_X = MinMaxScaler()
X_scaled = self.scaler_X.fit_transform(X_values) X_scaled = self.scaler_X.fit_transform(X_values)
if self.y_no_scale: if self.y_no_scale:
y_scaled = y_values y_scaled = y_values
else: else:
if self.scaler_y is None:
self.scaler_y = MinMaxScaler() self.scaler_y = MinMaxScaler()
y_scaled = self.scaler_y.fit_transform(y_values) y_scaled = self.scaler_y.fit_transform(y_values)
@@ -1791,8 +1801,8 @@ class Zeus_TensorFlow_1h(IStrategy):
# charger le modèle si pas déjà chargé # charger le modèle si pas déjà chargé
if self.model is None: if self.model is None:
self.model = load_model(f"{self.path}/lstm_model.keras", compile=False) self.model = load_model(f"{self.path}/lstm_model.keras", compile=False)
# self.scaler_X = joblib.load(f"{self.path}/lstm_scaler_X.pkl") self.scaler_X = joblib.load(f"{self.path}/lstm_scaler_X.pkl")
# self.scaler_y = joblib.load(f"{self.path}/lstm_scaler_y.pkl") self.scaler_y = joblib.load(f"{self.path}/lstm_scaler_y.pkl")
X_seq, y_seq = self.tensorFlowPrepareDataFrame(dataframe, future_steps, lookback) X_seq, y_seq = self.tensorFlowPrepareDataFrame(dataframe, future_steps, lookback)
@@ -1827,4 +1837,100 @@ class Zeus_TensorFlow_1h(IStrategy):
end = start + len(y_pred) end = start + len(y_pred)
# preds[start:end] = y_pred[:end - start] # preds[start:end] = y_pred[:end - start]
preds[start:start + len(y_pred)] = y_pred preds[start:start + len(y_pred)] = y_pred
# Décaler le dataframe pour ne garder que les lignes avec prédictions
y_true = dataframe[self.indicator_target][start:]
mae, rmse, mape, hit_ratio = self.reliability_report(y_true, y_pred)
# 6) Graphiques
# 4) Prédictions avec MC Dropout
self.plot_lstm_predictions(dataframe, preds)
self.plot_error_histogram(y_true, y_pred)
# 7) Rapport texte
rapport = self.generate_text_report(mae, rmse, mape, hit_ratio, self.future_steps)
print(rapport)
return preds return preds
def generate_text_report(self, mae, rmse, mape, hit_ratio, n):
    """Format a plain-text reliability summary of the model.

    Args:
        mae: mean absolute error of the predictions.
        rmse: root mean squared error of the predictions.
        mape: mean absolute percentage error (already in percent).
        hit_ratio: fraction of correctly predicted directions (0..1).
        n: prediction horizon, in candles.

    Returns:
        A multi-line French report string (kept verbatim — it is
        user-facing runtime output).
    """
    return f"""
Fiabilité du modèle à horizon {n} bougies
-----------------------------------------
MAE: {mae:.4f}
RMSE: {rmse:.4f}
MAPE: {mape:.2f} %
Hit-ratio (direction): {hit_ratio*100:.2f} %
Interprétation :
- MAE faible = bonne précision absolue.
- MAPE faible = bonne précision relative au prix.
- Hit-ratio > 55% = exploitable pour un système de trading directionnel.
- 50% ≈ hasard.
"""
def plot_lstm_predictions(self, dataframe, preds):
    """Plot LSTM predictions against the true indicator values and save as PNG.

    Args:
        dataframe: pd.DataFrame holding the target indicator column
            (``self.indicator_target``).
        preds: list or np.array of predictions aligned with ``dataframe``,
            padded with NaN at the start (lookback warm-up).

    Side effects:
        Writes a PNG file under ``self.path``; nothing is returned.
    """
    pred_arr = np.asarray(preds)
    true_arr = dataframe[self.indicator_target].values

    # Keep only the positions where a prediction actually exists.
    valid = ~np.isnan(pred_arr)
    true_valid = true_arr[valid]
    pred_valid = pred_arr[valid]

    plt.figure(figsize=(15, 5))
    plt.plot(true_valid, label="Vraie valeur", color="blue")
    plt.plot(pred_valid, label="Prédiction LSTM", color="orange")
    plt.title(f"Prédictions LSTM vs vrai {self.indicator_target}")
    plt.xlabel("Index")
    plt.ylabel(self.indicator_target)
    plt.legend()
    plt.grid(True)
    # NOTE(review): filename contains spaces/accents — works, but consider a slug.
    plt.savefig(f"{self.path}/Prédictions LSTM vs vrai {self.indicator_target}.png")
    plt.close()
def plot_error_histogram(self, y_true, y_pred):
    """Save a histogram of the prediction errors (pred - true) as a PNG.

    Args:
        y_true: true target values.
        y_pred: predicted values, same length as ``y_true``.

    Side effects:
        Writes a PNG file under ``self.path``; nothing is returned.
    """
    residuals = y_pred - y_true
    plt.figure(figsize=(8, 5))
    plt.hist(residuals, bins=30)
    plt.title("Distribution des erreurs de prédiction")
    # plt.show()
    plt.savefig(f"{self.path}/Distribution des erreurs de prédiction.png")
    plt.close()
def reliability_report(self, y_true, y_pred):
    """Compute reliability metrics of predictions vs. ground truth.

    | Metric    | What it measures        | Sensitivity                 |
    | --------- | ----------------------- | --------------------------- |
    | MAE       | Mean absolute error     | Average of the errors       |
    | RMSE      | Root mean squared error | Sensitive to large errors   |
    | MAPE      | Mean % error            | Easy to interpret           |
    | Hit ratio | Correct direction       | For trading / signals       |

    Args:
        y_true: array-like of true target values.
        y_pred: array-like of predictions, same length as ``y_true``.

    Returns:
        Tuple ``(mae, rmse, mape, hit_ratio)``; ``mape`` is in percent,
        ``hit_ratio`` in [0, 1] (NaN when fewer than 2 samples).
    """
    y_true = np.asarray(y_true, dtype=float).ravel()
    y_pred = np.asarray(y_pred, dtype=float).ravel()
    errors = y_pred - y_true

    # Numerically identical to sklearn's mean_absolute_error / mean_squared_error.
    mae = float(np.mean(np.abs(errors)))
    rmse = float(np.sqrt(np.mean(errors ** 2)))

    # BUG FIX: the original divided by y_true unconditionally, yielding
    # inf/NaN MAPE whenever the target contains a zero. Exclude zero targets.
    nonzero = y_true != 0
    if nonzero.any():
        mape = float(np.mean(np.abs(errors[nonzero] / y_true[nonzero])) * 100)
    else:
        mape = float("nan")

    # Directional hit ratio: fraction of steps where predicted and real
    # moves point the same way. Guard the <2-sample case (empty diff).
    real_dir = np.sign(np.diff(y_true))
    pred_dir = np.sign(np.diff(y_pred))
    hit_ratio = float((real_dir == pred_dir).mean()) if real_dir.size else float("nan")

    return mae, rmse, mape, hit_ratio