RandomForestRegressor

This commit is contained in:
Jérôme Delacotte
2025-11-07 20:56:30 +01:00
parent c4bba8aad8
commit 82ab199e2d
2 changed files with 305 additions and 8 deletions

View File

@@ -35,6 +35,23 @@ from collections import Counter
logger = logging.getLogger(__name__)
# Machine Learning
from sklearn.ensemble import RandomForestClassifier,RandomForestRegressor
from sklearn.model_selection import train_test_split
from sklearn.metrics import r2_score, mean_absolute_error, mean_squared_error
from sklearn.metrics import accuracy_score
import joblib
import matplotlib.pyplot as plt
from sklearn.metrics import (
classification_report,
confusion_matrix,
accuracy_score,
roc_auc_score,
roc_curve,
)
from sklearn.tree import export_text
import inspect
from tabulate import tabulate
@@ -58,6 +75,10 @@ def normalize(df):
class Zeus_8_3_2_B_4_2(IStrategy):
# Machine Learning
model = joblib.load('rf_model.pkl')
model_indicators = ['rsi_deriv1', "max_rsi_12", "mid_smooth_5_deriv1", "volume_deriv1"]
levels = [1, 2, 2, 4, 6, 8, 10, 12, 14, 16, 18, 20]
# startup_candle_count = 12 * 24 * 5
@@ -1009,8 +1030,261 @@ class Zeus_8_3_2_B_4_2(IStrategy):
dataframe['stop_buying'] = latched
self.trainModel(dataframe, metadata)
# Préparer les features pour la prédiction
features = dataframe[self.model_indicators].fillna(0)
# Prédiction : probabilité que le prix monte
# probs = self.model.predict_proba(features)[:, 1]
# Sauvegarder la probabilité pour lanalyse
# dataframe['ml_prob'] = probs
# self.inspect_model(self.model)
return dataframe
def trainModel(self, dataframe: DataFrame, metadata: dict):
    """Train a RandomForestRegressor on the strategy's ML indicators and save it.

    The target is the scaled forward move of ``sma24`` over the next 24
    candles; the fitted model is persisted to ``rf_model.pkl`` and
    regression metrics are printed for the held-out tail of the series.

    :param dataframe: indicator-enriched candle DataFrame (must contain
        ``sma24`` and every column listed in ``self.model_indicators``)
    :param metadata: freqtrade pair metadata (unused here)
    """
    df = dataframe.copy()
    # Target: 1000 x the change of sma24 over the next 24 candles
    # (regression target, hence no .astype(int)).
    df['target'] = (1000 * (df['sma24'].shift(-24) - df['sma24']))
    # Drop rows made incomplete by the forward shift / indicator warm-up.
    df = df.dropna()
    X = df[self.model_indicators]
    y = df['target']
    # Time-ordered split: shuffle=False prevents look-ahead leakage on series data.
    X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, shuffle=False)
    train_model = RandomForestRegressor(
        n_estimators=300,
        max_depth=None,
        random_state=42,
        n_jobs=-1
    )
    train_model.fit(X_train, y_train)
    # Persist the fitted model for reuse at prediction time.
    joblib.dump(train_model, 'rf_model.pkl')
    print("✅ Modèle sauvegardé sous rf_model.pkl")
    # Evaluate once on the held-out tail (the original predicted twice).
    y_pred = train_model.predict(X_test)
    print("R² :", r2_score(y_test, y_pred))
    # Fix: report an actual RMSE. mean_squared_error returns the MSE, and its
    # `squared=False` switch was deprecated/removed in recent scikit-learn,
    # so take the square root explicitly.
    print("RMSE :", np.sqrt(mean_squared_error(y_test, y_pred)))
    print("MAE :", mean_absolute_error(y_test, y_pred))
    # self.analyze_model(train_model, X_train, X_test, y_train, y_test)
def inspect_model(self, model):
    """Print a structured report on an already-fitted ML model.

    Compatible with scikit-learn, xgboost, lightgbm, catboost, ...
    Only the attributes the model actually exposes are reported.
    """
    print("===== 🔍 INFORMATIONS DU MODÈLE =====")
    # Model identity.
    print(f"Type : {type(model).__name__}")
    print(f"Module : {model.__class__.__module__}")
    # Hyper-parameters (scikit-learn style API).
    if hasattr(model, "get_params"):
        hyperparams = model.get_params()
        print(f"\n===== ⚙️ HYPERPARAMÈTRES ({len(hyperparams)}) =====")
        for name in hyperparams:
            print(f"{name}: {hyperparams[name]}")
    # Ensemble size, when the model is an ensemble.
    if hasattr(model, "n_estimators"):
        print(f"\nNombre destimateurs : {model.n_estimators}")
    # Feature importances (tree-based models).
    if hasattr(model, "feature_importances_"):
        print("\n===== 📊 IMPORTANCE DES FEATURES =====")
        names = getattr(model, "feature_names_in_", None)
        if names is None:
            # No recorded feature names: fall back to positional labels.
            names = [f"feature_{i}" for i in range(len(model.feature_importances_))]
        elif isinstance(names, np.ndarray):
            names = names.tolist()
        ranking = pd.DataFrame({
            "feature": names,
            "importance": model.feature_importances_
        }).sort_values(by="importance", ascending=False)
        print(ranking)
    # Coefficients (linear models).
    if hasattr(model, "coef_"):
        print("\n===== ➗ COEFFICIENTS =====")
        weights = np.array(model.coef_)
        if weights.ndim != 1:
            print(weights)
        else:
            for idx, value in enumerate(weights):
                print(f"Feature {idx}: {value:.6f}")
    # Intercept term.
    if hasattr(model, "intercept_"):
        print("\nIntercept :", model.intercept_)
    # Known classes (classifiers only).
    if hasattr(model, "classes_"):
        print("\n===== 🎯 CLASSES =====")
        print(model.classes_)
    # Internal scores some libraries expose after fitting.
    for score_attr in ("best_score_", "best_iteration_", "best_ntree_limit", "score_"):
        if hasattr(model, score_attr):
            print(f"\n{score_attr} = {getattr(model, score_attr)}")
    # Bound methods available on the model object (truncated to 15).
    print("\n===== 🧩 MÉTHODES DISPONIBLES =====")
    bound = [name for name, _ in inspect.getmembers(model, predicate=inspect.ismethod)]
    suffix = "..." if len(bound) > 15 else ""
    print(", ".join(bound[:15]) + suffix)
    print("\n===== ✅ FIN DE LINSPECTION =====")
def analyze_model(self, model, X_train, X_test, y_train, y_test):
    """Full diagnostic report for a fitted binary classifier.

    Prints accuracy / ROC-AUC / classification report, saves confusion-matrix,
    feature-importance and ROC-curve plots under ``user_data/plots``, sweeps
    the decision threshold, and (if the optional ``shap`` package is
    installed) writes a SHAP summary plot.

    NOTE(review): assumes *model* follows the scikit-learn estimator API and
    that the task is binary classification (the confusion-matrix loop is
    hard-coded to 2x2) — confirm before reusing with other models.
    """
    output_dir = "user_data/plots"
    os.makedirs(output_dir, exist_ok=True)
    # ---- Predictions ----
    preds = model.predict(X_test)
    # Fall back to hard predictions when the model has no predict_proba.
    probs = model.predict_proba(X_test)[:, 1] if hasattr(model, "predict_proba") else preds
    # ---- Global performance ----
    print("===== 📊 ÉVALUATION DU MODÈLE =====")
    # Sanity check: feature columns seen at fit time vs columns passed here.
    print("Colonnes du modèle :", model.feature_names_in_)
    print("Colonnes X_test :", list(X_test.columns))
    print(f"Accuracy: {accuracy_score(y_test, preds):.3f}")
    print(f"ROC AUC : {roc_auc_score(y_test, probs):.3f}")
    print("TN (True Negative) / FP (False Positive)")
    print("FN (False Negative) / TP (True Positive)")
    print("\nRapport de classification :\n", classification_report(y_test, preds))
    # Example reading of a confusion matrix (figures from a past run):
    # | Element             | Value | Meaning                                        |
    # | ------------------- | ----- | ---------------------------------------------- |
    # | TN (True Negative)  | 983   | correctly predicted 0 (no buy)                 |
    # | FP (False Positive) | 43    | predicted 1 when it was 0 (false buy signal)   |
    # | FN (False Negative) | 108   | predicted 0 when it was 1 (missed a buy)       |
    # | TP (True Positive)  | 19    | correctly predicted 1 (good buy signal)        |
    # ---- Confusion matrix ----
    cm = confusion_matrix(y_test, preds)
    print("Matrice de confusion :\n", cm)
    plt.figure(figsize=(4, 4))
    plt.imshow(cm, cmap="Blues")
    plt.title("Matrice de confusion")
    plt.xlabel("Prédit")
    plt.ylabel("Réel")
    # Annotate each of the 2x2 cells with its count.
    for i in range(2):
        for j in range(2):
            plt.text(j, i, cm[i, j], ha="center", va="center", color="black")
    # plt.show()
    plt.savefig(os.path.join(output_dir, "Matrice de confusion.png"), bbox_inches="tight")
    plt.close()
    # ---- Feature importances (tree ensembles only) ----
    if hasattr(model, "feature_importances_"):
        print("\n===== 🔍 IMPORTANCE DES FEATURES =====")
        importance = pd.DataFrame({
            "feature": X_train.columns,
            "importance": model.feature_importances_
        }).sort_values(by="importance", ascending=False)
        print(importance)
        importance.plot.bar(x="feature", y="importance", legend=False, figsize=(6, 3))
        plt.title("Importance des features")
        # plt.show()
        plt.savefig(os.path.join(output_dir, "Importance des features.png"), bbox_inches="tight")
        plt.close()
    # ---- Excerpt of one decision tree from the ensemble ----
    if hasattr(model, "estimators_"):
        print("\n===== 🌳 EXTRAIT DUN ARBRE =====")
        # First 800 characters of the first estimator's textual dump.
        print(export_text(model.estimators_[0], feature_names=list(X_train.columns))[:800])
    # ---- Accuracy as a function of the decision threshold ----
    thresholds = np.linspace(0.1, 0.9, 9)
    print("\n===== ⚙️ PERFORMANCE SELON SEUIL =====")
    for t in thresholds:
        preds_t = (probs > t).astype(int)
        acc = accuracy_score(y_test, preds_t)
        print(f"Seuil {t:.1f} → précision {acc:.3f}")
    # ---- ROC curve ----
    fpr, tpr, _ = roc_curve(y_test, probs)
    plt.figure(figsize=(5, 4))
    plt.plot(fpr, tpr, label="ROC curve")
    plt.plot([0, 1], [0, 1], linestyle="--", color="gray")
    plt.xlabel("Taux de faux positifs")
    plt.ylabel("Taux de vrais positifs")
    plt.title("Courbe ROC")
    plt.legend()
    # plt.show()
    plt.savefig(os.path.join(output_dir, "Courbe ROC.png"), bbox_inches="tight")
    plt.close()
    # ---- SHAP interpretation (optional dependency) ----
    try:
        import shap
        print("\n===== 💡 ANALYSE SHAP =====")
        explainer = shap.TreeExplainer(model)
        shap_values = explainer.shap_values(X_test)
        # shap.summary_plot(shap_values[1], X_test)
        # shap_values is a list for multi-class models, an array otherwise.
        if isinstance(shap_values, list):
            # Classification case: pick the positive class when there are two.
            shap_values_to_plot = shap_values[0] if len(shap_values) == 1 else shap_values[1]
        else:
            shap_values_to_plot = shap_values
        # Align dimensions if SHAP output and X_test disagree on column count.
        if shap_values_to_plot.shape[1] != X_test.shape[1]:
            print(f"⚠️ Mismatch dimensions SHAP ({shap_values_to_plot.shape[1]}) vs X_test ({X_test.shape[1]})")
            min_dim = min(shap_values_to_plot.shape[1], X_test.shape[1])
            shap_values_to_plot = shap_values_to_plot[:, :min_dim]
            X_to_plot = X_test.iloc[:, :min_dim]
        else:
            X_to_plot = X_test
        plt.figure(figsize=(12, 10))
        shap.summary_plot(shap_values_to_plot, X_to_plot, show=False)
        plt.savefig(os.path.join(output_dir, "shap_summary.png"), bbox_inches="tight")
        plt.close()
    except ImportError:
        # Best-effort: SHAP analysis is skipped when the package is absent.
        print("\n(SHAP non installé — `pip install shap` pour activer lanalyse SHAP.)")
    print("\n===== ✅ FIN DE LANALYSE =====")
def populateDataframe(self, dataframe, timeframe='5m'):
heikinashi = qtpylib.heikinashi(dataframe)
dataframe['haopen'] = heikinashi['open']
@@ -1130,6 +1404,7 @@ class Zeus_8_3_2_B_4_2(IStrategy):
# dataframe['atr'] = tr.rolling(window=self.DEFAULT_PARAMS['atr_period']).mean()
dataframe['volume_sma_deriv'] = dataframe['volume'] * dataframe['sma5_deriv1'] / (dataframe['volume'].rolling(5).mean())
self.calculeDerivees(dataframe, 'volume', timeframe=timeframe, ema_period=12)
self.setTrends(dataframe)
@@ -1253,13 +1528,13 @@ class Zeus_8_3_2_B_4_2(IStrategy):
eps_d1_series = eps_d1_series.fillna(global_eps_d1).replace(0, global_eps_d1)
eps_d2_series = eps_d2_series.fillna(global_eps_d2).replace(0, global_eps_d2)
if verbose and self.dp.runmode.value in ('backtest'):
stats = dataframe[[d1_col, d2_col]].agg(['min', 'max']).T
stats['abs_max'] = dataframe[[d1_col, d2_col]].abs().max(axis=0)
print(f"---- Derivatives stats {timeframe}----")
print(stats)
print(f"rolling window = {window}, coef = {coef}, ema_period = {ema_period}")
print("---------------------------")
# if verbose and self.dp.runmode.value in ('backtest'):
# stats = dataframe[[d1_col, d2_col]].agg(['min', 'max']).T
# stats['abs_max'] = dataframe[[d1_col, d2_col]].abs().max(axis=0)
# print(f"---- Derivatives stats {timeframe}----")
# print(stats)
# print(f"rolling window = {window}, coef = {coef}, ema_period = {ema_period}")
# print("---------------------------")
# mapping tendency
def tag_by_derivatives(row):
@@ -2635,7 +2910,7 @@ class Zeus_8_3_2_B_4_2(IStrategy):
def __init__(self, config: dict) -> None:
    """Initialise the strategy and load its tuned parameter tree from disk."""
    super().__init__(config)
    params_dir = "user_data/strategies/params/"
    self.parameters = self.load_params_tree(params_dir)
def setTrends(self, dataframe: DataFrame):
SMOOTH_WIN=10