RandomForestClassifier

This commit is contained in:
Jérôme Delacotte
2025-11-08 17:26:51 +01:00
parent 82ab199e2d
commit 4b22f3efb6
3 changed files with 289 additions and 83 deletions

View File

@@ -48,10 +48,13 @@ from sklearn.metrics import (
accuracy_score, accuracy_score,
roc_auc_score, roc_auc_score,
roc_curve, roc_curve,
precision_score, recall_score, precision_recall_curve,
f1_score
) )
from sklearn.tree import export_text from sklearn.tree import export_text
import inspect import inspect
from sklearn.feature_selection import mutual_info_classif
from sklearn.inspection import permutation_importance
from tabulate import tabulate from tabulate import tabulate
@@ -77,7 +80,19 @@ def normalize(df):
class Zeus_8_3_2_B_4_2(IStrategy): class Zeus_8_3_2_B_4_2(IStrategy):
# Machine Learning # Machine Learning
model = joblib.load('rf_model.pkl') model = joblib.load('rf_model.pkl')
model_indicators = ['rsi_deriv1', "max_rsi_12", "mid_smooth_5_deriv1", "volume_deriv1"] model_indicators = [
'rsi', 'rsi_deriv1', "max_rsi_12",
"bb_percent",
'vol_24',
'percent3',
'sma5_dist', 'sma5_deriv1',
'sma24_dist', 'sma24_deriv1',
'sma60_dist', 'sma60_deriv1',
'down_count', 'up_count',
'down_pct', 'slope_norm',
'min_max_60',
'rsi_slope', 'adx_change', 'volatility_ratio'
]
levels = [1, 2, 2, 4, 6, 8, 10, 12, 14, 16, 18, 20] levels = [1, 2, 2, 4, 6, 8, 10, 12, 14, 16, 18, 20]
# startup_candle_count = 12 * 24 * 5 # startup_candle_count = 12 * 24 * 5
@@ -286,12 +301,12 @@ class Zeus_8_3_2_B_4_2(IStrategy):
pct_inc = DecimalParameter(0.0001, 0.003, default=0.0022, decimals=4, space='buy', optimize=True, load=True) pct_inc = DecimalParameter(0.0001, 0.003, default=0.0022, decimals=4, space='buy', optimize=True, load=True)
indic_5m_slope_sup_buy = CategoricalParameter(indicators, default="sma60", space='buy') indic_5m_slope_sup_buy = CategoricalParameter(indicators, default="sma60", space='buy')
indic_deriv_5m_slop_sup_buy = CategoricalParameter(indicators, default="sma12", space='buy', optimize=True, load=True) # indic_deriv_5m_slop_sup_buy = CategoricalParameter(indicators, default="sma12", space='buy', optimize=True, load=True)
deriv_5m_slope_sup_buy = DecimalParameter(-0.1, 0.5, default=0, decimals=2, space='buy', optimize=True, load=True) # deriv_5m_slope_sup_buy = DecimalParameter(-0.1, 0.5, default=0, decimals=2, space='buy', optimize=True, load=True)
indic_5m_slope_inf_buy = CategoricalParameter(indicators, default="sma60", space='buy') indic_5m_slope_inf_buy = CategoricalParameter(indicators, default="sma60", space='buy')
indic_deriv_5m_slop_sup_buy = CategoricalParameter(indicators, default="sma12", space='buy', optimize=True, load=True) # indic_deriv_5m_slop_inf_buy = CategoricalParameter(indicators, default="sma12", space='buy', optimize=True, load=True)
deriv_5m_slope_sup_buy = DecimalParameter(-0.1, 0.5, default=0, decimals=2, space='buy', optimize=True, load=True) # deriv_5m_slope_inf_buy = DecimalParameter(-0.1, 0.5, default=0, decimals=2, space='buy', optimize=True, load=True)
# indic_deriv1_5m = DecimalParameter(-2, 2, default=0, decimals=2, space='buy', optimize=True, load=True) # indic_deriv1_5m = DecimalParameter(-2, 2, default=0, decimals=2, space='buy', optimize=True, load=True)
@@ -392,7 +407,7 @@ class Zeus_8_3_2_B_4_2(IStrategy):
condition = True #(last_candle[f"{indic_5m}_deriv1"] >= indic_deriv1_5m) and (last_candle[f"{indic_5m}_deriv2"] >= indic_deriv2_5m) condition = True #(last_candle[f"{indic_5m}_deriv1"] >= indic_deriv1_5m) and (last_candle[f"{indic_5m}_deriv2"] >= indic_deriv2_5m)
# allow_to_buy = True #(not self.stop_all) #& (not self.all_down) # allow_to_buy = True #(not self.stop_all) #& (not self.all_down)
# and val > self.buy_val.value #not last_candle['tendency'] in ('B-', 'B--') # (rate <= float(limit)) | (entry_tag == 'force_entry') # and val > self.buy_val.value #not last_candle['tendency'] in (-1, -2) # (rate <= float(limit)) | (entry_tag == 'force_entry')
allow_to_buy = (condition and not self.pairs[pair]['stop']) | (entry_tag == 'force_entry') allow_to_buy = (condition and not self.pairs[pair]['stop']) | (entry_tag == 'force_entry')
# if allow_to_buy: # if allow_to_buy:
@@ -562,7 +577,7 @@ class Zeus_8_3_2_B_4_2(IStrategy):
# if (last_candle['mid_smooth_deriv1'] >= 0): # if (last_candle['mid_smooth_deriv1'] >= 0):
# return None # return None
# if (last_candle['tendency'] in ('H++', 'H+')) and (last_candle['rsi'] < 80): # if (last_candle['tendency'] in (2, 1)) and (last_candle['rsi'] < 80):
# return None # return None
# #
# if (last_candle['sma24_deriv1'] < 0 and before_last_candle['sma24_deriv1'] >= 0) and (current_profit > expected_profit): # if (last_candle['sma24_deriv1'] < 0 and before_last_candle['sma24_deriv1'] >= 0) and (current_profit > expected_profit):
@@ -573,7 +588,7 @@ class Zeus_8_3_2_B_4_2(IStrategy):
# trend = last_candle['trend_class_1d'] # trend = last_candle['trend_class_1d']
# if (trend == "B-" or trend == "B--") and self.pairs[pair]['has_gain'] == 0: # and (last_candle[f"{indic_5m_sell}_deriv1"] <= indic_deriv1_5m_sell and last_candle[f"{indic_5m_sell}_deriv2"] <= indic_deriv2_5m_sell): # if (trend == "B-" or trend == "B--") and self.pairs[pair]['has_gain'] == 0: # and (last_candle[f"{indic_5m_sell}_deriv1"] <= indic_deriv1_5m_sell and last_candle[f"{indic_5m_sell}_deriv2"] <= indic_deriv2_5m_sell):
# #
# if (last_candle['max_rsi_12_1h'] > 75) and last_candle['trend_class_1h'] == 'H+' and profit > max(5, expected_profit) and (last_candle['hapercent'] < 0): # if (last_candle['max_rsi_12_1h'] > 75) and last_candle['trend_class_1h'] == 1 and profit > max(5, expected_profit) and (last_candle['hapercent'] < 0):
# self.pairs[pair]['stop'] = True # self.pairs[pair]['stop'] = True
# self.log_trade( # self.log_trade(
# last_candle=last_candle, # last_candle=last_candle,
@@ -854,11 +869,11 @@ class Zeus_8_3_2_B_4_2(IStrategy):
# On considère les petites valeurs comme zéro # On considère les petites valeurs comme zéro
if abs(d1) < eps: if abs(d1) < eps:
return 'P' # Palier / neutre return 0 # Palier / neutre
if d1 > d1_lim_sup: if d1 > d1_lim_sup:
return 'H++' if d2 > eps else 'H+' # Acceleration Hausse / Ralentissement Hausse return 2 if d2 > eps else 1 # Acceleration Hausse / Ralentissement Hausse
if d1 < d1_lim_inf: if d1 < d1_lim_inf:
return 'B--' if d2 < -eps else 'B-' # Acceleration Baisse / Ralentissement Baisse return -2 if d2 < -eps else -1 # Acceleration Baisse / Ralentissement Baisse
if abs(d1) < eps: if abs(d1) < eps:
return 'DH' if d2 > eps else 'DB' # Depart Hausse / Depart Baisse return 'DH' if d2 > eps else 'DB' # Depart Hausse / Depart Baisse
return 'Mid' return 'Mid'
@@ -874,13 +889,13 @@ class Zeus_8_3_2_B_4_2(IStrategy):
# d1_lim_inf = -0.01 # d1_lim_inf = -0.01
# d1_lim_sup = 0.01 # d1_lim_sup = 0.01
# if d1 >= d1_lim_inf and d1 <= d1_lim_sup: # and d2 >= d2_lim_inf and d2 <= d2_lim_sup: # if d1 >= d1_lim_inf and d1 <= d1_lim_sup: # and d2 >= d2_lim_inf and d2 <= d2_lim_sup:
# return 'P' # Palier # return 0 # Palier
# if d1 == 0.0: # if d1 == 0.0:
# return 'DH' if d2 > 0 else 'DB' # Depart Hausse / Départ Baisse # return 'DH' if d2 > 0 else 'DB' # Depart Hausse / Départ Baisse
# if d1 > d1_lim_sup: # if d1 > d1_lim_sup:
# return 'H++' if d2 > 0 else 'H+' # Acceleration Hausse / Ralentissement Hausse # return 2 if d2 > 0 else 1 # Acceleration Hausse / Ralentissement Hausse
# if d1 < d1_lim_inf: # if d1 < d1_lim_inf:
# return 'B--' if d2 < 0 else 'B-' # Accéleration Baisse / Ralentissement Baisse # return -2 if d2 < 0 else -1 # Accéleration Baisse / Ralentissement Baisse
# return 'Mid' # return 'Mid'
# #
# dataframe[f"tendency{suffixe}"] = dataframe.apply(tag_by_derivatives, axis=1) # dataframe[f"tendency{suffixe}"] = dataframe.apply(tag_by_derivatives, axis=1)
@@ -898,6 +913,19 @@ class Zeus_8_3_2_B_4_2(IStrategy):
################### INFORMATIVE 1h ################### INFORMATIVE 1h
informative = self.dp.get_pair_dataframe(pair=metadata['pair'], timeframe="1h") informative = self.dp.get_pair_dataframe(pair=metadata['pair'], timeframe="1h")
informative = self.populateDataframe(informative, timeframe='1h') informative = self.populateDataframe(informative, timeframe='1h')
# informative['target_value'] = informative['sma5'].shift(-6).rolling(5).max() - informative['sma5'] * 1.005
# if self.dp.runmode.value in ('backtest'):
# self.trainModel(informative, metadata)
#
# # Préparer les features pour la prédiction
# features = informative[self.model_indicators].fillna(0)
#
# # Prédiction : probabilité que le prix monte
# probs = self.model.predict_proba(features)[:, 1]
#
# # Sauvegarder la probabilité pour l'analyse
# informative['ml_prob'] = probs
dataframe = merge_informative_pair(dataframe, informative, self.timeframe, "1h", ffill=True) dataframe = merge_informative_pair(dataframe, informative, self.timeframe, "1h", ffill=True)
################### INFORMATIVE 1d ################### INFORMATIVE 1d
@@ -953,9 +981,6 @@ class Zeus_8_3_2_B_4_2(IStrategy):
dataframe['mid_smooth_5h'] dataframe['mid_smooth_5h']
dataframe["mid_smooth_5h_deriv2"] = 100 * dataframe["mid_smooth_5h_deriv1"].diff().rolling(window=60).mean() dataframe["mid_smooth_5h_deriv2"] = 100 * dataframe["mid_smooth_5h_deriv1"].diff().rolling(window=60).mean()
# Compter les baisses / hausses consécutives
# self.calculateDownAndUp(dataframe, limit=0.0001)
# =============================== # ===============================
# Lissage des valeurs Journalières # Lissage des valeurs Journalières
horizon_d = 12 * 5 * 24 horizon_d = 12 * 5 * 24
@@ -1030,29 +1055,36 @@ class Zeus_8_3_2_B_4_2(IStrategy):
dataframe['stop_buying'] = latched dataframe['stop_buying'] = latched
if self.dp.runmode.value in ('backtest'):
self.trainModel(dataframe, metadata) self.trainModel(dataframe, metadata)
# Préparer les features pour la prédiction # Préparer les features pour la prédiction
features = dataframe[self.model_indicators].fillna(0) features = dataframe[self.model_indicators].fillna(0)
# Prédiction : probabilité que le prix monte # Prédiction : probabilité que le prix monte
# probs = self.model.predict_proba(features)[:, 1] probs = self.model.predict_proba(features)[:, 1]
# Sauvegarder la probabilité pour l'analyse # Sauvegarder la probabilité pour l'analyse
# dataframe['ml_prob'] = probs dataframe['ml_prob'] = probs
# self.inspect_model(self.model) self.inspect_model(self.model)
return dataframe return dataframe
def trainModel(self, dataframe: DataFrame, metadata: dict): def trainModel(self, dataframe: DataFrame, metadata: dict):
df = dataframe.copy() df = dataframe.copy()
# 3⃣ Créer la cible : 1 si le prix monte dans les prochaines bougies # 3⃣ Créer la cible : 1 si le prix monte dans les prochaines bougies
df['target'] = (1000 * (df['sma24'].shift(-24) - df['sma24'])) #.astype(int) # df['target'] = (df['sma24'].shift(-24) > df['sma24']).astype(int)
df['target'] = (df['sma5'].shift(-12).rolling(12).max() > df['sma5'] * 1.00025).astype(int)
df['target'] = df['target'].fillna(0).astype(int)
# Nettoyage # Nettoyage
df = df.dropna() df = df.dropna()
X = df[self.model_indicators]
y = df['target'] # ta colonne cible binaire ou numérique
print(self.feature_auc_scores(X, y))
# 4⃣ Split train/test # 4⃣ Split train/test
X = df[self.model_indicators] X = df[self.model_indicators]
y = df['target'] y = df['target']
@@ -1060,39 +1092,50 @@ class Zeus_8_3_2_B_4_2(IStrategy):
# 5⃣ Entraînement du modèle # 5⃣ Entraînement du modèle
# train_model = RandomForestClassifier(n_estimators=200, random_state=42) # train_model = RandomForestClassifier(n_estimators=200, random_state=42)
# train_model = RandomForestClassifier( train_model = RandomForestClassifier(
# n_estimators=300, n_estimators=300,
# max_depth=12, max_depth=12,
# min_samples_split=4, # min_samples_split=4,
# min_samples_leaf=2, # min_samples_leaf=2,
# max_features='sqrt', # max_features='sqrt',
# random_state=42, # random_state=42,
# n_jobs=-1 # n_jobs=-1,
# ) class_weight='balanced'
train_model = RandomForestRegressor(
n_estimators=300,
max_depth=None,
random_state=42,
n_jobs=-1
) )
train_model.fit(X_train, y_train) train_model.fit(X_train, y_train)
# 6⃣ Évaluer la précision (facultatif) # 6⃣ Évaluer la précision (facultatif)
preds = train_model.predict(X_test) preds = train_model.predict(X_test)
# acc = accuracy_score(y_test, preds) acc = accuracy_score(y_test, preds)
# print(f"Accuracy: {acc:.3f}") print(f"Accuracy: {acc:.3f}")
# 7⃣ Sauvegarde du modèle # 7⃣ Sauvegarde du modèle
joblib.dump(train_model, 'rf_model.pkl') joblib.dump(train_model, 'rf_model.pkl')
print("✅ Modèle sauvegardé sous rf_model.pkl") print("✅ Modèle sauvegardé sous rf_model.pkl")
y_pred = train_model.predict(X_test) # X = dataframe des features (après shift/rolling/indicators)
# y = target binaire ou décimale
# model = ton modèle entraîné (RandomForestClassifier ou Regressor)
print("R² :", r2_score(y_test, y_pred)) # # --- 1⃣ Mutual Information (MI) ---
print("RMSE :", mean_squared_error(y_test, y_pred)) #, squared=False)) # mi_scores = mutual_info_classif(X.fillna(0), y)
print("MAE :", mean_absolute_error(y_test, y_pred)) # mi_series = pd.Series(mi_scores, index=X.columns, name='MI')
#
# # --- 2⃣ Permutation Importance (PI) ---
# pi_result = permutation_importance(train_model, X, y, n_repeats=10, random_state=42, n_jobs=-1)
# pi_series = pd.Series(pi_result.importances_mean, index=X.columns, name='PI')
#
# # --- 3⃣ Combinaison dans un seul dataframe ---
# importance_df = pd.concat([mi_series, pi_series], axis=1)
# importance_df = importance_df.sort_values(by='PI', ascending=False) # tri par importance modèle
# print(importance_df)
#
# importance_df.plot(kind='bar', figsize=(10, 5))
# plt.title("Mutual Info vs Permutation Importance")
# plt.ylabel("Score")
# plt.show()
# self.analyze_model(train_model, X_train, X_test, y_train, y_test) self.analyze_model(train_model, X_train, X_test, y_train, y_test)
def inspect_model(self, model): def inspect_model(self, model):
""" """
@@ -1252,39 +1295,138 @@ class Zeus_8_3_2_B_4_2(IStrategy):
plt.savefig(os.path.join(output_dir, "Courbe ROC.png"), bbox_inches="tight") plt.savefig(os.path.join(output_dir, "Courbe ROC.png"), bbox_inches="tight")
plt.close() plt.close()
# ---- Interprétation SHAP (optionnelle) ---- # # ---- Interprétation SHAP (optionnelle) ----
try: # try:
import shap # import shap
#
# print("\n===== 💡 ANALYSE SHAP =====")
# explainer = shap.TreeExplainer(model)
# shap_values = explainer.shap_values(X_test)
# # shap.summary_plot(shap_values[1], X_test)
# # Vérifie le type de sortie de shap_values
# if isinstance(shap_values, list):
# # Cas des modèles de classification (plusieurs classes)
# shap_values_to_plot = shap_values[0] if len(shap_values) == 1 else shap_values[1]
# else:
# shap_values_to_plot = shap_values
#
# # Ajustement des dimensions au besoin
# if shap_values_to_plot.shape[1] != X_test.shape[1]:
# print(f"⚠️ Mismatch dimensions SHAP ({shap_values_to_plot.shape[1]}) vs X_test ({X_test.shape[1]})")
# min_dim = min(shap_values_to_plot.shape[1], X_test.shape[1])
# shap_values_to_plot = shap_values_to_plot[:, :min_dim]
# X_to_plot = X_test.iloc[:, :min_dim]
# else:
# X_to_plot = X_test
#
# plt.figure(figsize=(12, 4))
# shap.summary_plot(shap_values_to_plot, X_to_plot, show=False)
# plt.savefig(os.path.join(output_dir, "shap_summary.png"), bbox_inches="tight")
# plt.close()
# except ImportError:
# print("\n(SHAP non installé — `pip install shap` pour activer l'analyse SHAP.)")
print("\n===== 💡 ANALYSE SHAP =====") y_proba = model.predict_proba(X_test)[:, 1]
explainer = shap.TreeExplainer(model)
shap_values = explainer.shap_values(X_test)
# shap.summary_plot(shap_values[1], X_test)
# Vérifie le type de sortie de shap_values
if isinstance(shap_values, list):
# Cas des modèles de classification (plusieurs classes)
shap_values_to_plot = shap_values[0] if len(shap_values) == 1 else shap_values[1]
else:
shap_values_to_plot = shap_values
# Ajustement des dimensions au besoin # Trace ou enregistre le graphique
if shap_values_to_plot.shape[1] != X_test.shape[1]: self.plot_threshold_analysis(y_test, y_proba, step=0.05,
print(f"⚠️ Mismatch dimensions SHAP ({shap_values_to_plot.shape[1]}) vs X_test ({X_test.shape[1]})") save_path="/home/souti/freqtrade/user_data/plots/threshold_analysis.png")
min_dim = min(shap_values_to_plot.shape[1], X_test.shape[1])
shap_values_to_plot = shap_values_to_plot[:, :min_dim]
X_to_plot = X_test.iloc[:, :min_dim]
else:
X_to_plot = X_test
plt.figure(figsize=(12, 10)) # y_test : vraies classes (0 / 1)
shap.summary_plot(shap_values_to_plot, X_to_plot, show=False) # y_proba : probabilités de la classe 1 prédites par ton modèle
plt.savefig(os.path.join(output_dir, "shap_summary.png"), bbox_inches="tight") # Exemple : y_proba = model.predict_proba(X_test)[:, 1]
plt.close()
except ImportError: seuils = np.arange(0.0, 1.01, 0.05)
print("\n(SHAP non installé — `pip install shap` pour activer l'analyse SHAP.)") precisions, recalls, f1s = [], [], []
for seuil in seuils:
y_pred = (y_proba >= seuil).astype(int)
precisions.append(precision_score(y_test, y_pred))
recalls.append(recall_score(y_test, y_pred))
f1s.append(f1_score(y_test, y_pred))
plt.figure(figsize=(10, 6))
plt.plot(seuils, precisions, label='Précision', marker='o')
plt.plot(seuils, recalls, label='Rappel', marker='o')
plt.plot(seuils, f1s, label='F1-score', marker='o')
# Ajoute un point pour le meilleur F1
best_idx = np.argmax(f1s)
plt.scatter(seuils[best_idx], f1s[best_idx], color='red', s=80, label=f'Max F1 ({seuils[best_idx]:.2f})')
plt.title("Performance du modèle selon le seuil de probabilité")
plt.xlabel("Seuil de probabilité (classe 1)")
plt.ylabel("Score")
plt.grid(True, alpha=0.3)
plt.legend()
plt.savefig("/home/souti/freqtrade/user_data/plots/seuil_de_probabilite.png", bbox_inches='tight')
# plt.show()
print(f"✅ Meilleur F1 : {f1s[best_idx]:.3f} au seuil {seuils[best_idx]:.2f}")
print("\n===== ✅ FIN DE L'ANALYSE =====") print("\n===== ✅ FIN DE L'ANALYSE =====")
def plot_threshold_analysis(self, y_true, y_proba, step=0.05, save_path=None):
"""
Affiche la précision, le rappel et le F1-score selon le seuil de décision.
y_true : labels réels (0 ou 1)
y_proba : probabilités prédites (P(hausse))
step : pas entre les seuils testés
save_path : si renseigné, enregistre l'image au lieu d'afficher
"""
# Le graphique généré affichera trois courbes :
#
# 🔵 Precision — la fiabilité de tes signaux haussiers.
#
# 🟢 Recall — la proportion de hausses que ton modèle détecte.
#
# 🟣 F1-score — le compromis optimal entre les deux.
thresholds = np.arange(0, 1.01, step)
precisions, recalls, f1s = [], [], []
for thr in thresholds:
preds = (y_proba >= thr).astype(int)
precisions.append(precision_score(y_true, preds))
recalls.append(recall_score(y_true, preds))
f1s.append(f1_score(y_true, preds))
plt.figure(figsize=(10, 6))
plt.plot(thresholds, precisions, label="Precision", linewidth=2)
plt.plot(thresholds, recalls, label="Recall", linewidth=2)
plt.plot(thresholds, f1s, label="F1-score", linewidth=2, linestyle="--")
plt.axvline(0.5, color='gray', linestyle=':', label="Seuil 0.5")
plt.title("📊 Performance selon le seuil de probabilité", fontsize=14)
plt.xlabel("Seuil de décision (threshold)")
plt.ylabel("Score")
plt.legend()
plt.grid(True, alpha=0.3)
if save_path:
plt.savefig(save_path, bbox_inches='tight')
print(f"✅ Graphique enregistré : {save_path}")
else:
plt.show()
# # =============================
# # Exemple d'utilisation :
# # =============================
# if __name__ == "__main__":
# # Exemple : chargement d'un modèle et test
# import joblib
#
# model = joblib.load("/media/Home/home/souti/freqtrade/user_data/strategies/tools/sklearn/model.pkl")
# data = np.load("/media/Home/home/souti/freqtrade/user_data/strategies/tools/sklearn/test_data.npz")
# X_test, y_test = data["X"], data["y"]
#
# y_proba = model.predict_proba(X_test)[:, 1]
#
# # Trace ou enregistre le graphique
# plot_threshold_analysis(y_test, y_proba, step=0.05,
# save_path="/media/Home/home/souti/freqtrade/user_data/strategies/tools/sklearn/threshold_analysis.png")
def populateDataframe(self, dataframe, timeframe='5m'): def populateDataframe(self, dataframe, timeframe='5m'):
heikinashi = qtpylib.heikinashi(dataframe) heikinashi = qtpylib.heikinashi(dataframe)
dataframe['haopen'] = heikinashi['open'] dataframe['haopen'] = heikinashi['open']
@@ -1292,10 +1434,10 @@ class Zeus_8_3_2_B_4_2(IStrategy):
dataframe['hapercent'] = (dataframe['haclose'] - dataframe['haopen']) / dataframe['haclose'] dataframe['hapercent'] = (dataframe['haclose'] - dataframe['haopen']) / dataframe['haclose']
dataframe['mid'] = dataframe['haopen'] + (dataframe['haclose'] - dataframe['haopen']) / 2 dataframe['mid'] = dataframe['haopen'] + (dataframe['haclose'] - dataframe['haopen']) / 2
dataframe["percent"] = (dataframe["close"] - dataframe["open"]) / dataframe["open"] dataframe["percent"] = dataframe['close'].pct_change()
dataframe["percent3"] = (dataframe["close"] - dataframe["open"].shift(3)) / dataframe["open"].shift(3) dataframe["percent3"] = dataframe['close'].pct_change(3)
dataframe["percent12"] = (dataframe["close"] - dataframe["open"].shift(12)) / dataframe["open"].shift(12) dataframe["percent12"] = dataframe['close'].pct_change(12)
dataframe["percent24"] = (dataframe["close"] - dataframe["open"].shift(24)) / dataframe["open"].shift(24) dataframe["percent24"] = dataframe['close'].pct_change(24)
if self.dp.runmode.value in ('backtest'): if self.dp.runmode.value in ('backtest'):
dataframe['futur_percent'] = 100 * (dataframe['close'].shift(-1) - dataframe['close']) / dataframe['close'] dataframe['futur_percent'] = 100 * (dataframe['close'].shift(-1) - dataframe['close']) / dataframe['close']
@@ -1323,6 +1465,7 @@ class Zeus_8_3_2_B_4_2(IStrategy):
dataframe['max12'] = talib.MAX(dataframe['close'], timeperiod=12) dataframe['max12'] = talib.MAX(dataframe['close'], timeperiod=12)
dataframe['max60'] = talib.MAX(dataframe['close'], timeperiod=60) dataframe['max60'] = talib.MAX(dataframe['close'], timeperiod=60)
dataframe['min60'] = talib.MIN(dataframe['close'], timeperiod=60) dataframe['min60'] = talib.MIN(dataframe['close'], timeperiod=60)
dataframe['min_max_60'] = ((dataframe['max60'] - dataframe['close']) / dataframe['min60'])
# dataframe['min36'] = talib.MIN(dataframe['close'], timeperiod=36) # dataframe['min36'] = talib.MIN(dataframe['close'], timeperiod=36)
# dataframe['max36'] = talib.MAX(dataframe['close'], timeperiod=36) # dataframe['max36'] = talib.MAX(dataframe['close'], timeperiod=36)
# dataframe['pct36'] = 100 * (dataframe['max36'] - dataframe['min36']) / dataframe['min36'] # dataframe['pct36'] = 100 * (dataframe['max36'] - dataframe['min36']) / dataframe['min36']
@@ -1332,6 +1475,13 @@ class Zeus_8_3_2_B_4_2(IStrategy):
dataframe['bb_lowerband'] = bollinger['lower'] dataframe['bb_lowerband'] = bollinger['lower']
dataframe['bb_middleband'] = bollinger['mid'] dataframe['bb_middleband'] = bollinger['mid']
dataframe['bb_upperband'] = bollinger['upper'] dataframe['bb_upperband'] = bollinger['upper']
dataframe["bb_percent"] = (
(dataframe["close"] - dataframe["bb_lowerband"]) /
(dataframe["bb_upperband"] - dataframe["bb_lowerband"])
)
dataframe["bb_width"] = (
(dataframe["bb_upperband"] - dataframe["bb_lowerband"]) / dataframe["bb_middleband"]
)
# Calcul MACD # Calcul MACD
macd, macdsignal, macdhist = talib.MACD( macd, macdsignal, macdhist = talib.MACD(
@@ -1403,6 +1553,46 @@ class Zeus_8_3_2_B_4_2(IStrategy):
# tr = DataFrame({'hl': high_low, 'hc': high_close, 'lc': low_close}).max(axis=1) # tr = DataFrame({'hl': high_low, 'hc': high_close, 'lc': low_close}).max(axis=1)
# dataframe['atr'] = tr.rolling(window=self.DEFAULT_PARAMS['atr_period']).mean() # dataframe['atr'] = tr.rolling(window=self.DEFAULT_PARAMS['atr_period']).mean()
###########################
# df = ton DataFrame OHLCV avec colonnes: open, high, low, close, volume
# Assure-toi qu'il est trié par date croissante
# --- Volatilité normalisée ---
dataframe['atr'] = ta.volatility.AverageTrueRange(
high=dataframe['high'], low=dataframe['low'], close=dataframe['close'], window=14
).average_true_range()
dataframe['atr_norm'] = dataframe['atr'] / dataframe['close']
# --- Force de tendance ---
dataframe['adx'] = ta.trend.ADXIndicator(
high=dataframe['high'], low=dataframe['low'], close=dataframe['close'], window=14
).adx()
# --- Volume directionnel (On Balance Volume) ---
dataframe['obv'] = ta.volume.OnBalanceVolumeIndicator(
close=dataframe['close'], volume=dataframe['volume']
).on_balance_volume()
# --- Volatilité récente (écart-type des rendements) ---
dataframe['ret'] = dataframe['close'].pct_change()
dataframe['vol_24'] = dataframe['ret'].rolling(24).std()
# Compter les baisses / hausses consécutives
self.calculateDownAndUp(dataframe, limit=0.0001)
# df : ton dataframe OHLCV + indicateurs existants
# Assurez-vous que les colonnes suivantes existent :
# 'max_rsi_12', 'roc_24', 'bb_percent_1h'
# --- Filtrage des NaN initiaux ---
# dataframe = dataframe.dropna()
dataframe['rsi_slope'] = dataframe['rsi'].diff(3) / 3 # vitesse moyenne du RSI
dataframe['adx_change'] = dataframe['adx'] - dataframe['adx'].shift(12) # évolution de la tendance
dataframe['volatility_ratio'] = dataframe['atr_norm'] / dataframe['bb_width']
###########################
dataframe['volume_sma_deriv'] = dataframe['volume'] * dataframe['sma5_deriv1'] / (dataframe['volume'].rolling(5).mean()) dataframe['volume_sma_deriv'] = dataframe['volume'] * dataframe['sma5_deriv1'] / (dataframe['volume'].rolling(5).mean())
self.calculeDerivees(dataframe, 'volume', timeframe=timeframe, ema_period=12) self.calculeDerivees(dataframe, 'volume', timeframe=timeframe, ema_period=12)
@@ -1410,6 +1600,15 @@ class Zeus_8_3_2_B_4_2(IStrategy):
return dataframe return dataframe
def feature_auc_scores(self, X, y):
aucs = {}
for col in X.columns:
try:
aucs[col] = roc_auc_score(y, X[col].fillna(method='ffill').fillna(0))
except Exception:
aucs[col] = np.nan
return pd.Series(aucs).sort_values(ascending=False)
def macd_tendance_int(self, dataframe: pd.DataFrame, def macd_tendance_int(self, dataframe: pd.DataFrame,
macd_col='macd', macd_col='macd',
signal_col='macdsignal', signal_col='macdsignal',
@@ -1462,8 +1661,8 @@ class Zeus_8_3_2_B_4_2(IStrategy):
return tendance return tendance
def calculateDownAndUp(self, dataframe, limit=0.0001): def calculateDownAndUp(self, dataframe, limit=0.0001):
dataframe['down'] = dataframe['mid_smooth_1h_deriv1'] < limit # dataframe['hapercent'] <= limit dataframe['down'] = dataframe['hapercent'] <= limit
dataframe['up'] = dataframe['mid_smooth_1h_deriv1'] > limit # dataframe['hapercent'] >= limit dataframe['up'] = dataframe['hapercent'] >= limit
dataframe['down_count'] = - dataframe['down'].astype(int) * ( dataframe['down_count'] = - dataframe['down'].astype(int) * (
dataframe['down'].groupby((dataframe['down'] != dataframe['down'].shift()).cumsum()).cumcount() + 1) dataframe['down'].groupby((dataframe['down'] != dataframe['down'].shift()).cumsum()).cumcount() + 1)
dataframe['up_count'] = dataframe['up'].astype(int) * ( dataframe['up_count'] = dataframe['up'].astype(int) * (
@@ -1502,6 +1701,12 @@ class Zeus_8_3_2_B_4_2(IStrategy):
factor1 = 100 * (ema_period / 5) factor1 = 100 * (ema_period / 5)
factor2 = 10 * (ema_period / 5) factor2 = 10 * (ema_period / 5)
dataframe[f"{name}{suffixe}_inv"] = (dataframe[f"{name}{suffixe}"].shift(2) >= dataframe[f"{name}{suffixe}"].shift(1)) \
& (dataframe[f"{name}{suffixe}"].shift(1) <= dataframe[f"{name}{suffixe}"])
# --- Distance à la moyenne mobile ---
dataframe[f"{name}{suffixe}_dist"] = (dataframe['close'] - dataframe[f"{name}{suffixe}"]) / dataframe[f"{name}{suffixe}"]
# dérivée relative simple # dérivée relative simple
dataframe[d1_col] = (dataframe[name] - dataframe[name].shift(1)) / dataframe[name].shift(1) dataframe[d1_col] = (dataframe[name] - dataframe[name].shift(1)) / dataframe[name].shift(1)
# lissage EMA # lissage EMA
@@ -2730,7 +2935,7 @@ class Zeus_8_3_2_B_4_2(IStrategy):
# return last_candle['slope_norm_1d'] < last_candle['slope_norm_1h'] # return last_candle['slope_norm_1d'] < last_candle['slope_norm_1h']
if self.pairs[pair]['stop'] and last_candle['max_rsi_12_1h'] <= 60 and last_candle['trend_class_1h'] == 'B-': if self.pairs[pair]['stop'] and last_candle['max_rsi_12_1h'] <= 60 and last_candle['trend_class_1h'] == -1:
dispo = round(self.wallets.get_available_stake_amount()) dispo = round(self.wallets.get_available_stake_amount())
self.pairs[pair]['stop'] = False self.pairs[pair]['stop'] = False
self.log_trade( self.log_trade(
@@ -2948,15 +3153,15 @@ class Zeus_8_3_2_B_4_2(IStrategy):
q1, q2, q3, q4 = q q1, q2, q3, q4 = q
v = series.iloc[i] v = series.iloc[i]
if v <= q1: if v <= q1:
trend_class.append('B--') trend_class.append(-2)
elif v <= q2: elif v <= q2:
trend_class.append('B-') trend_class.append(-1)
elif v <= q3: elif v <= q3:
trend_class.append('P') trend_class.append(0)
elif v <= q4: elif v <= q4:
trend_class.append('H+') trend_class.append(1)
else: else:
trend_class.append('H++') trend_class.append(2)
return trend_class return trend_class
dataframe['slope_norm'] = df['slope_norm'] dataframe['slope_norm'] = df['slope_norm']

View File

@@ -1,3 +1,4 @@
from sklearn.ensemble import RandomForestRegressor from sklearn.ensemble import RandomForestRegressor
from sklearn.metrics import r2_score, mean_absolute_error from sklearn.metrics import r2_score, mean_absolute_error
import pandas as pd import pandas as pd