2447 lines
107 KiB
Python
2447 lines
107 KiB
Python
# Zeus Strategy: First Generation of GodStra Strategy with maximum
|
||
# AVG/MID profit in USDT
|
||
# Author: @Mablue (Masoud Azizi)
|
||
# github: https://github.com/mablue/
|
||
# IMPORTANT: INSTALL TA BEFORE RUNNING (pip install ta)
|
||
# freqtrade hyperopt --hyperopt-loss SharpeHyperOptLoss --spaces buy sell roi --strategy Zeus
|
||
# --- Do not remove these libs ---
|
||
from datetime import timedelta, datetime
|
||
from freqtrade.persistence import Trade
|
||
from freqtrade.strategy import (BooleanParameter, CategoricalParameter, DecimalParameter, stoploss_from_open,
|
||
IntParameter, IStrategy, merge_informative_pair, informative, stoploss_from_absolute)
|
||
import pandas as pd
|
||
import numpy as np
|
||
import os
|
||
import json
|
||
import csv
|
||
from pandas import DataFrame
|
||
from typing import Optional, Union, Tuple
|
||
import math
|
||
import logging
|
||
from pathlib import Path
|
||
|
||
# --------------------------------
|
||
|
||
# Add your lib to import here test git
|
||
import ta
|
||
import talib.abstract as talib
|
||
import freqtrade.vendor.qtpylib.indicators as qtpylib
|
||
from datetime import timezone, timedelta
|
||
import mpmath as mp
|
||
|
||
# Machine Learning
|
||
from sklearn.ensemble import RandomForestClassifier,RandomForestRegressor
|
||
from sklearn.model_selection import train_test_split
|
||
from sklearn.metrics import r2_score, mean_absolute_error, mean_squared_error
|
||
from sklearn.metrics import accuracy_score
|
||
import joblib
|
||
import matplotlib.pyplot as plt
|
||
from sklearn.metrics import (
|
||
classification_report,
|
||
confusion_matrix,
|
||
accuracy_score,
|
||
roc_auc_score,
|
||
roc_curve,
|
||
precision_score, recall_score, precision_recall_curve,
|
||
f1_score
|
||
)
|
||
from sklearn.tree import export_text
|
||
import inspect
|
||
from sklearn.feature_selection import mutual_info_classif
|
||
from sklearn.inspection import permutation_importance
|
||
from lightgbm import LGBMClassifier
|
||
from sklearn.calibration import CalibratedClassifierCV
|
||
from sklearn.feature_selection import SelectFromModel
|
||
from tabulate import tabulate
|
||
from sklearn.model_selection import GridSearchCV
|
||
from sklearn.feature_selection import VarianceThreshold
|
||
import seaborn as sns
|
||
from xgboost import XGBClassifier
|
||
import optuna
|
||
from optuna.visualization import plot_optimization_history
|
||
from optuna.visualization import plot_slice
|
||
from optuna.visualization import plot_param_importances
|
||
from optuna.visualization import plot_parallel_coordinate
|
||
import shap
|
||
from sklearn.inspection import PartialDependenceDisplay
|
||
|
||
from sklearn.model_selection import train_test_split
|
||
from sklearn.metrics import f1_score
|
||
from xgboost import XGBClassifier
|
||
|
||
from sklearn.model_selection import train_test_split
|
||
from sklearn.linear_model import LogisticRegression
|
||
from sklearn.calibration import CalibratedClassifierCV
|
||
from sklearn.metrics import brier_score_loss, roc_auc_score
|
||
|
||
from sklearn.preprocessing import StandardScaler
|
||
from sklearn.pipeline import Pipeline
|
||
|
||
|
||
# Module-level logger used by printLog when running live / dry-run.
logger = logging.getLogger(__name__)


# Basic ANSI escape codes used to colorize the console trade log
# (green/red profit column in log_trade).
RED = "\033[31m"
GREEN = "\033[32m"
YELLOW = "\033[33m"
BLUE = "\033[34m"
MAGENTA = "\033[35m"
CYAN = "\033[36m"
RESET = "\033[0m"
||
|
||
|
||
class FrictradeLearning(IStrategy):
    """ML-assisted DCA scalping strategy on 1m candles.

    Per-pair trading state is kept in the class-level ``pairs`` dict and is
    mutated by the entry/exit callbacks.  Exits are driven by custom logic
    rather than the (effectively disabled) ROI table and stoploss.
    """

    # Warm-up candles required before indicators are considered valid.
    startup_candle_count = 180
    # Holder for a trained ML model (populated elsewhere in this file).
    train_model = None
    # Names of the dataframe columns fed to the ML model.
    model_indicators = []
    # Fallback indicator / risk parameters (several are referenced by
    # populate_indicators; others appear to be defaults kept for reference).
    DEFAULT_PARAMS = {
        "rsi_buy": 30,
        "rsi_sell": 70,
        "ema_period": 21,
        "sma_short": 20,
        "sma_long": 100,
        "atr_period": 14,
        "atr_multiplier": 1.5,
        "stake_amount": None,  # use exchange default
        "stoploss": -0.10,
        "minimal_roi": {"0": 0.10}
    }

    # DCA ladder.  NOTE(review): keys look like drawdown levels (in %) and
    # values like additional stake fractions — confirm against the caller,
    # which is not visible in this chunk.
    dca_levels = {
        0: 0.00,
        -2: 0.05,
        -4: 0.07,
        -6: 0.10,
        -8: 0.12,
        -10: 0.15,
        -12: 0.18,
        -14: 0.22,
        -16: 0.26,
        -18: 0.30,
    }

    # Maximum tolerated decline from the last ATH (40%), used by
    # calculateStepsDcaThresholds to size the DCA threshold ladder.
    allow_decrease_rate = 0.4

    # ROI table: effectively disabled (1000% target) — exits are handled by
    # confirm_trade_exit instead.
    minimal_roi = {
        "0": 10
    }

    # Stoploss: effectively disabled (-100%).
    stoploss = -1  # 0.256
    # Custom stoploss
    use_custom_stoploss = False

    trailing_stop = False
    trailing_stop_positive = 0.15
    trailing_stop_positive_offset = 1
    trailing_only_offset_is_reached = True

    # Buy hypers
    timeframe = '1m'
    max_open_trades = 5
    max_amount = 40

    parameters = {}
    # DCA config
    position_adjustment_enable = True

    # Row counter used by log_trade to re-print the table header every 10
    # rows.  Starts as False, which behaves as 0 in `% 10` and `+= 1`.
    columns_logged = False
    # Per-pair mutable trading state, keyed by pair symbol.  Reset on every
    # buy (confirm_trade_entry) and sell (confirm_trade_exit).
    pairs = {
        pair: {
            "first_buy": 0,            # price of the first fill of the open position
            "last_buy": 0.0,           # price of the most recent fill
            "last_min": 999999999999999.5,  # sentinel: any real price is lower
            "last_max": 0,
            "trade_info": {},
            "max_touch": 0.0,          # highest close seen while in position
            "last_sell": 0.0,
            'count_of_buys': 0,
            'current_profit': 0,
            'expected_profit': 0,
            'previous_profit': 0,
            "last_candle": {},
            "last_count_of_buys": 0,
            'base_stake_amount': 0,
            'stop_buy': False,
            'last_date': 0,            # datetime of the last sell (0 = never)
            'stop': False,
            'max_profit': 0,
            'first_amount': 0,
            'total_amount': 0,         # total stake currency invested in the pair
            'has_gain': 0,
            'force_sell': False,
            'force_buy': False,
            'last_ath': 0,             # cached all-time high used for DCA sizing
            'dca_thresholds': {}       # ladder built by calculateStepsDcaThresholds
        }
        for pair in ["BTC/USDC", "ETH/USDC", "DOGE/USDC", "XRP/USDC", "SOL/USDC",
                     "BTC/USDT", "ETH/USDT", "DOGE/USDT", "XRP/USDT", "SOL/USDT"]
    }
    trades = list()
    max_profit_pairs = {}

    # Historical BTC all-time highs (press-sourced, indicative values);
    # used to seed `last_ath` when no higher price has been observed.
    btc_ath_history = [
        {"date": "2011-06-09", "price_usd": 26.15, "note": "pic 2011 (early breakout)"},
        {"date": "2013-11-29", "price_usd": 1132.00, "note": "bull run fin 2013"},
        {"date": "2017-12-17", "price_usd": 19783.00, "note": "ATH décembre 2017 (crypto bubble)"},
        {"date": "2020-12-31", "price_usd": 29001.72, "note": "fin 2020, nouveau record après accumulation)"},
        {"date": "2021-11-10", "price_usd": 68742.00, "note": "record novembre 2021 (institutional demand)"},
        {"date": "2024-03-05", "price_usd": 69000.00,
         "note": "nouveau pic début 2024 (source presse, valeur indicative)"},
        {"date": "2025-07-11", "price_usd": 118755.00, "note": "pic juillet 2025 (valeur rapportée par la presse)"},
        {"date": "2025-10-06", "price_usd": 126198.07,
         "note": "pic oct. 2025 (source agrégée, à vérifier selon l'exchange)"}
    ]
||
|
||
def confirm_trade_entry(self, pair: str, order_type: str, amount: float, rate: float, time_in_force: str,
                        current_time: datetime, entry_tag: Optional[str], **kwargs) -> bool:
    """Freqtrade callback: final gate before an entry order is placed.

    Re-seeds the per-pair bookkeeping in ``self.pairs``, computes the stake
    and the progressive DCA threshold ladder, and logs the buy row.
    Currently always allows the entry: both gating conditions are
    commented out, so ``allow_to_buy`` is hard-coded to True.
    """

    # Minutes elapsed since this pair's previous sell (0 when never sold).
    minutes = 0
    if self.pairs[pair]['last_date'] != 0:
        minutes = round(int((current_time - self.pairs[pair]['last_date']).total_seconds() / 60))

    dataframe, _ = self.dp.get_analyzed_dataframe(pair, self.timeframe)
    last_candle = dataframe.iloc[-1].squeeze()
    # NOTE(review): last_candle_2 / last_candle_3 are currently unused —
    # the conditions that read them are commented out below.
    last_candle_2 = dataframe.iloc[-2].squeeze()
    last_candle_3 = dataframe.iloc[-3].squeeze()

    condition = True  # (last_candle[f"{indic_5m}_deriv1"] >= indic_deriv1_5m) and (last_candle[f"{indic_5m}_deriv2"] >= indic_deriv2_5m)

    allow_to_buy = True  # (condition and not self.pairs[pair]['stop']) | (entry_tag == 'force_entry')

    if allow_to_buy:
        # Clear the cached trade list and re-seed this pair's state for the
        # new position; last_max/last_min intentionally persist across
        # trades (only extended, never reset here).
        self.trades = list()
        self.pairs[pair]['first_buy'] = rate
        self.pairs[pair]['last_buy'] = rate
        self.pairs[pair]['max_touch'] = last_candle['close']
        self.pairs[pair]['last_candle'] = last_candle
        self.pairs[pair]['count_of_buys'] = 1
        self.pairs[pair]['current_profit'] = 0
        self.pairs[pair]['last_max'] = max(last_candle['close'], self.pairs[pair]['last_max'])
        self.pairs[pair]['last_min'] = min(last_candle['close'], self.pairs[pair]['last_min'])

        # Free stake currency still available in the wallet (for logging).
        dispo = round(self.wallets.get_available_stake_amount())
        self.printLineLog()

        # NOTE(review): adjust_stake_amount is defined outside this chunk.
        stake_amount = self.adjust_stake_amount(pair, last_candle)

        self.pairs[pair]['total_amount'] = stake_amount
        self.pairs[pair]['first_amount'] = stake_amount

        # Pre-compute the progressive DCA ladder for this entry.
        self.calculateStepsDcaThresholds(last_candle, pair)

        self.log_trade(
            last_candle=last_candle,
            date=current_time,
            action=("🟩Buy" if allow_to_buy else "Canceled") + " " + str(minutes),
            pair=pair,
            rate=rate,
            dispo=dispo,
            profit=0,
            trade_type=entry_tag,
            buys=1,
            stake=round(stake_amount, 2)
        )

    return allow_to_buy
|
||
|
||
def calculateStepsDcaThresholds(self, last_candle, pair):
    """Compute and store the progressive DCA threshold ladder for *pair*.

    The total tolerated drop (from the current mid price down to
    ``last_ath * (1 - allow_decrease_rate)``) is split into ``steps``
    geometrically increasing slices starting at 0.3%, so later DCA
    levels are spaced further apart than early ones.  The resulting list
    is stored in ``self.pairs[pair]['dca_thresholds']``.

    Side effects: seeds ``self.pairs[pair]['last_ath']`` on first use and
    writes ``dca_thresholds``.
    """

    def progressive_parts(total, n, first):
        # Find the geometric ratio r such that the geometric series sums
        # to the requested total:  first * (r**n - 1) / (r - 1) == total.
        def residual(r):
            return first * (r ** n - 1) / (r - 1) - total

        # Numeric root-finding; 1.05 is a guess slightly above 1 (the
        # residual is undefined at exactly r == 1).
        r = mp.findroot(residual, 1.05)

        return [round(first * (r ** k), 4) for k in range(n)]

    if self.pairs[pair]['last_ath'] == 0:
        # Seed the ATH from history the first time this pair is traded.
        # NOTE(review): get_last_ath_before_candle is defined outside this chunk.
        ath = max(last_candle['mid'], self.get_last_ath_before_candle(last_candle))
        self.pairs[pair]['last_ath'] = ath

    mid = last_candle['mid']
    last_ath = self.pairs[pair]['last_ath']
    # Fraction of the current price covered by the tolerated decline.
    total_drop_pct = (mid - (last_ath * (1 - self.allow_decrease_rate))) / mid

    # NOTE(review): approx_value is defined outside this chunk; presumably
    # returns the number of ladder steps — confirm.
    steps = self.approx_value(mid, last_ath)
    self.pairs[pair]['dca_thresholds'] = progressive_parts(total_drop_pct, steps, 0.003)

    # Routed through printLog (was a bare print()) so hyperopt runs stay
    # silent and live runs go to the logger, consistent with the rest of
    # the class.
    self.printLog(f"val={mid} steps={steps} pct={total_drop_pct}")
    self.printLog(self.pairs[pair]['dca_thresholds'])
|
||
|
||
def confirm_trade_exit(self, pair: str, trade: Trade, order_type: str, amount: float, rate: float,
                       time_in_force: str,
                       exit_reason: str, current_time, **kwargs, ) -> bool:
    """Freqtrade callback: final gate before an exit order is placed.

    Allows the sell when the trade is older than 30 minutes and the
    heikin-ashi candle is red, or unconditionally for a pending
    force-sell, a user force-exit, or a stoploss.  On an allowed sell,
    logs the row and resets this pair's bookkeeping.
    """

    # allow_to_sell = (minutes > 30)
    dataframe, _ = self.dp.get_analyzed_dataframe(pair, self.timeframe)
    last_candle = dataframe.iloc[-1].squeeze()

    # NOTE(review): .seconds ignores whole days of the holding period —
    # presumably trades rarely last > 24h; confirm, else use total_seconds().
    minutes = int(round((current_time - trade.open_date_utc).seconds / 60, 0))
    profit = trade.calc_profit(rate)
    force = self.pairs[pair]['force_sell']
    # Precedence: `and` binds tighter than `or`, so the 30-minute /
    # red-candle guard only applies to the first clause; force flags and
    # stoploss always pass.
    allow_to_sell = minutes > 30 and (last_candle['hapercent'] < 0) or force or (exit_reason == 'force_exit') or (exit_reason == 'stop_loss')

    if allow_to_sell:
        # Record final trade facts, log the sell, then reset per-pair state.
        self.trades = list()
        self.pairs[pair]['last_count_of_buys'] = trade.nr_of_successful_entries  # self.pairs[pair]['count_of_buys']
        self.pairs[pair]['last_sell'] = rate
        self.pairs[pair]['last_candle'] = last_candle
        self.pairs[pair]['previous_profit'] = 0
        self.trades = list()
        dispo = round(self.wallets.get_available_stake_amount())
        # print(f"Sell {pair} {current_time} {exit_reason} dispo={dispo} amount={amount} rate={rate} open_rate={trade.open_rate}")
        self.log_trade(
            last_candle=last_candle,
            date=current_time,
            action="🟥Sell " + str(minutes),
            pair=pair,
            trade_type=exit_reason,
            rate=last_candle['close'],
            dispo=dispo,
            profit=round(profit, 2)
        )
        # Reset position state; last_date marks when this pair last sold.
        self.pairs[pair]['max_profit'] = 0
        self.pairs[pair]['force_sell'] = False
        self.pairs[pair]['has_gain'] = 0
        self.pairs[pair]['current_profit'] = 0
        self.pairs[pair]['total_amount'] = 0
        self.pairs[pair]['count_of_buys'] = 0
        self.pairs[pair]['max_touch'] = 0
        self.pairs[pair]['last_buy'] = 0
        self.pairs[pair]['last_date'] = current_time
        self.pairs[pair]['current_trade'] = None
    # else:
    #     self.printLog(f"{current_time} SELL triggered for {pair} ({exit_reason} profit={profit} minutes={minutes} percent={last_candle['hapercent']}) but condition blocked")

    # The trailing `|` terms are redundant (already inside allow_to_sell)
    # but kept: they guarantee force-exit/stoploss can never be blocked.
    return (allow_to_sell) | (exit_reason == 'force_exit') | (exit_reason == 'stop_loss')
|
||
|
||
# def custom_exit(self, pair: str, trade: Trade, current_time, current_rate, current_profit, **kwargs):
|
||
#
|
||
# dataframe, _ = self.dp.get_analyzed_dataframe(pair, self.timeframe)
|
||
# last_candle = dataframe.iloc[-1].squeeze()
|
||
# last_candle_1h = dataframe.iloc[-13].squeeze()
|
||
# before_last_candle = dataframe.iloc[-2].squeeze()
|
||
# before_last_candle_2 = dataframe.iloc[-3].squeeze()
|
||
# before_last_candle_12 = dataframe.iloc[-13].squeeze()
|
||
#
|
||
# expected_profit = self.expectedProfit(pair, last_candle)
|
||
# # print(f"current_time={current_time} current_profit={current_profit} expected_profit={expected_profit}")
|
||
#
|
||
# max_touch_before = self.pairs[pair]['max_touch']
|
||
# self.pairs[pair]['last_max'] = max(last_candle['close'], self.pairs[pair]['last_max'])
|
||
# self.pairs[pair]['last_min'] = min(last_candle['close'], self.pairs[pair]['last_min'])
|
||
# self.pairs[pair]['current_trade'] = trade
|
||
#
|
||
# count_of_buys = trade.nr_of_successful_entries
|
||
#
|
||
# profit = trade.calc_profit(current_rate) #round(current_profit * trade.stake_amount, 1)
|
||
# self.pairs[pair]['max_profit'] = max(self.pairs[pair]['max_profit'], profit)
|
||
# max_profit = last_candle['max5'] #self.pairs[pair]['max_profit']
|
||
# baisse = 0
|
||
# if profit > 0:
|
||
# baisse = 1 - (profit / max_profit)
|
||
# mx = max_profit / 5
|
||
# self.pairs[pair]['count_of_buys'] = count_of_buys
|
||
# self.pairs[pair]['current_profit'] = profit
|
||
#
|
||
# dispo = round(self.wallets.get_available_stake_amount())
|
||
# hours_since_first_buy = (current_time - trade.open_date_utc).seconds / 3600.0
|
||
# days_since_first_buy = (current_time - trade.open_date_utc).days
|
||
# hours = (current_time - trade.date_last_filled_utc).total_seconds() / 3600.0
|
||
# minutes = (current_time - trade.date_last_filled_utc).total_seconds() / 60.0
|
||
#
|
||
# if minutes % 4 == 0:
|
||
# self.log_trade(
|
||
# last_candle=last_candle,
|
||
# date=current_time,
|
||
# action="🟢 CURRENT", #🔴 CURRENT" if self.pairs[pair]['stop'] or last_candle['stop_buying'] else "
|
||
# dispo=dispo,
|
||
# pair=pair,
|
||
# rate=last_candle['close'],
|
||
# trade_type='',
|
||
# profit=round(profit, 2),
|
||
# buys=count_of_buys,
|
||
# stake=0
|
||
# )
|
||
#
|
||
# if (last_candle['close'] > last_candle['mid']) or (last_candle['sma5_deriv1'] > 0):
|
||
# return None
|
||
#
|
||
# pair_name = self.getShortName(pair)
|
||
#
|
||
# if profit > 0.003 * count_of_buys and baisse > 0.30:
|
||
# self.pairs[pair]['force_sell'] = False
|
||
# self.pairs[pair]['force_buy'] = (self.pairs[pair]['count_of_buys'] - self.pairs[pair]['has_gain'] > 3)
|
||
# return str(count_of_buys) + '_' + 'B30_' + pair_name + '_' + str(self.pairs[pair]['has_gain'])
|
||
#
|
||
# self.pairs[pair]['max_touch'] = max(last_candle['close'], self.pairs[pair]['max_touch'])
|
||
|
||
def getShortName(self, pair):
    """Return the base asset of *pair* by stripping any known quote suffix
    ("/USDT", "/USDC", "_USDC", "_USDT")."""
    name = pair
    for quote_suffix in ("/USDT", "/USDC", "_USDC", "_USDT"):
        name = name.replace(quote_suffix, '')
    return name
|
||
|
||
def getLastLost(self, last_candle, pair):
|
||
last_lost = round((last_candle['close'] - self.pairs[pair]['max_touch']) / self.pairs[pair]['max_touch'], 3)
|
||
return last_lost
|
||
def getPctFirstBuy(self, pair, last_candle):
|
||
return round((last_candle['close'] - self.pairs[pair]['first_buy']) / self.pairs[pair]['first_buy'], 3)
|
||
|
||
def getPctLastBuy(self, pair, last_candle):
|
||
return round((last_candle['close'] - self.pairs[pair]['last_buy']) / self.pairs[pair]['last_buy'], 4)
|
||
|
||
def expectedProfit(self, pair: str, last_candle: DataFrame):
|
||
lim = 0.01
|
||
pct = 0.002
|
||
if (self.getShortName(pair) == 'BTC'):
|
||
lim = 0.005
|
||
pct = 0.001
|
||
pct_to_max = lim + pct * self.pairs[pair]['count_of_buys']
|
||
expected_profit = lim * self.pairs[pair]['total_amount'] # min(3 * lim, max(lim, pct_to_max)) # 0.004 + 0.002 * self.pairs[pair]['count_of_buys'] #min(0.01, first_max)
|
||
|
||
self.pairs[pair]['expected_profit'] = expected_profit
|
||
|
||
return expected_profit
|
||
|
||
def log_trade(self, action, pair, date, trade_type=None, rate=None, dispo=None, profit=None, buys=None, stake=None,
              last_candle=None):
    """Print one formatted row of the console trade log.

    Re-prints the table header plus a snapshot of all pairs with an open
    position every 10 rows.  Silent during hyperopt.  ``profit`` is
    expected to be numeric (it is compared with 0 and re-formatted as a
    "profit/max_profit" string below).
    """
    # Print the column headers only once every 10 rows.
    if self.config.get('runmode') == 'hyperopt' or self.dp.runmode.value in ('hyperopt'):
        return
    if self.columns_logged % 10 == 0:
        self.printLog(
            f"| {'Date':<16} | {'Action':<10} |{'Pair':<5}| {'Trade Type':<18} |{'Rate':>8} | {'Dispo':>6} | {'Profit':>8} "
            f"| {'Pct':>6} | {'max_touch':>11} | {'last_lost':>12} | {'last_max':>7}| {'last_min':>7}|{'Buys':>5}| {'Stake':>5} |"
            f"{'rsi':>6}|{'rsi_1h':>6}|{'rsi_1d':>6}|{'mlprob':>6}" #|Distmax|s201d|s5_1d|s5_2d|s51h|s52h|smt1h|smt2h|tdc1d|tdc1h"
        )
        self.printLineLog()
        # Snapshot of every pair that currently holds a position.
        df = pd.DataFrame.from_dict(self.pairs, orient='index')
        colonnes_a_exclure = ['last_candle',
                              'trade_info', 'last_date', 'last_count_of_buys', 'base_stake_amount', 'stop_buy']
        df_filtered = df[df['count_of_buys'] > 0].drop(columns=colonnes_a_exclure)
        # df_filtered = df_filtered["first_buy", "last_max", "max_touch", "last_sell","last_buy", 'count_of_buys', 'current_profit']

        self.printLog(df_filtered)

    self.columns_logged += 1
    date = str(date)[:16] if date else "-"
    # Placeholder columns kept empty (previously showed extra indicators).
    limit = None
    rsi = ''
    rsi_pct = ''
    sma5_1d = ''
    sma5_1h = ''

    sma5 = str(sma5_1d) + ' ' + str(sma5_1h)

    last_lost = self.getLastLost(last_candle, pair)

    if buys is None:
        buys = ''

    max_touch = ''
    pct_max = self.getPctFirstBuy(pair, last_candle)

    # "buys for this pair / total buys across all pairs".
    total_counts = str(buys) + '/' + str(sum(pair_data['count_of_buys'] for pair_data in self.pairs.values()))

    dist_max = ''

    # Show whole numbers for prices > 1, 3 decimals for sub-unit prices.
    last_max = int(self.pairs[pair]['last_max']) if self.pairs[pair]['last_max'] > 1 else round(
        self.pairs[pair]['last_max'], 3)
    last_min = int(self.pairs[pair]['last_min']) if self.pairs[pair]['last_min'] > 1 else round(
        self.pairs[pair]['last_min'], 3)

    color = GREEN if profit > 0 else RED

    profit = str(profit) + '/' + str(round(self.pairs[pair]['max_profit'], 2))

    # Trend legend:
    # 🟢 1st derivative > 0 and 2nd > 0: accelerating uptrend.
    # 🟡 1st derivative > 0 and 2nd < 0: slowing uptrend → possible exhaustion.
    # 🔴 1st derivative < 0 and 2nd < 0: accelerating downtrend.
    # 🟠 1st derivative < 0 and 2nd > 0: slowing downtrend → possible bottom.
    self.printLog(
        f"| {date:<16} |{action:<10} | {pair[0:3]:<3} | {trade_type or '-':<18} |{rate or '-':>9}| {dispo or '-':>6} "
        f"|{color}{profit or '-':>10}{RESET}| {pct_max or '-':>6} | {round(self.pairs[pair]['max_touch'], 2) or '-':>11} | {last_lost or '-':>12} "
        f"| {last_max or '-':>7} | {last_min or '-':>7} |{total_counts or '-':>5}|{stake or '-':>7}"
        f"{round(last_candle['max_rsi_24'], 1) or '-' :>6}|{round(last_candle['rsi_1h'], 1) or '-' :>6}|{round(last_candle['rsi_1d'], 1) or '-' :>6}|"
        # f"{round(last_candle['rtp_1h'] * 100, 0) or '-' :>6}|{round(last_candle['rtp_1d'] * 100, 0) or '-' :>6}|"
        f"{round(last_candle['ml_prob'], 1) or '-' :>6}|"
    )
|
||
|
||
def printLineLog(self):
|
||
# f"sum1h|sum1d|Tdc|Tdh|Tdd| drv1 |drv|drv_1d|"
|
||
self.printLog(
|
||
f"+{'-' * 18}+{'-' * 12}+{'-' * 5}+{'-' * 20}+{'-' * 9}+{'-' * 8}+{'-' * 12}+{'-' * 8}+{'-' * 13}+{'-' * 14}+{'-' * 9}{'-' * 9}+{'-' * 5}+{'-' * 7}+"
|
||
f"+{'-' * 6}+{'-' * 7}+{'-' * 5}+{'-' * 5}+{'-' * 5}+{'-' * 5}+{'-' * 5}+{'-' * 5}+"
|
||
)
|
||
|
||
def printLog(self, str):
|
||
if self.config.get('runmode') == 'hyperopt' or self.dp.runmode.value in ('hyperopt'):
|
||
return;
|
||
if not self.dp.runmode.value in ('backtest', 'hyperopt', 'lookahead-analysis'):
|
||
logger.info(str)
|
||
else:
|
||
if not self.dp.runmode.value in ('hyperopt'):
|
||
print(str)
|
||
|
||
def populate_indicators(self, dataframe: DataFrame, metadata: dict) -> DataFrame:
|
||
# Add all ta features
|
||
pair = metadata['pair']
|
||
short_pair = self.getShortName(pair)
|
||
self.path = f"user_data/strategies/plots/{short_pair}/" # + ("valide/" if not self.dp.runmode.value in ('backtest') else '')
|
||
|
||
heikinashi = qtpylib.heikinashi(dataframe)
|
||
dataframe['haopen'] = heikinashi['open']
|
||
dataframe['haclose'] = heikinashi['close']
|
||
dataframe['hapercent'] = (dataframe['haclose'] - dataframe['haopen']) / dataframe['haclose']
|
||
|
||
dataframe['mid'] = dataframe['open'] + (dataframe['close'] - dataframe['open']) / 2
|
||
dataframe['sma5'] = dataframe['mid'].ewm(span=5, adjust=False).mean() #dataframe["mid"].rolling(window=5).mean()
|
||
dataframe['sma5_deriv1'] = 1000 * (dataframe['sma5'] - dataframe['sma5'].shift(1)) / dataframe['sma5'].shift(1)
|
||
|
||
dataframe['sma12'] = dataframe['mid'].ewm(span=12, adjust=False).mean()
|
||
dataframe['sma12_deriv1'] = 1000 * (dataframe['sma12'] - dataframe['sma12'].shift(1)) / dataframe[
|
||
'sma12'].shift(1)
|
||
|
||
dataframe['sma24'] = dataframe['mid'].ewm(span=24, adjust=False).mean()
|
||
dataframe['sma24_deriv1'] = 1000 * (dataframe['sma24'] - dataframe['sma24'].shift(1)) / dataframe['sma24'].shift(1)
|
||
|
||
dataframe['sma60'] = dataframe['mid'].ewm(span=60, adjust=False).mean()
|
||
dataframe['sma60_deriv1'] = 1000 * (dataframe['sma60'] - dataframe['sma60'].shift(1)) / dataframe['sma60'].shift(1)
|
||
|
||
# dataframe[f"sma5_inv"] = (dataframe[f"sma5"].shift(2) >= dataframe[f"sma5"].shift(1)) \
|
||
# & (dataframe[f"sma5"].shift(1) <= dataframe[f"sma5"])
|
||
|
||
dataframe["sma5_sqrt"] = (
|
||
np.sqrt(np.abs(dataframe["sma5"] - dataframe["sma5"].shift(1)))
|
||
+ np.sqrt(np.abs(dataframe["sma5"].shift(3) - dataframe["sma5"].shift(1)))
|
||
)
|
||
dataframe["sma5_inv"] = (
|
||
(dataframe["sma5"].shift(2) >= dataframe["sma5"].shift(1))
|
||
& (dataframe["sma5"].shift(1) <= dataframe["sma5"])
|
||
& (dataframe["sma5_sqrt"] > 5)
|
||
)
|
||
|
||
dataframe["sma12_sqrt"] = (
|
||
np.sqrt(np.abs(dataframe["sma12"] - dataframe["sma12"].shift(1)))
|
||
+ np.sqrt(np.abs(dataframe["sma12"].shift(3) - dataframe["sma12"].shift(1)))
|
||
)
|
||
dataframe["sma12_inv"] = (
|
||
(dataframe["sma12"].shift(2) >= dataframe["sma12"].shift(1))
|
||
& (dataframe["sma12"].shift(1) <= dataframe["sma12"])
|
||
& (dataframe["sma12_sqrt"] > 5)
|
||
)
|
||
|
||
dataframe["percent"] = dataframe['mid'].pct_change()
|
||
dataframe["percent3"] = dataframe['mid'].pct_change(3).rolling(3).mean()
|
||
dataframe["percent12"] = dataframe['mid'].pct_change(12).rolling(12).mean()
|
||
dataframe["percent24"] = dataframe['mid'].pct_change(24).rolling(24).mean()
|
||
|
||
dataframe['rsi'] = talib.RSI(dataframe['mid'], timeperiod=14)
|
||
self.calculeDerivees(dataframe, 'rsi', ema_period=12)
|
||
dataframe['max_rsi_12'] = talib.MAX(dataframe['rsi'], timeperiod=12)
|
||
dataframe['max_rsi_24'] = talib.MAX(dataframe['rsi'], timeperiod=24)
|
||
dataframe['max5'] = talib.MAX(dataframe['mid'], timeperiod=5)
|
||
dataframe['min180'] = talib.MIN(dataframe['mid'], timeperiod=180)
|
||
dataframe['max180'] = talib.MAX(dataframe['mid'], timeperiod=180)
|
||
dataframe['pct180'] = ((dataframe["mid"] - dataframe['min180'] ) / (dataframe['max180'] - dataframe['min180'] ))
|
||
dataframe = self.rsi_trend_probability(dataframe, short=60, long=360)
|
||
|
||
# ################### INFORMATIVE 1h
|
||
informative = self.dp.get_pair_dataframe(pair=metadata['pair'], timeframe='1h')
|
||
informative['mid'] = informative['open'] + (informative['close'] - informative['open']) / 2
|
||
# Calcul MACD
|
||
macd, macdsignal, macdhist = talib.MACD(
|
||
informative['close'],
|
||
fastperiod=12,
|
||
slowperiod=26,
|
||
signalperiod=9
|
||
)
|
||
informative['macd'] = macd
|
||
informative['macdsignal'] = macdsignal
|
||
informative['macdhist'] = macdhist
|
||
|
||
informative['rsi'] = talib.RSI(informative['mid'], timeperiod=14)
|
||
informative['sma24'] = informative['mid'].ewm(span=24, adjust=False).mean()
|
||
informative['sma24_deriv1'] = 1000 * (informative['sma24'] - informative['sma24'].shift(1)) / informative['sma24'].shift(1)
|
||
|
||
informative['sma60'] = informative['mid'].ewm(span=60, adjust=False).mean()
|
||
informative['sma60_deriv1'] = 1000 * (informative['sma60'] - informative['sma60'].shift(1)) / informative['sma60'].shift(1)
|
||
informative['rsi'] = talib.RSI(informative['mid'], timeperiod=14)
|
||
self.calculeDerivees(informative, 'rsi', ema_period=12)
|
||
# informative = self.rsi_trend_probability(informative)
|
||
|
||
probas = self.calculModelInformative(informative)
|
||
|
||
# informative = self.populate1hIndicators(df=informative, metadata=metadata)
|
||
# informative = self.calculateRegression(informative, 'mid', lookback=15)
|
||
dataframe = merge_informative_pair(dataframe, informative, '1m', '1h', ffill=True)
|
||
|
||
# ################### INFORMATIVE 1d
|
||
informative = self.dp.get_pair_dataframe(pair=metadata['pair'], timeframe='1d')
|
||
informative['mid'] = informative['open'] + (informative['close'] - informative['open']) / 2
|
||
informative['rsi'] = talib.RSI(informative['mid'], timeperiod=5)
|
||
# informative = self.rsi_trend_probability(informative)
|
||
# informative = self.calculateRegression(informative, 'mid', lookback=15)
|
||
dataframe = merge_informative_pair(dataframe, informative, '1m', '1d', ffill=True)
|
||
|
||
dataframe['last_price'] = dataframe['close']
|
||
dataframe['first_price'] = dataframe['close']
|
||
if self.dp:
|
||
if self.dp.runmode.value in ('live', 'dry_run'):
|
||
self.getOpenTrades()
|
||
|
||
for trade in self.trades:
|
||
if trade.pair != pair:
|
||
continue
|
||
filled_buys = trade.select_filled_orders('buy')
|
||
count = 0
|
||
amount = 0
|
||
min_price = 111111111111110;
|
||
max_price = 0;
|
||
for buy in filled_buys:
|
||
if count == 0:
|
||
min_price = min(min_price, buy.price)
|
||
max_price = max(max_price, buy.price)
|
||
dataframe['first_price'] = buy.price
|
||
self.pairs[pair]['first_buy'] = buy.price
|
||
self.pairs[pair]['first_amount'] = buy.price * buy.filled
|
||
# dataframe['close01'] = buy.price * 1.01
|
||
|
||
# Order(id=2396, trade=1019, order_id=29870026652, side=buy, filled=0.00078, price=63921.01,
|
||
# status=closed, date=2024-08-26 02:20:11)
|
||
dataframe['last_price'] = buy.price
|
||
self.pairs[pair]['last_buy'] = buy.price
|
||
count = count + 1
|
||
amount += buy.price * buy.filled
|
||
count_buys = count
|
||
self.pairs[pair]['total_amount'] = amount
|
||
|
||
dataframe['absolute_min'] = dataframe['mid'].rolling(1440, min_periods=1).min()
|
||
dataframe['absolute_max'] = dataframe['mid'].rolling(1440, min_periods=1).max()
|
||
# steps = (dataframe['absolute_max'] - dataframe['absolute_min']) / (dataframe['absolute_min'] * 0.01)
|
||
# levels = [dataframe['absolute_min'] * (1 + i / 100) for i in range(1, steps + 1)]
|
||
#
|
||
# print(levels)
|
||
|
||
###########################################################
|
||
# Bollinger Bands
|
||
bollinger = qtpylib.bollinger_bands(qtpylib.typical_price(dataframe), window=20, stds=2)
|
||
dataframe['bb_lowerband'] = bollinger['lower']
|
||
dataframe['bb_middleband'] = bollinger['mid']
|
||
dataframe['bb_upperband'] = bollinger['upper']
|
||
dataframe["bb_percent"] = (
|
||
(dataframe["close"] - dataframe["bb_lowerband"]) /
|
||
(dataframe["bb_upperband"] - dataframe["bb_lowerband"])
|
||
)
|
||
dataframe["bb_width"] = (dataframe["bb_upperband"] - dataframe["bb_lowerband"]) / dataframe["sma24"]
|
||
|
||
# Calcul MACD
|
||
macd, macdsignal, macdhist = talib.MACD(
|
||
dataframe['close'],
|
||
fastperiod=12,
|
||
slowperiod=26,
|
||
signalperiod=9
|
||
)
|
||
|
||
# | Nom | Formule / définition | Signification |
|
||
# | ---------------------------- | ------------------------------------------- | ----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- |
|
||
# | **MACD** (`macd`) | `EMA_fast - EMA_slow` (ex : 12-26 périodes) | Montre l’écart entre la moyenne courte et la moyenne longue. <br> - Positive → tendance haussière <br> - Négative → tendance baissière |
|
||
# | **Signal** (`macdsignal`) | `EMA_9(MACD)` | Sert de ligne de **signal de déclenchement**. <br> - Croisement du MACD au-dessus → signal d’achat <br> - Croisement du MACD en dessous → signal de vente |
|
||
# | **Histogramme** (`macdhist`) | `MACD - Signal` | Montre la **force et l’accélération** de la tendance. <br> - Positif et croissant → tendance haussière qui s’accélère <br> - Positif mais décroissant → ralentissement de la hausse <br> - Négatif et décroissant → baisse qui s’accélère <br> - Négatif mais croissant → ralentissement de la baisse |
|
||
|
||
# Ajouter dans le dataframe
|
||
dataframe['macd'] = macd
|
||
dataframe['macdsignal'] = macdsignal
|
||
dataframe['macdhist'] = macdhist
|
||
|
||
# Regarde dans le futur
|
||
# # --- Rendre relatif sur chaque série (-1 → 1) ---
|
||
# for col in ['macd', 'macdsignal', 'macdhist']:
|
||
# series = dataframe[col]
|
||
# valid = series[~np.isnan(series)] # ignorer NaN
|
||
# min_val = valid.min()
|
||
# max_val = valid.max()
|
||
# span = max_val - min_val if max_val != min_val else 1
|
||
# dataframe[f'{col}_rel'] = 2 * ((series - min_val) / span) - 1
|
||
#
|
||
# dataframe['tdc_macd'] = self.macd_tendance_int(
|
||
# dataframe,
|
||
# macd_col='macd_rel',
|
||
# signal_col='macdsignal_rel',
|
||
# hist_col='macdhist_rel'
|
||
# )
|
||
|
||
# ------------------------------------------------------------------------------------
|
||
# rolling SMA indicators (used for trend detection too)
|
||
s_short = self.DEFAULT_PARAMS['sma_short']
|
||
s_long = self.DEFAULT_PARAMS['sma_long']
|
||
|
||
dataframe[f'sma_{s_short}'] = dataframe['close'].rolling(window=s_short).mean()
|
||
dataframe[f'sma_{s_long}'] = dataframe['close'].rolling(window=s_long).mean()
|
||
|
||
# --- pente brute ---
|
||
dataframe['slope'] = dataframe['sma24'].diff()
|
||
|
||
# --- lissage EMA ---
|
||
dataframe['slope_smooth'] = dataframe['slope'].ewm(span=10, adjust=False).mean()
|
||
|
||
# # RSI
|
||
# window = 14
|
||
# delta = dataframe['close'].diff()
|
||
# up = delta.clip(lower=0)
|
||
# down = -1 * delta.clip(upper=0)
|
||
# ma_up = up.rolling(window=window).mean()
|
||
# ma_down = down.rolling(window=window).mean()
|
||
# rs = ma_up / ma_down.replace(0, 1e-9)
|
||
# dataframe['rsi'] = 100 - (100 / (1 + rs))
|
||
#
|
||
# # EMA example
|
||
# dataframe['ema'] = dataframe['close'].ewm(span=self.DEFAULT_PARAMS['ema_period'], adjust=False).mean()
|
||
#
|
||
# # ATR (simple implementation)
|
||
# high_low = dataframe['high'] - dataframe['low']
|
||
# high_close = (dataframe['high'] - dataframe['close'].shift()).abs()
|
||
# low_close = (dataframe['low'] - dataframe['close'].shift()).abs()
|
||
# tr = DataFrame({'hl': high_low, 'hc': high_close, 'lc': low_close}).max(axis=1)
|
||
# dataframe['atr'] = tr.rolling(window=self.DEFAULT_PARAMS['atr_period']).mean()
|
||
|
||
###########################
|
||
# df = ton DataFrame OHLCV avec colonnes: open, high, low, close, volume
|
||
# Assure-toi qu'il est trié par date croissante
|
||
timeframe = self.timeframe
|
||
# --- Volatilité normalisée ---
|
||
dataframe['atr'] = ta.volatility.AverageTrueRange(
|
||
high=dataframe['high'], low=dataframe['low'], close=dataframe['close'], window=14
|
||
).average_true_range()
|
||
dataframe['atr_norm'] = dataframe['atr'] / dataframe['close']
|
||
|
||
# --- Force de tendance ---
|
||
dataframe['adx'] = ta.trend.ADXIndicator(
|
||
high=dataframe['high'], low=dataframe['low'], close=dataframe['close'], window=14
|
||
).adx()
|
||
|
||
# --- Volume directionnel (On Balance Volume) ---
|
||
dataframe['obv'] = ta.volume.OnBalanceVolumeIndicator(
|
||
close=dataframe['close'], volume=dataframe['volume']
|
||
).on_balance_volume()
|
||
self.calculeDerivees(dataframe, 'obv', ema_period=1)
|
||
|
||
dataframe['obv12'] = ta.volume.OnBalanceVolumeIndicator(
|
||
close=dataframe['sma12'], volume=dataframe['volume'].rolling(12).sum()
|
||
).on_balance_volume()
|
||
|
||
dataframe['obv24'] = ta.volume.OnBalanceVolumeIndicator(
|
||
close=dataframe['sma24'], volume=dataframe['volume'].rolling(24).sum()
|
||
).on_balance_volume()
|
||
|
||
# self.calculeDerivees(dataframe, 'obv5', ema_period=5)
|
||
|
||
# --- Volatilité récente (écart-type des rendements) ---
|
||
dataframe['vol_24'] = dataframe['percent'].rolling(24).std()
|
||
|
||
# Compter les baisses / hausses consécutives
|
||
# self.calculateDownAndUp(dataframe, limit=0.0001)
|
||
|
||
# df : ton dataframe OHLCV + indicateurs existants
|
||
# Assurez-vous que les colonnes suivantes existent :
|
||
# 'max_rsi_12', 'roc_24', 'bb_percent_1h'
|
||
|
||
# --- Filtrage des NaN initiaux ---
|
||
# dataframe = dataframe.dropna()
|
||
|
||
dataframe['rsi_slope'] = dataframe['rsi'].diff(3) / 3 # vitesse moyenne du RSI
|
||
dataframe['adx_change'] = dataframe['adx'] - dataframe['adx'].shift(12) # évolution de la tendance
|
||
dataframe['volatility_ratio'] = dataframe['atr_norm'] / dataframe['bb_width']
|
||
|
||
dataframe["rsi_diff"] = dataframe["rsi"] - dataframe["rsi"].shift(3)
|
||
dataframe["slope_ratio"] = dataframe["sma5_deriv1"] / (dataframe["sma60_deriv1"] + 1e-9)
|
||
dataframe["divergence"] = (dataframe["rsi_deriv1"] * dataframe["sma5_deriv1"]) < 0
|
||
|
||
###########################################################
|
||
# print(f"min={dataframe['absolute_min'].min()} max={dataframe['absolute_max'].max()}")
|
||
for i in [0, 1, 2, 3]:
|
||
dataframe[f"lvl_{i}_pct"] = dataframe['absolute_min'] * (1 + 0.01 * i)
|
||
|
||
self.model_indicators = self.listUsableColumns(dataframe)
|
||
|
||
if False and self.dp.runmode.value in ('backtest'):
|
||
self.trainModel(dataframe, metadata)
|
||
|
||
short_pair = self.getShortName(pair)
|
||
# path=f"user_data/strategies/plots/{short_pair}/"
|
||
|
||
self.model = joblib.load(f"{self.path}/{short_pair}_rf_model.pkl")
|
||
|
||
# Préparer les features pour la prédiction
|
||
features = dataframe[self.model_indicators].fillna(0)
|
||
|
||
# Prédiction : probabilité que le prix monte
|
||
|
||
# Affichage des colonnes intérressantes dans le model
|
||
features_pruned, kept_features = self.prune_features(
|
||
model=self.model,
|
||
dataframe=dataframe,
|
||
feature_columns=self.model_indicators,
|
||
importance_threshold=0.005 # enlever features < % importance
|
||
)
|
||
|
||
probs = self.model.predict_proba(features)[:, 1]
|
||
|
||
# Sauvegarder la probabilité pour l’analyse
|
||
dataframe['ml_prob'] = probs
|
||
|
||
if False and self.dp.runmode.value in ('backtest'):
|
||
self.inspect_model(self.model)
|
||
|
||
#
|
||
# absolute_min = dataframe['absolute_min'].min()
|
||
# absolute_max = dataframe['absolute_max'].max()
|
||
#
|
||
# # Écart total
|
||
# diff = absolute_max - absolute_min
|
||
#
|
||
# # Nombre de lignes intermédiaires (1% steps)
|
||
# steps = int((absolute_max - absolute_min) / (absolute_min * 0.01))
|
||
#
|
||
# # Niveaux de prix à 1%, 2%, ..., steps%
|
||
# levels = [absolute_min * (1 + i / 100) for i in range(1, steps + 1)]
|
||
# levels = [lvl for lvl in levels if lvl < absolute_max] # évite le dernier niveau exact
|
||
#
|
||
# # ajout dans le DataFrame
|
||
# for i, lvl in enumerate(levels, start=1):
|
||
# dataframe[f"lvl_{i}_pct"] = lvl
|
||
|
||
# # Indices correspondants
|
||
# indices = [(dataframe['mid'] - lvl).abs().idxmin() for lvl in levels]
|
||
|
||
# Non utilisé dans le modèle
|
||
dataframe['min60'] = talib.MIN(dataframe['mid'], timeperiod=60)
|
||
|
||
# val = 90000
|
||
# steps = 12
|
||
# [0.018, 0.022, 0.025, 0.028, 0.032, 0.035, 0.038, 0.042, 0.045, 0.048, 0.052, 0.055]
|
||
|
||
# val = 100000
|
||
# steps = 20
|
||
# [0.012, 0.014, 0.015, 0.016, 0.018, 0.019, 0.02, 0.022, 0.023, 0.024, 0.025, 0.027, 0.028, 0.029, 0.031, 0.032,
|
||
# 0.033, 0.035, 0.036, 0.037]
|
||
|
||
# val = 110000
|
||
# steps = 28
|
||
# [0.01, 0.01, 0.011, 0.012, 0.013, 0.013, 0.014, 0.015, 0.015, 0.016, 0.017, 0.018, 0.018, 0.019, 0.02, 0.02,
|
||
# 0.021, 0.022, 0.023, 0.023, 0.024, 0.025, 0.025, 0.026, 0.027, 0.028, 0.028, 0.029]
|
||
|
||
# val = 120000
|
||
# steps = 35
|
||
# [0.008, 0.009, 0.009, 0.01, 0.01, 0.011, 0.011, 0.012, 0.012, 0.013, 0.013, 0.014, 0.014, 0.015, 0.015, 0.016,
|
||
# 0.016, 0.017, 0.017, 0.018, 0.018, 0.019, 0.019, 0.019, 0.02, 0.02, 0.021, 0.021, 0.022, 0.022, 0.023, 0.023,
|
||
# 0.024, 0.024, 0.025]
|
||
|
||
# def split_ratio_one_third(n, p):
|
||
# a = n / (2 * p) # première valeur
|
||
# d = n / (p * (p - 1)) # incrément
|
||
# return [round(a + i * d, 3) for i in range(p)]
|
||
#
|
||
# for val in range(90000, 130000, 10000):
|
||
# steps = self.approx_value(val, 126000)
|
||
# print(f"val={val} steps={steps} pct={(val - (126000 * (1 - self.allow_decrease_rate))) / val}")
|
||
# dca = split_ratio_one_third((val - (126000 * (1 - self.allow_decrease_rate))) / 126000, steps)
|
||
# print(dca)
|
||
|
||
return dataframe
|
||
|
||
def getOpenTrades(self):
    """Refresh the cached open-trade list from persistence and return it."""
    open_trades = Trade.get_open_trades()
    self.trades = open_trades
    return open_trades
|
||
|
||
# def populate_buy_trend(self, dataframe: DataFrame, metadata: dict) -> DataFrame:
|
||
# dataframe.loc[
|
||
# (
|
||
# # (dataframe['sma5_inv'] == 1)
|
||
# (
|
||
# (dataframe['pct180'] < 0.5) |
|
||
# (
|
||
# (dataframe['close'] < dataframe['sma60'] )
|
||
# & (dataframe['sma24_deriv1'] > 0)
|
||
# )
|
||
# )
|
||
# # & (dataframe['hapercent'] > 0)
|
||
# # & (dataframe['sma24_deriv1'] > - 0.03)
|
||
# & (dataframe['ml_prob'] > 0.1)
|
||
# # & (
|
||
# # (dataframe['percent3'] <= -0.003)
|
||
# # | (dataframe['percent12'] <= -0.003)
|
||
# # | (dataframe['percent24'] <= -0.003)
|
||
# # )
|
||
# ), ['enter_long', 'enter_tag']] = (1, f"future")
|
||
#
|
||
# dataframe['test'] = np.where(dataframe['enter_long'] == 1, dataframe['close'] * 1.003, np.nan)
|
||
#
|
||
# return dataframe
|
||
|
||
def populate_buy_trend(self, dataframe: DataFrame, metadata: dict) -> DataFrame:
    """
    Flag long entries driven by the ML probability column.

    A candle becomes an entry when the model probability is rising
    (``ml_prob`` greater than on the previous candle), the SMA12/SMA24
    slopes are both positive, and the open sits at least 0.3% below the
    180-candle high (avoid buying straight into recent resistance).

    Fix vs previous version: the 80th-percentile ``threshold`` was computed
    on every call but never used anywhere — dead work removed.

    Adds/updates columns:
      * ``buy``        -- legacy column, kept at 0 for compatibility.
      * ``enter_long`` / ``enter_tag`` -- freqtrade entry signal.
      * ``test``       -- close * 1.003 on entry candles (plot helper),
                          NaN elsewhere.
    """
    # Legacy column some downstream tooling may still read.
    dataframe["buy"] = 0

    # Model confidence must be rising candle-over-candle.
    rising_prob = dataframe["ml_prob"].shift(1) < dataframe["ml_prob"]
    # Short/medium trend slopes both positive.
    positive_slopes = (dataframe["sma24_deriv1"] > 0) & (dataframe["sma12_deriv1"] > 0)
    # Stay at least 0.3% below the 180-candle high.
    below_recent_high = dataframe["open"] < dataframe["max180"] * 0.997

    dataframe.loc[
        rising_prob & positive_slopes & below_recent_high,
        ["enter_long", "enter_tag"],
    ] = (1, "future")

    # Visual helper: entry price bumped 0.3% so entries stand out on plots.
    dataframe["test"] = np.where(dataframe["enter_long"] == 1, dataframe["close"] * 1.003, np.nan)

    return dataframe
|
||
|
||
# def populate_buy_trend(self, dataframe: DataFrame, metadata: dict) -> DataFrame:
|
||
# """
|
||
# Populate buy signals based on SHAP/PDP insights:
|
||
# - strong momentum: macdhist high and macd > macdsignal
|
||
# - rsi elevated (but not extreme)
|
||
# - positive sma24 derivative above threshold
|
||
# - price above sma60 (trend context)
|
||
# - price in upper region of Bollinger (bb_percent high)
|
||
# - volume/obv filter and volatility guard (obv_dist, atr)
|
||
# Returns dataframe with column 'buy' (1 = buy signal).
|
||
# """
|
||
#
|
||
# # Ensure column existence (fallback to zeros if missing)
|
||
# cols = [
|
||
# "macdhist", "macd", "macdsignal", "rsi", "rsi_short",
|
||
# "sma24_deriv1", "sma60", "bb_percent",
|
||
# "obv_dist", "atr", "percent", "open_1h", "absolute_min"
|
||
# ]
|
||
# for c in cols:
|
||
# if c not in dataframe.columns:
|
||
# dataframe[c] = 0.0
|
||
#
|
||
# # Thresholds (tune these)
|
||
# TH_MACDHIST = 8.0 # macdhist considered "strong" (example)
|
||
# TH_MACD_POS = 0.0 # macd must be > 0 (positive momentum)
|
||
# TH_SMA24_DERIV = 0.05 # sma24 derivative threshold where effect appears
|
||
# TH_RSI_LOW = 52.0 # lower bound to consider bullish RSI
|
||
# TH_RSI_HIGH = 85.0 # upper bound to avoid extreme overbought (optional)
|
||
# TH_BB_PERCENT = 0.7 # in upper band (0..1)
|
||
# TH_OBV_DIST = -40.0 # accept small negative OBV distance, reject very negative
|
||
# MAX_ATR = None # optional: maximum ATR to avoid extreme volatility (None = off)
|
||
# MIN_PRICE_ABOVE_SMA60 = 0.0 # require price > sma60 (price - sma60 > 0)
|
||
#
|
||
# price = dataframe["close"]
|
||
#
|
||
# # Momentum conditions
|
||
# cond_macdhist = dataframe["macdhist"] >= TH_MACDHIST
|
||
# cond_macd_pos = dataframe["macd"] > TH_MACD_POS
|
||
# cond_macd_vs_signal = dataframe["macd"] > dataframe["macdsignal"]
|
||
#
|
||
# # RSI condition (accept moderate-high RSI)
|
||
# cond_rsi = (dataframe["rsi"] >= TH_RSI_LOW) & (dataframe["rsi"] <= TH_RSI_HIGH)
|
||
#
|
||
# # SMA24 derivative: require momentum above threshold
|
||
# cond_sma24 = dataframe["sma24_deriv1"] >= TH_SMA24_DERIV
|
||
#
|
||
# # Price above SMA60 (trend filter)
|
||
# cond_above_sma60 = (price - dataframe["sma60"]) > MIN_PRICE_ABOVE_SMA60
|
||
#
|
||
# # Bollinger band percent (price in upper region)
|
||
# cond_bb = dataframe["bb_percent"] >= TH_BB_PERCENT
|
||
#
|
||
# # Volume/OBV prudence filter
|
||
# cond_obv = dataframe["obv_dist"] >= TH_OBV_DIST
|
||
#
|
||
# # Optional ATR guard
|
||
# if MAX_ATR is not None:
|
||
# cond_atr = dataframe["atr"] <= MAX_ATR
|
||
# else:
|
||
# cond_atr = np.ones_like(dataframe["atr"], dtype=bool)
|
||
#
|
||
# # Optional additional guards (avoid tiny percent moves or weird opens)
|
||
# cond_percent = np.abs(dataframe["percent"]) > 0.0005 # ignore almost-no-move bars
|
||
# cond_open = True # keep as placeholder; you can add open_1h relative checks
|
||
#
|
||
# # Combine into a buy signal
|
||
# buy_condition = (
|
||
# cond_macdhist &
|
||
# cond_macd_pos &
|
||
# cond_macd_vs_signal &
|
||
# cond_rsi &
|
||
# cond_sma24 &
|
||
# cond_above_sma60 &
|
||
# cond_bb &
|
||
# cond_obv &
|
||
# cond_atr &
|
||
# cond_percent
|
||
# )
|
||
#
|
||
# # Finalize: set buy column (0/1)
|
||
# dataframe.loc[buy_condition, ['enter_long', 'enter_tag']] = (1, f"future")
|
||
# # dataframe.loc[~buy_condition, "buy"] = 0
|
||
#
|
||
# dataframe['test'] = np.where(dataframe['enter_long'] == 1, dataframe['close'] * 1.003, np.nan)
|
||
#
|
||
# return dataframe
|
||
|
||
def populate_sell_trend(self, dataframe: DataFrame, metadata: dict) -> DataFrame:
    """No indicator-based sell signals; exits are driven by custom_exit()."""
    return dataframe
|
||
|
||
# def adjust_stake_amount(self, pair: str, last_candle: DataFrame):
|
||
# # Calculer le minimum des 14 derniers jours
|
||
# nb_pairs = len(self.dp.current_whitelist())
|
||
#
|
||
# base_stake_amount = self.config.get('stake_amount')
|
||
#
|
||
# if True : #self.pairs[pair]['count_of_buys'] == 0:
|
||
# factor = 1 #65 / min(65, last_candle['rsi_1d'])
|
||
# # if last_candle['min_max_60'] > 0.04:
|
||
# # factor = 2
|
||
#
|
||
# adjusted_stake_amount = base_stake_amount #max(base_stake_amount / 5, base_stake_amount * factor)
|
||
# else:
|
||
# adjusted_stake_amount = self.pairs[pair]['first_amount']
|
||
#
|
||
# if self.pairs[pair]['count_of_buys'] == 0:
|
||
# self.pairs[pair]['first_amount'] = adjusted_stake_amount
|
||
#
|
||
# return adjusted_stake_amount
|
||
|
||
def approx_value(self, x, X_max):
    """
    Linearly map a price ``x`` onto a step count in [1, 40].

    The lower anchor is ``X_max`` reduced by ``self.allow_decrease_rate``:
    prices at/below that anchor map to ~1 step, prices at ``X_max`` to 40.
    The result is rounded and floored at 1 (never zero or negative).
    """
    lower_bound = X_max * (1 - self.allow_decrease_rate)
    lo_steps = 1
    hi_steps = 40
    # Slope of the interpolation line between the two anchors.
    slope = (hi_steps - lo_steps) / (X_max - lower_bound)
    interpolated = lo_steps + slope * (x - lower_bound)
    return max(round(interpolated), 1)
|
||
|
||
def adjust_stake_amount(self, pair: str, last_candle: DataFrame):
    """
    Size the next buy for ``pair``.

    The first computed stake is cached in self.pairs and reused for every
    later buy of the same pair. Otherwise the total wallet is split into N
    equal parts, where N comes from approx_value() and depends on how far
    the current price sits below the ATH.

    NOTE(review): ``mult`` (position inside the 180-candle range) is
    computed but its multiplication into the stake is commented out below
    -- confirm this is intentional.
    """

    # Reuse the first stake for all follow-up buys of this pair.
    if self.pairs[pair]['first_amount'] > 0:
        return self.pairs[pair]['first_amount']

    # ATH = max of the locally tracked high and the recorded pre-candle ATH.
    ath = max(self.pairs[pair]['last_max'], self.get_last_ath_before_candle(last_candle))
    self.pairs[pair]['last_ath'] = ath

    # Distance below the ATH, in percent (only used for the log line below).
    ath_dist = 100 * (ath - last_candle["mid"]) / ath

    # ath_dist
    # 0 ==> 1
    # 20 ==> 1.5
    # 40 ==> 2
    # 50 * (1 + (ath_dist / 40))

    # Split the whole wallet into `steps` equal stakes.
    full = self.wallets.get_total_stake_amount()
    steps = self.approx_value(last_candle['mid'], ath)
    base_stake = full / steps

    # base_stake = stake * (1 + (ath_dist / 40))

    # 180-candle range extremes.
    low180 = last_candle["min180"]
    high180 = last_candle["max180"]

    # 1 near the 180-candle low, 0 near the high (currently NOT applied).
    mult = 1 - ((last_candle["mid"] - low180) / (high180 - low180))

    print(f"low={low180} mid={last_candle['mid']} high={high180} mult={mult} ath={ath} ath_dist={round(ath_dist, 2)}" )
    # base_size = base amount to deploy (e.g. stake_amount or similar)
    base_size = base_stake  # example wallet fraction; adapt to your code
    # New stake proportional to mult (multiplier currently disabled).
    new_stake = base_size #* mult
    return new_stake
|
||
|
||
def adjust_trade_position(self, trade: Trade, current_time: datetime,
                          current_rate: float, current_profit: float, min_stake: float,
                          max_stake: float, **kwargs):
    """
    Freqtrade position-adjustment (DCA) callback.

    Decides, once per iteration, whether to:
      * partially SELL the position (returns a negative stake),
      * BUY more to average down after a decline or a "force" timeout,
      * BUY more while in profit ("gain" pyramiding),
      * or do nothing (returns None).

    Returns:
        float: positive stake to add, negative stake to reduce,
        0 when no wallet budget is left, or None for no action.
    """
    # Do nothing while an order is already pending for this trade.
    if trade.has_open_orders:
        # self.printLog("skip open orders")
        return None

    dataframe, _ = self.dp.get_analyzed_dataframe(trade.pair, self.timeframe)
    last_candle = dataframe.iloc[-1].squeeze()
    before_last_candle = dataframe.iloc[-2].squeeze()
    # Prepare the data: normalise timestamps to UTC before arithmetic.
    current_time = current_time.astimezone(timezone.utc)
    open_date = trade.open_date.astimezone(timezone.utc)
    dispo = round(self.wallets.get_available_stake_amount())
    # NOTE(review): `.seconds` wraps every 24h (ignores whole days), unlike
    # total_seconds() used for `hours`/`minutes` below -- confirm intent.
    hours_since_first_buy = (current_time - trade.open_date_utc).seconds / 3600.0
    days_since_first_buy = (current_time - trade.open_date_utc).days
    # Time since the last filled order on this trade.
    hours = (current_time - trade.date_last_filled_utc).total_seconds() / 3600.0
    minutes = (current_time - trade.date_last_filled_utc).total_seconds() / 60.0

    count_of_buys = trade.nr_of_successful_entries
    current_time_utc = current_time.astimezone(timezone.utc)
    open_date = trade.open_date.astimezone(timezone.utc)
    days_since_open = (current_time_utc - open_date).days
    pair = trade.pair
    # Absolute profit in stake currency at the current rate.
    profit = trade.calc_profit(current_rate) #round(current_profit * trade.stake_amount, 1)
    last_lost = self.getLastLost(last_candle, pair)
    pct_first = 0

    # Total DCA-buy count over all tracked pairs.
    # NOTE(review): the filter tests the loop-invariant `pair`, not
    # `pair_data` -- so it excludes either all pairs or none; confirm intent.
    total_counts = sum(
        pair_data['count_of_buys'] for pair_data in self.pairs.values() if not self.getShortName(pair) == 'BTC')

    if self.pairs[pair]['first_buy']:
        pct_first = self.getPctFirstBuy(pair, last_candle)

    # --- Partial exit: many entries accumulated, hourly SMA24 slope turned
    # negative, and the running loss is still smaller than the first stake.
    if profit > - self.pairs[pair]['first_amount'] and count_of_buys > 15 and last_candle['sma24_deriv1_1h'] < 0:
        stake_amount = trade.stake_amount
        self.pairs[pair]['previous_profit'] = profit
        trade_type = "Sell " + (last_candle['enter_tag'] if last_candle['enter_long'] == 1 else '')
        self.pairs[trade.pair]['count_of_buys'] += 1
        self.pairs[pair]['total_amount'] = stake_amount
        self.log_trade(
            last_candle=last_candle,
            date=current_time,
            action="🟧 Sell +",
            dispo=dispo,
            pair=trade.pair,
            rate=current_rate,
            trade_type=trade_type,
            profit=round(profit, 1),
            buys=trade.nr_of_successful_entries + 1,
            stake=round(stake_amount, 2)
        )

        self.pairs[trade.pair]['last_buy'] = current_rate
        self.pairs[trade.pair]['max_touch'] = last_candle['close']
        self.pairs[trade.pair]['last_candle'] = last_candle

        # Negative return value asks freqtrade to reduce the position.
        return -stake_amount

    # No meaningful wallet budget left: explicit "no budget" marker.
    if (self.wallets.get_available_stake_amount() < 10): # or trade.stake_amount >= max_stake:
        return 0

    lim = 0.3
    if (len(dataframe) < 1):
        # self.printLog("skip dataframe")
        return None

    # Last actual buy price (not the trade's average open rate).
    last_fill_price = self.pairs[trade.pair]['last_buy'] #trade.open_rate # replaced just below

    # if len(trade.orders) > 0:
    #     # Find the last executed BUY order
    #     buy_orders = [o for o in trade.orders if o.is_buy and o.status == "closed"]
    #     if buy_orders:
    #         last_fill_price = buy_orders[-1].price

    # Relative decline: refresh the ATH once per hour, reuse the cache otherwise.
    if minutes % 60 == 0:
        ath = max(self.pairs[pair]['last_max'], self.get_last_ath_before_candle(last_candle))
        self.pairs[pair]['last_ath'] = ath
    else:
        ath = self.pairs[pair]['last_ath']

    # steps = self.approx_value(last_candle['mid'], ath)

    # dca_thresholds = split_ratio_one_third((last_candle['mid'] - (ath * self.allow_decrease_rate)) / last_candle['mid'], steps) #((last_candle['mid'] - (ath * self.allow_decrease_rate)) / steps) / last_candle['mid'] # 0.0025 + 0.0005 * count_of_buys
    # Lazily build the per-pair ladder of decline thresholds.
    if len(self.pairs[pair]['dca_thresholds']) == 0:
        self.calculateStepsDcaThresholds(last_candle, pair)

    # Threshold for the current buy number (clamped to the last rung).
    dca_threshold = self.pairs[pair]['dca_thresholds'][min(count_of_buys - 1, len(self.pairs[pair]['dca_thresholds']) - 1)]

    # print(f"{count_of_buys} {ath * (1 - self.allow_decrease_rate)} {round(last_candle['mid'], 2)} {round((last_candle['mid'] - (ath * self.allow_decrease_rate)) / last_candle['mid'], 2)} {steps} {round(dca_threshold, 4)}")
    # Fractional drop (positive) or rise (negative) since the last fill.
    decline = (last_fill_price - current_rate) / last_fill_price
    increase = - decline

    # if decline >= self.dca_threshold:
    #     # Example: buy 50% of the last trade's amount
    #     last_amount = buy_orders[-1].amount if buy_orders else 0
    #     stake_amount = last_amount * current_rate * 0.5
    #     return stake_amount

    ########################### ALGO ATH (kept for reference)
    # # --- 1. Local ATH of the pair ---
    # ath = max(self.pairs[pair]['last_max'], self.get_last_ath_before_candle(last_candle))
    #
    # # --- 2. Distance ATH - current ---
    # dd = (current_rate - ath) / ath * 100  # dd <= 0
    #
    # if dd > -1:  # no top-up when drawdown is too small
    #     return None
    #
    # # --- 3. Dynamic DCA (exponential model) ---
    # a = 0.015
    # b = 0.12
    #
    # pct = a * (math.exp(b * (-dd)) - 1)  # fraction of the free wallet
    #
    # # Safety clamp
    # pct = min(max(pct, 0), 0.35)  # at most 35% at once
    #
    # if pct <= 0:
    #     return None
    #
    # # --- 4. Stake as a fraction of the free wallet ---
    # stake_amount = self.wallets.get_available_stake_amount() * pct
    #
    # if stake_amount < self.min_stake_amount:
    #     return None
    # END ########################## ALGO ATH

    # "Force" a buy when nothing filled for >24h and the 1h SMA60 slope is up.
    force = hours > 24 and last_candle['sma60_deriv1_1h'] > 0
    # Basic buy context: green candle, rising SMA24, price below first entry.
    condition = last_candle['percent'] > 0 and last_candle['sma24_deriv1'] > 0 \
                and last_candle['close'] < self.pairs[pair]['first_buy']
    # and last_candle['ml_prob'] > 0.65

    limit_buy = 40
    # or (last_candle['close'] <= last_candle['min180'] and hours > 3)
    # --- Averaging-down buy (or forced buy) ---
    if ((force or decline >= dca_threshold) and condition):
        try:
            # If the pair already pyramided in profit, request a forced sell
            # instead of buying more.
            if self.pairs[pair]['has_gain'] and profit > 0:
                self.pairs[pair]['force_sell'] = True
                self.pairs[pair]['previous_profit'] = profit
                return None

            stake_amount = min(self.wallets.get_available_stake_amount(), self.adjust_stake_amount(pair, last_candle))
            # print(f"profit={profit} previous={self.pairs[pair]['previous_profit']} count_of_buys={trade.nr_of_successful_entries}")
            if stake_amount > 0:
                self.pairs[pair]['previous_profit'] = profit
                trade_type = "Loss " + (last_candle['enter_tag'] if last_candle['enter_long'] == 1 else '')
                self.pairs[trade.pair]['count_of_buys'] += 1
                self.pairs[pair]['total_amount'] += stake_amount
                self.log_trade(
                    last_candle=last_candle,
                    date=current_time,
                    action="🟧 " + ("Force" if force else 'Loss -'),
                    dispo=dispo,
                    pair=trade.pair,
                    rate=current_rate,
                    trade_type=trade_type,
                    profit=round(profit, 1),
                    buys=trade.nr_of_successful_entries + 1,
                    stake=round(stake_amount, 2)
                )

                self.pairs[trade.pair]['last_buy'] = current_rate
                self.pairs[trade.pair]['max_touch'] = last_candle['close']
                self.pairs[trade.pair]['last_candle'] = last_candle

                # df = pd.DataFrame.from_dict(self.pairs, orient='index')
                # colonnes_a_exclure = ['last_candle', 'stop',
                #                       'trade_info', 'last_date', 'expected_profit', 'last_count_of_buys', 'base_stake_amount', 'stop_buy']
                # df_filtered = df[df['count_of_buys'] > 0].drop(columns=colonnes_a_exclure)
                # # df_filtered = df_filtered["first_buy", "last_max", "max_touch", "last_sell","last_buy", 'count_of_buys', 'current_profit']
                #
                # self.printLog(df_filtered)

                return stake_amount
            return None
        except Exception as exception:
            self.printLog(exception)
            return None

    # --- Pyramiding buy while in profit ---
    # NOTE(review): `current_profit` is a ratio while `dca_threshold` comes
    # from the per-pair ladder -- confirm the units are comparable.
    if current_profit > dca_threshold and (increase >= dca_threshold and self.wallets.get_available_stake_amount() > 0)\
            and last_candle['rsi'] < 75:
        try:
            self.pairs[pair]['previous_profit'] = profit
            stake_amount = max(20, min(self.wallets.get_available_stake_amount(), self.adjust_stake_amount(pair, last_candle)))
            if stake_amount > 0:
                self.pairs[pair]['has_gain'] += 1

                trade_type = 'Gain +' + (last_candle['enter_tag'] if last_candle['enter_long'] == 1 else '')
                self.pairs[trade.pair]['count_of_buys'] += 1
                self.pairs[pair]['total_amount'] += stake_amount
                self.log_trade(
                    last_candle=last_candle,
                    date=current_time,
                    action="🟡 Gain +",
                    dispo=dispo,
                    pair=trade.pair,
                    rate=current_rate,
                    trade_type='Gain',
                    profit=round(profit, 1),
                    buys=trade.nr_of_successful_entries + 1,
                    stake=round(stake_amount, 2)
                )
                self.pairs[trade.pair]['last_buy'] = current_rate
                self.pairs[trade.pair]['max_touch'] = last_candle['close']
                self.pairs[trade.pair]['last_candle'] = last_candle
                return stake_amount
            return None
        except Exception as exception:
            self.printLog(exception)
            return None

    return None
|
||
|
||
def custom_exit(self, pair, trade, current_time, current_rate, current_profit, **kwargs):
    """
    Custom trailing-profit exit.

    Tracks the running per-pair max profit in self.pairs and exits with a
    "stop_<buys>_<gains>" reason once profit has fallen back to the
    trailing level, provided the short trend has already turned
    (sma12 <= sma24). Returns an exit-reason string to close the trade,
    or None to hold.
    """

    dataframe, _ = self.dp.get_analyzed_dataframe(pair, self.timeframe)
    last_candle = dataframe.iloc[-1].squeeze()
    last_candle_1h = dataframe.iloc[-13].squeeze()
    before_last_candle = dataframe.iloc[-2].squeeze()
    before_last_candle_2 = dataframe.iloc[-3].squeeze()
    before_last_candle_12 = dataframe.iloc[-13].squeeze()

    expected_profit = self.expectedProfit(pair, last_candle)
    # print(f"current_time={current_time} current_profit={current_profit} expected_profit={expected_profit}")

    # ----- 1) Load trailing bookkeeping for this trade -----
    max_price = self.pairs[pair]['max_touch']

    self.pairs[pair]['last_max'] = max(last_candle['close'], self.pairs[pair]['last_max'])
    self.pairs[pair]['last_min'] = min(last_candle['close'], self.pairs[pair]['last_min'])
    self.pairs[pair]['current_trade'] = trade

    count_of_buys = trade.nr_of_successful_entries

    # Absolute profit in stake currency.
    profit = trade.calc_profit(current_rate) #round(current_profit * trade.stake_amount, 1)

    if current_profit > 0:
        self.pairs[pair]['max_profit'] = max(self.pairs[pair]['max_profit'], profit)
    # else:
    #     self.pairs[pair]['max_profit'] = 0

    max_profit = self.pairs[pair]['max_profit']

    # if current_profit > 0:
    #     print(f"profit={profit} max_profit={max_profit} current_profit={current_profit}")
    # Never exit at a loss from here; losses are handled by the DCA logic.
    if profit < 0:
        return None

    # Fraction of the peak profit given back so far.
    # NOTE(review): divides by max_profit without a zero guard -- confirm
    # max_profit is always > 0 whenever profit > 0.
    baisse = 0
    if profit > 0:
        baisse = 1 - (profit / max_profit)
    mx = max_profit / 5
    self.pairs[pair]['count_of_buys'] = count_of_buys
    self.pairs[pair]['current_profit'] = profit

    # dispo = round(self.wallets.get_available_stake_amount())
    # hours_since_first_buy = (current_time - trade.open_date_utc).seconds / 3600.0
    # days_since_first_buy = (current_time - trade.open_date_utc).days
    # hours = (current_time - trade.date_last_filled_utc).total_seconds() / 3600.0
    # minutes = (current_time - trade.date_last_filled_utc).total_seconds() / 60.0

    # ----- 2) Update the highest price touched -----
    self.pairs[pair]['max_touch'] = max(last_candle['close'], self.pairs[pair]['max_touch'])

    # ----- 3) Max profit reached -----
    # profit_max = (max_price - trade.open_rate) / trade.open_rate

    current_trailing_stop_positive = self.trailing_stop_positive
    current_trailing_only_offset_is_reached = self.trailing_only_offset_is_reached
    current_trailing_stop_positive_offset = self.trailing_stop_positive_offset

    # Position of the mid price inside the 180-candle range, bucketed 0..2.
    # NOTE(review): no guard against max180 == min180 (division by zero).
    max_ = last_candle['max180']
    min_ = last_candle['min180']
    mid = last_candle['mid']
    position = (mid - min_) / (max_ - min_)
    zone = int(position * 3)  # 0 to 2

    # if zone == 0:
    #     current_trailing_stop_positive = self.trailing_stop_positive
    #     current_trailing_stop_positive_offset = self.trailing_stop_positive_offset * 2
    # if minutes > 1440:
    #     current_trailing_only_offset_is_reached = False
    #     current_trailing_stop_positive_offset = self.trailing_stop_positive_offset
    # if zone == 1:

    # ----- 5) Dynamic trailing-stop level -----
    # Example: offset=0.321 => stop at +24.8%
    trailing_stop = max_profit * (1.0 - current_trailing_stop_positive)
    # Give-back ratio, recomputed here with a zero guard.
    baisse = 0
    if max_profit:
        baisse = (max_profit - profit) / max_profit

    # if minutes % 12 == 0:
    #     self.log_trade(
    #         last_candle=last_candle,
    #         date=current_time,
    #         action="🟢 CURRENT", #🔴 CURRENT" if self.pairs[pair]['stop'] or last_candle['stop_buying'] else "
    #         dispo=dispo,
    #         pair=pair,
    #         rate=last_candle['close'],
    #         trade_type=f"{round(profit, 2)} {round(max_profit, 2)} {round(trailing_stop,2)} {minutes}",
    #         profit=round(profit, 2),
    #         buys=count_of_buys,
    #         stake=0
    #     )

    # Hold while the short trend is still up (sma12 above sma24).
    if last_candle['sma12'] > last_candle['sma24']:
        return None
    # if last_candle['sma24_deriv1'] > 0 : #and minutes < 180 and baisse < 30: # and last_candle['sma5_deriv1'] > -0.15:
    # if (minutes < 180):
    #     return None
    # if (minutes > 1440 and last_candle['sma60_deriv1'] > 0) :
    #     return None

    # ----- 4) OFFSET: wait until trailing_stop_positive_offset is exceeded? -----
    if current_trailing_only_offset_is_reached:
        # Required peak scales with the number of loss-buys (gain-buys
        # excluded), capped at 2.
        if max_profit < min(2, max_profit * current_trailing_stop_positive_offset * (count_of_buys - self.pairs[pair]['has_gain'])): #2 * current_trailing_stop_positive:
            print(f"{current_time} trailing non atteint trailing_stop={round(trailing_stop,4)} profit={round(profit, 4)} max={round(max_profit, 4)} "
                  f"{min(2, current_trailing_stop_positive_offset * (count_of_buys - self.pairs[pair]['has_gain']))}")
            return None  # do not activate the trailing stop yet
        else:
            print(f"{current_time} trailing atteint trailing_stop={round(trailing_stop,4)} profit={round(profit, 4)} max={round(max_profit, 4)} "
                  f"{min(2, current_trailing_stop_positive_offset * (count_of_buys - self.pairs[pair]['has_gain']))}")

    # Otherwise: trailing is active from the start.

    # ----- 6) Exit condition -----
    if 0 < profit <= trailing_stop and last_candle['mid'] < last_candle['sma5']:
        self.pairs[pair]['force_buy'] = True
        print(
            f"{current_time} Condition de vente trailing_stop={round(trailing_stop,4)} profit={round(profit, 4)} max={round(max_profit, 4)} "
            f"{round(min(2, current_trailing_stop_positive_offset * (count_of_buys - self.pairs[pair]['has_gain'])), 4)}")

        return f"stop_{count_of_buys}_{self.pairs[pair]['has_gain']}"
    return None
|
||
|
||
def informative_pairs(self):
    """Request 1h and 1d informative candles for every whitelisted pair."""
    whitelist = self.dp.current_whitelist()
    # All 1h pairs first, then all 1d pairs (same order as before).
    return [(p, tf) for tf in ('1h', '1d') for p in whitelist]
|
||
|
||
def populate1hIndicators(self, df: pd.DataFrame, metadata: dict) -> pd.DataFrame:
    """
    Add previous-week / previous-month support levels and demand zones.

    Fix: the previous version used ``groupby(...).transform(...)``, which
    stamps the FULL period's extreme on every row of that period (look-ahead
    bias inside the running week/month), and then shifted by one ROW — which
    does not move to the previous period at all. Aggregates are now shifted
    at the PERIOD level, so each row only sees the last fully observed
    previous period's low/high.

    Expects ``df`` sorted by ascending date with a DatetimeIndex and
    ``low``/``high`` columns. Adds: week/year/month helper columns,
    weekly_low/high, monthly_low/high, and the derived demand zones.
    """

    def prev_period_stat(keys, col, how):
        # One aggregate per period in chronological (appearance) order,
        # shifted so each period maps to the PREVIOUS period's value,
        # then broadcast back to the individual rows.
        agg = df.groupby(keys, sort=False)[col].agg(how).shift(1)
        row_keys = pd.Series(list(zip(*(df[k] for k in keys))), index=df.index)
        return row_keys.map(agg)

    # --- WEEKLY LEVELS ---
    # Previous week = different ISO week (grouped with the calendar year).
    df["week"] = df.index.isocalendar().week
    df["year"] = df.index.year

    df["weekly_low"] = prev_period_stat(["year", "week"], "low", "min")
    df["weekly_high"] = prev_period_stat(["year", "week"], "high", "max")

    # Simple weekly demand zone: previous week's low plus 2.5% (tunable).
    df["weekly_demand_zone_low"] = df["weekly_low"]
    df["weekly_demand_zone_high"] = df["weekly_low"] * 1.025

    # --- MONTHLY LEVELS ---
    df["month"] = df.index.month

    df["monthly_low"] = prev_period_stat(["year", "month"], "low", "min")
    df["monthly_high"] = prev_period_stat(["year", "month"], "high", "max")

    df["monthly_demand_zone_low"] = df["monthly_low"]
    df["monthly_demand_zone_high"] = df["monthly_low"] * 1.03

    return df
|
||
|
||
# ----- SIGNALS SIMPLES POUR EXEMPLE -----
|
||
|
||
# def populate_buy_trend(self, df: pd.DataFrame, metadata: dict) -> pd.DataFrame:
|
||
# df["buy"] = 0
|
||
#
|
||
# # Exemple : acheter si le prix tape la zone de demande hebdomadaire
|
||
# df.loc[
|
||
# (df["close"] <= df["weekly_demand_zone_high"]) &
|
||
# (df["close"] >= df["weekly_demand_zone_low"]),
|
||
# "buy"
|
||
# ] = 1
|
||
#
|
||
# return df
|
||
#
|
||
# def populate_sell_trend(self, df: pd.DataFrame, metadata: dict) -> pd.DataFrame:
|
||
# df["sell"] = 0
|
||
#
|
||
# # Exemple : vendre sur retour au weekly_high précédent
|
||
# df.loc[df["close"] >= df["weekly_high"], "sell"] = 1
|
||
#
|
||
# return df
|
||
|
||
|
||
def rsi_trend_probability(self, dataframe, short=6, long=12):
    """
    Score trend direction in [-1, 1] from a fast/slow RSI pair.

    Blends a saturated fast-vs-slow RSI crossover (tanh), the raw RSI gap
    and the slow RSI's distance from the 50 midline into an 'rtp' column.
    Works on a copy; the caller's frame is left untouched.
    """
    out = dataframe.copy()

    out['rsi_short'] = talib.RSI(out['mid'], short)
    out['rsi_long'] = talib.RSI(out['mid'], long)

    spread = out['rsi_short'] - out['rsi_long']
    # Soft-saturated crossover signal in (-1, 1).
    out['cross_soft'] = np.tanh(spread / 7)

    out['gap'] = spread / 100
    out['trend'] = (out['rsi_long'] - 50) / 50

    blended = 0.6 * out['cross_soft'] + 0.25 * out['gap'] + 0.15 * out['trend']
    out['rtp'] = blended.clip(-1, 1)

    return out
|
||
|
||
import pandas as pd
|
||
|
||
def to_utc_ts(self, x):
    """Coerce *x* (string / datetime / Timestamp) into a UTC-aware pandas Timestamp."""
    ts = pd.to_datetime(x, utc=True)
    return ts
|
||
|
||
# suppose self.btc_ath_history exists (liste de dict)
|
||
def get_last_ath_before_candle(self, last_candle):
    """
    Return the most recent BTC ATH price recorded on or before the
    candle's date, or None when no earlier ATH entry exists.

    Scans self.btc_ath_history (list of {'date', 'price_usd'} dicts);
    on equal dates the first matching entry wins.
    """
    cutoff = self.to_utc_ts(last_candle['date'])
    eligible = [
        (self.to_utc_ts(entry["date"]), entry["price_usd"])
        for entry in self.btc_ath_history
        if self.to_utc_ts(entry["date"]) <= cutoff
    ]
    if not eligible:
        return None
    # max() with a key keeps the first occurrence on ties, matching the
    # original strict-greater replacement rule.
    return max(eligible, key=lambda item: item[0])[1]
|
||
|
||
def trainModel(self, dataframe: DataFrame, metadata: dict):
    """
    Train the strategy's entry-signal classifier for one pair and dump a
    large set of diagnostics to self.path.

    Pipeline (as implemented below):
      1. Print several correlation views of the feature columns
         (self.model_indicators) and save a correlation heatmap.
      2. Build the binary target: 1 when sma24 rises by more than 100
         (absolute price units) over the next 13 candles.
      3. Time-ordered 80/20 train/validation split (no shuffling).
      4. Optuna search (20 trials) over XGBClassifier hyper-parameters,
         maximizing the best F1 over a sweep of probability thresholds.
      5. Refit the best model, store it in self.train_model, and save
         SHAP, Optuna, importance and threshold diagnostics.
      6. Persist the model with joblib and delegate the final report to
         self.analyze_model.

    Side effects: writes many .png/.html/.pkl files under self.path,
    changes pandas display options globally, prints extensively.
    Relies on module-level names not visible in this chunk (sns, optuna,
    shap, XGBClassifier, VarianceThreshold, SelectFromModel,
    CalibratedClassifierCV, plot_* optuna visualizers) — presumably
    imported near the top of the file; TODO confirm.
    """
    pair = self.getShortName(metadata['pair'])
    # NOTE(review): these display options are process-global, not local to
    # this call.
    pd.set_option('display.max_rows', None)
    pd.set_option('display.max_columns', None)
    pd.set_option("display.width", 200)
    path=self.path #f"user_data/plots/{pair}/"
    os.makedirs(path, exist_ok=True)

    # (removed commented-out legacy code: automatic numeric-column selection;
    #  superseded by self.model_indicators / listUsableColumns)
    df = dataframe[self.model_indicators].copy()

    # Full feature-vs-feature correlation matrix.
    corr = df.corr(numeric_only=True)
    print("Corrélation des colonnes")
    print(corr)

    # Target: 1 when sma24 gains more than 100 over the next 13 candles.
    # NOTE(review): the 100 is an absolute price-unit threshold, so it is
    # pair-dependent — TODO confirm this is intended for all pairs.
    df['target'] = ((df["sma24"].shift(-13) - df["sma24"]) > 100).astype(int)
    df['target'] = df['target'].fillna(0).astype(int)

    # Correlations of every feature with the target, sorted.
    target_corr = df.corr(numeric_only=True)["target"].sort_values(ascending=False)
    print("Corrélations triées par importance avec une colonne cible")
    print(target_corr)

    # Pairwise correlations, deduplicated and sorted by absolute value.
    corr = df.corr(numeric_only=True)
    corr_unstacked = (
        corr.unstack()
        .reset_index()
        .rename(columns={"level_0": "col1", "level_1": "col2", 0: "corr"})
    )
    # Drop self-pairs and mirrored (col2, col1) duplicates.
    corr_unstacked = corr_unstacked[corr_unstacked["col1"] < corr_unstacked["col2"]]

    corr_sorted = corr_unstacked.reindex(corr_unstacked["corr"].abs().sort_values(ascending=False).index)
    print("Trier par valeur absolue de corrélation")
    print(corr_sorted.head(20))

    # --- Correlation heatmap (percent scale) ---
    corr = df.corr(numeric_only=True)
    corr = corr * 100  # percent

    # Mask the upper triangle so each pair is shown once.
    mask = np.triu(np.ones_like(corr, dtype=bool))

    fig, ax = plt.subplots(figsize=(96, 36))

    # "Temperature"-style heatmap, blue -> red centered on 0.
    sns.heatmap(
        corr,
        mask=mask,
        cmap="coolwarm",
        center=0,
        annot=True,
        fmt=".0f",
        cbar_kws={"label": "Corrélation (%)"},
        linewidths=0.5,
        ax=ax
    )

    ax.set_title("Matrice de corrélation (en %)", fontsize=20, pad=20)
    plt.xticks(rotation=45, ha="right")
    plt.yticks(rotation=0)

    output_path = f"{self.path}/Matrice_de_correlation_temperature.png"
    plt.savefig(output_path, bbox_inches="tight", dpi=150)
    plt.close(fig)

    print(f"✅ Matrice enregistrée : {output_path}")

    # Greedy low-redundancy feature ranking (informational only — the
    # result is printed but not fed into the model below).
    selected_corr = self.select_uncorrelated_features(df, target="target", top_n=30, corr_threshold=0.7)
    print("===== 🎯 FEATURES SÉLECTIONNÉES =====")
    print(selected_corr)

    # Drop rows with NaNs before modelling.
    df = df.dropna()

    X = df[self.model_indicators]
    y = df['target']
    print("===== 🎯 FEATURES SCORES =====")
    print(self.feature_auc_scores(X, y))

    # Time-ordered split: train = first 80 %, validation = last 20 %.
    X = df[self.model_indicators]
    y = df['target']
    X_train, X_valid, y_train, y_valid = train_test_split(X, y, test_size=0.2, shuffle=False)

    # NOTE(review): the variance filter below is computed and printed but
    # `selected` is never used to subset the training data.
    selector = VarianceThreshold(threshold=0.0001)
    selector.fit(X_train)
    selected = X_train.columns[selector.get_support()]
    print("Colonnes conservées :", list(selected))

    # (removed commented-out earlier Optuna objective variant)

    def objective(trial):
        # Optuna objective: fit an XGBoost classifier with sampled
        # hyper-parameters and score it by the best F1 over a threshold sweep.
        # (removed commented-out fixed-parameter XGBClassifier variant)
        local_model = XGBClassifier(
            n_estimators=trial.suggest_int("n_estimators", 300, 500),
            max_depth=trial.suggest_int("max_depth", 1, 6),
            learning_rate=trial.suggest_float("learning_rate", 0.005, 0.3, log=True),
            subsample=trial.suggest_float("subsample", 0.6, 1.0),
            colsample_bytree=trial.suggest_float("colsample_bytree", 0.6, 1.0),
            scale_pos_weight=1,
            objective="binary:logistic",
            eval_metric="logloss",
            n_jobs=-1
        )

        local_model.fit(
            X_train,
            y_train,
            eval_set=[(X_valid, y_valid)],
            # early_stopping_rounds=50,
            verbose=False
        )

        # Best F1 over 50 candidate probability thresholds in [0.1, 0.9].
        proba = local_model.predict_proba(X_valid)[:, 1]
        thresholds = np.linspace(0.1, 0.9, 50)
        best_f1 = max(f1_score(y_valid, (proba > t)) for t in thresholds)

        return best_f1
    study = optuna.create_study(direction="maximize")
    study.optimize(objective, n_trials=20)

    # Refit the final model with the best hyper-parameters found.
    # NOTE(review): the refit omits the fixed params used inside the
    # objective (objective="binary:logistic", eval_metric, n_jobs, ...);
    # XGBoost defaults apply instead — TODO confirm this is intended.
    best_params = study.best_params

    best_model = XGBClassifier(**best_params)
    best_model.fit(X_train, y_train)
    self.train_model = best_model

    # === SHAP waterfall for one observation (first training row) ===
    explainer = shap.TreeExplainer(self.train_model)
    shap_values = explainer(X_train)

    i = 0

    shap_val = shap_values[i].values
    feature_names = X_train.columns
    feature_values = X_train.iloc[i]

    # Keep only the top-k features by absolute SHAP value.
    k = 10
    order = np.argsort(np.abs(shap_val))[::-1][:k]

    # Build the figure without displaying it interactively.
    plt.ioff()

    shap.plots.waterfall(
        shap.Explanation(
            values=shap_val[order],
            base_values=shap_values.base_values[i],
            data=feature_values.values[order],
            feature_names=feature_names[order]
        ),
        show=False
    )

    output_path = f"{self.path}/shap_waterfall.png"
    plt.savefig(output_path, dpi=200, bbox_inches='tight')
    plt.close()

    print(f"Graphique SHAP enregistré : {output_path}")

    # ---- Optuna study summary ----
    print("Best value (F1):", study.best_value)
    print("Best params:", study.best_params)

    best_trial = study.best_trial
    print("\n=== BEST TRIAL ===")
    print("Number:", best_trial.number)
    print("Value:", best_trial.value)
    print("Params:")
    # NOTE(review): this loop reuses the name `k` defined above for the
    # SHAP top-k; harmless here but easy to trip over.
    for k, v in best_trial.params.items():
        print(f"  - {k}: {v}")

    print("\n=== ALL TRIALS ===")
    for t in study.trials:
        print(f"Trial {t.number}: f1 = {t.value}, params = {t.params}")

    # NOTE(review): `df` is rebound here — the feature dataframe is no
    # longer reachable under that name past this point.
    df = study.trials_dataframe()
    print(df.head())

    # Optuna visualizations saved as standalone HTML files.
    fig = plot_optimization_history(study)
    fig.write_html(f"{self.path}/optimization_history.html")
    fig = plot_param_importances(study)
    fig.write_html(f"{self.path}/param_importances.html")
    fig = plot_slice(study)
    fig.write_html(f"{self.path}/slice.html")
    fig = plot_parallel_coordinate(study)
    fig.write_html(f"{self.path}/parallel_coordinates.html")

    # Feature selection from the fitted model (median importance cutoff).
    sfm = SelectFromModel(self.train_model, threshold="median", prefit=True)
    selected_features = X_train.columns[sfm.get_support()]
    print(selected_features)

    # Probability calibration (optional).
    # NOTE(review): `calibrated` is fitted and printed but never used or
    # stored — self.train_model stays uncalibrated.
    calibrated = CalibratedClassifierCV(self.train_model, method='sigmoid', cv=5)
    calibrated.fit(X_train[selected_features], y_train)
    print(calibrated)

    # ---- Validation metrics for the stored model ----
    y_pred = self.train_model.predict(X_valid)
    y_proba = self.train_model.predict_proba(X_valid)[:, 1]
    print("\nRapport de classification :\n", classification_report(y_valid, y_pred))
    print("\nMatrice de confusion :\n", confusion_matrix(y_valid, y_pred))

    # Built-in feature importances, as a bar chart.
    importances = self.train_model.feature_importances_
    feat_imp = pd.Series(importances, index=X_train.columns).sort_values(ascending=False)

    # NOTE(review): no plt.figure()/plt.close() around these two bar
    # charts — they draw onto whatever figure is current.
    feat_imp.plot(kind='bar', figsize=(12, 6))
    plt.title("Feature importances")
    # plt.show()
    plt.savefig(f"{self.path}/Feature importances.png", bbox_inches='tight')

    # Permutation importance on the validation split (F1 scoring).
    result = permutation_importance(self.train_model, X_valid, y_valid, scoring='f1', n_repeats=10, random_state=42)
    perm_imp = pd.Series(result.importances_mean, index=X_valid.columns).sort_values(ascending=False)
    perm_imp.plot(kind='bar', figsize=(12, 6))
    plt.title("Permutation feature importance")
    # plt.show()
    plt.savefig(f"{self.path}/Permutation feature importance.png", bbox_inches='tight')

    # SHAP global summary + single-row force plot.
    explainer = shap.TreeExplainer(self.train_model)
    shap_values = explainer.shap_values(X_valid)

    # NOTE(review): summary_plot is called without show=False and without a
    # savefig — in a headless run it may open/accumulate a figure.
    shap.summary_plot(shap_values, X_valid)

    force_plot = shap.force_plot(explainer.expected_value, shap_values[0, :], X_valid.iloc[0, :])
    shap.save_html(f"{self.path}/shap_force_plot.html", force_plot)

    # Partial-dependence plots for the selected features.
    fig, ax = plt.subplots(figsize=(24, 48))
    PartialDependenceDisplay.from_estimator(
        self.train_model,
        X_valid,
        selected_features,
        kind="average",
        ax=ax
    )
    fig.savefig(f"{self.path}/PartialDependenceDisplay.png", bbox_inches="tight")
    plt.close(fig)

    # Coarse threshold sweep on the validation probabilities.
    best_f1 = 0
    best_t = 0.5
    for t in [0.3, 0.4, 0.5, 0.6, 0.7]:
        y_pred_thresh = (y_proba > t).astype(int)
        score = f1_score(y_valid, y_pred_thresh)
        print(f"Seuil {t:.1f} → F1: {score:.3f}")
        if score > best_f1:
            best_f1 = score
            best_t = t

    # NOTE(review): best_t is printed but not stored on self.
    print(f"✅ Meilleur seuil trouvé: {best_t} avec F1={best_f1:.3f}")

    # Validation accuracy (informational).
    preds = self.train_model.predict(X_valid)
    acc = accuracy_score(y_valid, preds)
    print(f"Accuracy: {acc:.3f}")

    # Persist the trained model.
    # NOTE(review): filename says "rf_model" but the model is an XGBoost
    # classifier — consider renaming when callers are updated.
    joblib.dump(self.train_model, f"{self.path}/{pair}_rf_model.pkl")
    print(f"✅ Modèle sauvegardé sous {pair}_rf_model.pkl")

    # (removed commented-out Mutual-Information / Permutation-Importance
    #  comparison experiment)

    self.analyze_model(pair, self.train_model, X_train, X_valid, y_train, y_valid)
||
def inspect_model(self, model):
    """
    Print a structured report about an already-fitted ML model.

    Works with scikit-learn, xgboost, lightgbm, catboost, ... — every
    section is guarded by hasattr, so only the attributes the model
    actually exposes are reported. Purely informational; returns None.
    """
    print("===== 🔍 INFORMATIONS DU MODÈLE =====")

    # Model identity
    print(f"Type : {type(model).__name__}")
    print(f"Module : {model.__class__.__module__}")

    # Hyper-parameters
    if hasattr(model, "get_params"):
        hyperparams = model.get_params()
        print(f"\n===== ⚙️ HYPERPARAMÈTRES ({len(hyperparams)}) =====")
        for key, value in hyperparams.items():
            print(f"{key}: {value}")

    # Number of estimators (ensembles)
    if hasattr(model, "n_estimators"):
        print(f"\nNombre d’estimateurs : {model.n_estimators}")

    # Feature importances (tree-based models)
    if hasattr(model, "feature_importances_"):
        print("\n===== 📊 IMPORTANCE DES FEATURES =====")

        # Use the fitted feature names when available; otherwise make
        # generic placeholder names.
        names = getattr(model, "feature_names_in_", None)
        if isinstance(names, np.ndarray):
            names = names.tolist()
        elif names is None:
            names = [f"feature_{i}" for i in range(len(model.feature_importances_))]

        importance_table = pd.DataFrame({
            "feature": names,
            "importance": model.feature_importances_
        }).sort_values(by="importance", ascending=False)

        print(importance_table)

    # Coefficients (linear models)
    if hasattr(model, "coef_"):
        print("\n===== ➗ COEFFICIENTS =====")
        coefficients = np.array(model.coef_)
        if coefficients.ndim == 1:
            for idx, value in enumerate(coefficients):
                print(f"Feature {idx}: {value:.6f}")
        else:
            print(coefficients)

    # Intercept (linear models)
    if hasattr(model, "intercept_"):
        print("\nIntercept :", model.intercept_)

    # Known classes (classifiers)
    if hasattr(model, "classes_"):
        print("\n===== 🎯 CLASSES =====")
        print(model.classes_)

    # Internal scores some libraries expose after fitting
    for attr in ["best_score_", "best_iteration_", "best_ntree_limit", "score_"]:
        if hasattr(model, attr):
            print(f"\n{attr} = {getattr(model, attr)}")

    # Available methods (first 15 only)
    print("\n===== 🧩 MÉTHODES DISPONIBLES =====")
    method_names = [name for name, _ in inspect.getmembers(model, predicate=inspect.ismethod)]
    overflow = "..." if len(method_names) > 15 else ""
    print(", ".join(method_names[:15]) + overflow)

    print("\n===== ✅ FIN DE L’INSPECTION =====")
||
def analyze_model(self, pair, model, X_train, X_valid, y_train, y_valid):
    """
    Full evaluation report for a fitted binary classifier.

    Prints global metrics (accuracy, ROC AUC, classification report,
    confusion matrix), saves diagnostic figures (confusion matrix,
    feature importances, a tree excerpt, ROC curve, threshold curves)
    under self.path, and delegates one chart to
    self.plot_threshold_analysis. Informational only; returns None.

    Note: `pair` and `y_train` are accepted for interface consistency but
    are not used in the body below.
    """
    os.makedirs(self.path, exist_ok=True)

    # ---- Predictions ----
    preds = model.predict(X_valid)
    # Fall back to hard predictions when the model has no predict_proba.
    probs = model.predict_proba(X_valid)[:, 1] if hasattr(model, "predict_proba") else preds

    # ---- Global performance ----
    print("===== 📊 ÉVALUATION DU MODÈLE =====")
    print("Colonnes du modèle :", model.feature_names_in_)
    print("Colonnes X_valid :", list(X_valid.columns))
    print(f"Accuracy: {accuracy_score(y_valid, preds):.3f}")
    print(f"ROC AUC : {roc_auc_score(y_valid, probs):.3f}")

    print("TN (True Negative) / FP (False Positive)")
    print("FN (False Negative) / TP (True Positive)")
    print("\nRapport de classification :\n", classification_report(y_valid, preds))

    # Confusion-matrix cell meanings:
    #   TN — model correctly predicted 0 (no buy)
    #   FP — model predicted 1 when it was 0 (false buy signal)
    #   FN — model predicted 0 when it was 1 (missed buy)
    #   TP — model correctly predicted 1 (good buy signal)

    # ---- Confusion matrix ----
    cm = confusion_matrix(y_valid, preds)
    print("Matrice de confusion :\n", cm)

    plt.figure(figsize=(4, 4))
    plt.imshow(cm, cmap="Blues")
    plt.title("Matrice de confusion")
    plt.xlabel("Prédit")
    plt.ylabel("Réel")
    # Annotate each cell with its count (2x2 binary matrix assumed).
    for i in range(2):
        for j in range(2):
            plt.text(j, i, cm[i, j], ha="center", va="center", color="black")
    # plt.show()
    plt.savefig(os.path.join(self.path, "Matrice de confusion.png"), bbox_inches="tight")
    plt.close()

    # ---- Feature importances (tree-based models only) ----
    if hasattr(model, "feature_importances_"):
        print("\n===== 🔍 IMPORTANCE DES FEATURES =====")
        importance = pd.DataFrame({
            "feature": X_train.columns,
            "importance": model.feature_importances_
        }).sort_values(by="importance", ascending=False)
        print(importance)

        # Wide figure so many feature labels stay readable.
        fig, ax = plt.subplots(figsize=(24, 8))

        importance.plot.bar(x="feature", y="importance", legend=False, ax=ax)

        # Rotate labels for readability.
        ax.set_xticklabels(ax.get_xticklabels(), rotation=45, ha='right')

        plt.title("Importance des features")
        # plt.show()
        plt.savefig(os.path.join(self.path, "Importance des features.png"), bbox_inches="tight")
        plt.close()

    # ---- Decision-tree excerpt (ensemble models) ----
    if hasattr(model, "estimators_"):
        print("\n===== 🌳 EXTRAIT D’UN ARBRE =====")
        # Only the first 800 characters of the first estimator's rules.
        print(export_text(model.estimators_[0], feature_names=list(X_train.columns))[:800])

    # ---- Accuracy per decision threshold ----
    thresholds = np.linspace(0.1, 0.9, 9)
    print("\n===== ⚙️ PERFORMANCE SELON SEUIL =====")
    for t in thresholds:
        preds_t = (probs > t).astype(int)
        acc = accuracy_score(y_valid, preds_t)
        print(f"Seuil {t:.1f} → précision {acc:.3f}")

    # ---- ROC curve ----
    fpr, tpr, _ = roc_curve(y_valid, probs)
    plt.figure(figsize=(5, 4))
    plt.plot(fpr, tpr, label="ROC curve")
    plt.plot([0, 1], [0, 1], linestyle="--", color="gray")
    plt.xlabel("Taux de faux positifs")
    plt.ylabel("Taux de vrais positifs")
    plt.title("Courbe ROC")
    plt.legend()
    # plt.show()
    plt.savefig(os.path.join(self.path, "Courbe ROC.png"), bbox_inches="tight")
    plt.close()

    # (removed commented-out optional SHAP summary-plot experiment)

    y_proba = model.predict_proba(X_valid)[:, 1]

    # Delegate the precision/recall/F1-vs-threshold chart.
    self.plot_threshold_analysis(y_valid, y_proba, step=0.05,
                                 save_path=f"{self.path}/threshold_analysis.png")

    # ---- Inline threshold sweep ----
    # NOTE(review): this largely duplicates plot_threshold_analysis above
    # (same metrics over the same threshold grid), with a best-F1 marker.
    # y_valid : true classes (0 / 1); y_proba : predicted P(class 1).
    seuils = np.arange(0.0, 1.01, 0.05)
    precisions, recalls, f1s = [], [], []

    for seuil in seuils:
        y_pred = (y_proba >= seuil).astype(int)
        precisions.append(precision_score(y_valid, y_pred))
        recalls.append(recall_score(y_valid, y_pred))
        f1s.append(f1_score(y_valid, y_pred))

    plt.figure(figsize=(10, 6))
    plt.plot(seuils, precisions, label='Précision', marker='o')
    plt.plot(seuils, recalls, label='Rappel', marker='o')
    plt.plot(seuils, f1s, label='F1-score', marker='o')

    # Highlight the threshold with the best F1.
    best_idx = np.argmax(f1s)
    plt.scatter(seuils[best_idx], f1s[best_idx], color='red', s=80, label=f'Max F1 ({seuils[best_idx]:.2f})')

    plt.title("Performance du modèle selon le seuil de probabilité")
    plt.xlabel("Seuil de probabilité (classe 1)")
    plt.ylabel("Score")
    plt.grid(True, alpha=0.3)
    plt.legend()
    # NOTE(review): this figure is saved but never closed.
    plt.savefig(f"{self.path}/seuil_de_probabilite.png", bbox_inches='tight')
    # plt.show()

    print(f"✅ Meilleur F1 : {f1s[best_idx]:.3f} au seuil {seuils[best_idx]:.2f}")

    print("\n===== ✅ FIN DE L’ANALYSE =====")
||
def plot_threshold_analysis(self, y_true, y_proba, step=0.05, save_path=None):
    """
    Plot precision, recall and F1-score as a function of the decision threshold.

    y_true : true labels (0 or 1)
    y_proba : predicted probabilities P(rise)
    step : spacing between tested thresholds
    save_path : if given, save the figure to this path instead of showing it

    The generated chart shows three curves:
      - Precision — how reliable the bullish signals are.
      - Recall    — the fraction of actual rises the model detects.
      - F1-score  — the trade-off between the two.
    """
    thresholds = np.arange(0, 1.01, step)
    precisions, recalls, f1s = [], [], []

    for thr in thresholds:
        preds = (y_proba >= thr).astype(int)
        precisions.append(precision_score(y_true, preds))
        recalls.append(recall_score(y_true, preds))
        f1s.append(f1_score(y_true, preds))

    plt.figure(figsize=(10, 6))
    plt.plot(thresholds, precisions, label="Precision", linewidth=2)
    plt.plot(thresholds, recalls, label="Recall", linewidth=2)
    plt.plot(thresholds, f1s, label="F1-score", linewidth=2, linestyle="--")
    plt.axvline(0.5, color='gray', linestyle=':', label="Seuil 0.5")
    plt.title("📊 Performance selon le seuil de probabilité", fontsize=14)
    plt.xlabel("Seuil de décision (threshold)")
    plt.ylabel("Score")
    plt.legend()
    plt.grid(True, alpha=0.3)

    if save_path:
        plt.savefig(save_path, bbox_inches='tight')
        print(f"✅ Graphique enregistré : {save_path}")
    else:
        plt.show()

    # Fix: close the figure after saving/showing so repeated calls (this
    # runs once per analysis) do not accumulate open matplotlib figures.
    plt.close()
||
def feature_auc_scores(self, X, y):
    """
    Score each column of X by its univariate ROC-AUC against the binary
    target y (missing values forward-filled, remaining NaNs set to 0).

    Columns for which the AUC cannot be computed get NaN. Returns a
    pandas Series indexed by column name, sorted descending.
    """
    scores = {}
    for column in X.columns:
        try:
            scores[column] = roc_auc_score(y, X[column].ffill().fillna(0))
        except Exception:
            # Best-effort: an unusable column simply scores NaN.
            scores[column] = np.nan
    return pd.Series(scores).sort_values(ascending=False)
||
def listUsableColumns(self, dataframe):
    """
    Return the numeric, non-constant columns of `dataframe` usable as
    model features, excluding price/target/bookkeeping columns by name.

    Side effect: the kept columns have +/-inf and NaN replaced by 0
    in place on the passed dataframe.
    """
    # Name-based exclusions. (Previously also excluded here, now allowed:
    # "_1h" suffix and "bb_lower"/"bb_upper"/"bb_middle" prefixes.)
    blocked_suffixes = ("_state", "_1d", "_count", "_class", "_price")
    blocked_prefixes = ("open", "close", "low", "high",
                        "haopen", "haclose",
                        "stop_buying", "target", "lvl")

    def _is_usable(col):
        # Constant columns carry no signal.
        return (dataframe[col].nunique() > 1
                and not col.endswith(blocked_suffixes)
                and not col.startswith(blocked_prefixes))

    # Step 1: numeric columns only.
    numeric_cols = dataframe.select_dtypes(include=['int64', 'float64']).columns
    # Step 2: drop constants and name-blocked columns.
    usable_cols = [col for col in numeric_cols if _is_usable(col)]
    # Step 3: sanitize infinities and NaNs in place.
    dataframe[usable_cols] = dataframe[usable_cols].replace([np.inf, -np.inf], 0).fillna(0)

    print("Colonnes utilisables pour le modèle :")
    print(usable_cols)
    return usable_cols
|
||
def select_uncorrelated_features(self, df, target, top_n=20, corr_threshold=0.7):
    """
    Pick the features most correlated with `target` while discarding
    those too correlated with an already-selected feature.

    Returns a DataFrame with columns 'feature' and 'corr_with_target',
    sorted by absolute correlation (descending).
    """
    # Absolute correlation of every numeric column with the target.
    corr_matrix = df.corr(numeric_only=True)
    ranked = corr_matrix[target].abs().sort_values(ascending=False)

    # Candidates: the top_n strongest (excluding the target itself).
    candidates = ranked.drop(target).head(top_n).index.tolist()

    # Greedy pass: keep a candidate only if it is not strongly correlated
    # (|corr| > corr_threshold) with any feature kept so far.
    selected = []
    for candidate in candidates:
        if all(abs(corr_matrix.loc[candidate, kept]) <= corr_threshold for kept in selected):
            selected.append(candidate)

    # Tidy result with signed correlations, ordered by magnitude.
    return pd.DataFrame({
        "feature": selected,
        "corr_with_target": [corr_matrix.loc[f, target] for f in selected]
    }).sort_values(by="corr_with_target", key=np.abs, ascending=False)
||
def calculateDerivation(self, dataframe, window=12, suffixe='', timeframe='5m'):
    """
    Smooth 'mid' with a simple moving average and derive its trend state.

    Adds a 'mid_smooth<suffixe>' column, then delegates the derivative /
    state computation to calculeDerivees (with ema_period = window).
    `timeframe` is accepted for interface consistency but unused here.
    """
    smooth_col = f"mid_smooth{suffixe}"
    dataframe[smooth_col] = dataframe['mid'].rolling(window).mean()
    return self.calculeDerivees(dataframe, smooth_col, ema_period=window)
||
def calculeDerivees(
    self,
    dataframe: pd.DataFrame,
    name: str,
    suffixe: str = '',
    window: int = 100,
    coef: float = 0.15,
    ema_period: int = 10,
    verbose: bool = True,
) -> pd.DataFrame:
    """
    Compute the first and second relative derivatives of column `name`,
    EMA-smooth them, and classify each row into a trend "state" using an
    adaptive epsilon based on rolling 5%/95% percentiles.

    Adds columns (all prefixed with f"{name}{suffixe}"):
      _inv     : True where the series made a local V (down-then-up) over
                 the last 3 points
      _dist    : relative distance of 'close' to the series
      _deriv1  : EMA-smoothed first relative derivative (scaled by factor1)
      _deriv2  : EMA-smoothed second derivative (scaled by factor2)
      _state   : integer trend code, see table below

    Parameters
    ----------
    window : rolling window for the percentile-based epsilon
    coef : scale factor applied to the percentile spread to get epsilon
    ema_period : EMA span for smoothing the derivatives (also scales them)
    verbose / timeframe-style args : currently only used by commented-out
        debug output

    NOTE(review): _inv and _dist read dataframe[f"{name}{suffixe}"] while
    the derivatives read dataframe[name]; with a non-empty `suffixe` these
    are different columns — confirm that is intended.

    Mutates and returns the same dataframe.
    """
    d1_col = f"{name}{suffixe}_deriv1"
    d2_col = f"{name}{suffixe}_deriv2"
    tendency_col = f"{name}{suffixe}_state"

    # Scaling factors grow linearly with the EMA span (reference span = 5).
    factor1 = 100 * (ema_period / 5)
    factor2 = 10 * (ema_period / 5)

    # Local V shape: previous move down (or flat), current move up (or flat).
    dataframe[f"{name}{suffixe}_inv"] = (dataframe[f"{name}{suffixe}"].shift(2) >= dataframe[f"{name}{suffixe}"].shift(1)) \
        & (dataframe[f"{name}{suffixe}"].shift(1) <= dataframe[f"{name}{suffixe}"])
    # --- Relative distance of close to the (smoothed) series ---
    dataframe[f"{name}{suffixe}_dist"] = (dataframe['close'] - dataframe[f"{name}{suffixe}"]) / dataframe[f"{name}{suffixe}"]

    # First derivative as a simple relative change, then EMA smoothing.
    dataframe[d1_col] = (dataframe[name] - dataframe[name].shift(1)) / dataframe[name].shift(1)
    dataframe[d1_col] = factor1 * dataframe[d1_col].ewm(span=ema_period, adjust=False).mean()

    # Second derivative = change of the smoothed first derivative, smoothed again.
    dataframe[d2_col] = dataframe[d1_col] - dataframe[d1_col].shift(1)
    dataframe[d2_col] = factor2 * dataframe[d2_col].ewm(span=ema_period, adjust=False).mean()

    # Adaptive epsilon: half the 5%/95% absolute percentile spread, scaled by coef.
    p_low_d1 = dataframe[d1_col].rolling(window=window, min_periods=1).quantile(0.05)
    p_high_d1 = dataframe[d1_col].rolling(window=window, min_periods=1).quantile(0.95)
    p_low_d2 = dataframe[d2_col].rolling(window=window, min_periods=1).quantile(0.05)
    p_high_d2 = dataframe[d2_col].rolling(window=window, min_periods=1).quantile(0.95)

    eps_d1_series = ((p_low_d1.abs() + p_high_d1.abs()) / 2) * coef
    eps_d2_series = ((p_low_d2.abs() + p_high_d2.abs()) / 2) * coef

    # Global fallback epsilon (whole-series percentiles) for NaN/zero spots.
    global_eps_d1 = (abs(dataframe[d1_col].quantile(0.05)) + abs(dataframe[d1_col].quantile(0.95))) / 2 * coef
    global_eps_d2 = (abs(dataframe[d2_col].quantile(0.05)) + abs(dataframe[d2_col].quantile(0.95))) / 2 * coef

    eps_d1_series = eps_d1_series.fillna(global_eps_d1).replace(0, global_eps_d1)
    eps_d2_series = eps_d2_series.fillna(global_eps_d2).replace(0, global_eps_d2)

    # (removed commented-out backtest-only derivative stats printout)

    # Row-wise trend classification from (d1, d2) against the adaptive epsilons.
    # State codes (3-letter mnemonics from the original design notes):
    #   4 HAU accelerating rise   |  3 HSR decelerating rise
    #   1 DHB rise starting       |  0 PAL plateau / neutral
    #  -1 DBD fall starting       | -2 BSR decelerating fall
    #  -4 BAS accelerating fall
    # (codes 2 / -3 from the design table are not produced by this mapping)
    def tag_by_derivatives(row):
        # NOTE(review): int(row.name) assumes a RangeIndex aligned with
        # eps_*_series positions — confirm the dataframe is not reindexed.
        idx = int(row.name)
        d1v = float(row[d1_col])
        d2v = float(row[d2_col])
        eps1 = float(eps_d1_series.iloc[idx])
        eps2 = float(eps_d2_series.iloc[idx])

        # Strict plateau: both derivatives within their epsilon band.
        if abs(d1v) <= eps1 and abs(d2v) <= eps2:
            return 0
        # Departure: d1 ~ 0 but d2 signals a direction.
        if abs(d1v) <= eps1:
            return 1 if d2v > eps2 else -1 if d2v < -eps2 else 0
        # Rise: accelerating (4) or not (3).
        if d1v > eps1:
            return 4 if d2v > eps2 else 3
        # Fall: accelerating (-4) or not (-2).
        if d1v < -eps1:
            return -4 if d2v < -eps2 else -2
        return 0

    dataframe[tendency_col] = dataframe.apply(tag_by_derivatives, axis=1)

    # (removed commented-out 1h backtest probability-stats printout)

    return dataframe
|
||
def calculModelInformative(self, informative):
    """Train a calibrated logistic-regression classifier on an informative dataframe.

    Builds a binary target ("will ``sma24`` be higher 13 candles from now?"),
    fits a scaled LogisticRegression wrapped in an isotonic
    CalibratedClassifierCV on the first 80% of rows (chronological split,
    no shuffling), prints Brier / ROC-AUC diagnostics, and returns the
    calibrated probabilities for the held-out tail.

    Args:
        informative: DataFrame containing the feature columns selected by
            ``self.listUsableColumns`` plus an ``sma24`` column.

    Returns:
        np.ndarray: calibrated P(target == 1) for each row of the test split.
    """
    # Local imports: these sklearn names are not among the module-level
    # imports visible at the top of this file, so bring them into scope
    # here to avoid a NameError at call time (harmless if already imported).
    from sklearn.pipeline import Pipeline
    from sklearn.preprocessing import StandardScaler
    from sklearn.linear_model import LogisticRegression
    from sklearn.calibration import CalibratedClassifierCV
    from sklearn.metrics import brier_score_loss

    df = informative.copy()
    X = df[self.listUsableColumns(df)]

    # Target: 1 when sma24 rises over the next 13 candles.
    # NOTE(review): the last 13 rows have no known future; `NaN > 0` is
    # False, so they are labeled 0 and trained on as "down". Consider
    # excluding them from training instead — TODO confirm desired handling.
    # (The previous `.fillna(0).astype(int)` pass was a no-op: after
    # `.astype(int)` the column is already integer with no NaN.)
    df['target'] = ((df["sma24"].shift(-13) - df["sma24"]) > 0).astype(int)
    y = df['target']

    # Chronological train/test split — shuffle=False avoids look-ahead
    # leakage on time-ordered candles.
    X_train, X_test, y_train, y_test = train_test_split(
        X, y, shuffle=False, test_size=0.2)

    # Standardized features feeding a logistic regression.
    clf = Pipeline([
        ("scaler", StandardScaler()),
        ("logreg", LogisticRegression(max_iter=5000)),
    ])

    # Isotonic probability calibration with 3-fold cross-validation.
    cal = CalibratedClassifierCV(clf, cv=3, method="isotonic")
    cal.fit(X_train, y_train)

    # Calibrated P(class == 1) on the held-out tail.
    probas = cal.predict_proba(X_test)[:, 1]

    # Diagnostics (lower Brier is better; higher AUC is better).
    print("Brier score:", brier_score_loss(y_test, probas))
    print("ROC AUC:", roc_auc_score(y_test, probas))

    return probas
|
||
|
||
|
||
def prune_features(self, model, dataframe, feature_columns, importance_threshold=0.01):
    """Drop features whose normalized gain importance falls below a threshold.

    Args:
        model: trained XGBClassifier (anything exposing ``get_booster()``).
        dataframe: DataFrame containing all candidate feature columns.
        feature_columns: list of column/feature names used for prediction.
        importance_threshold: minimum share of total gain (0..1) a feature
            must contribute to be kept.

    Returns:
        tuple:
            dataframe_pruned: ``dataframe`` restricted to the kept features,
                with NaN replaced by 0.
            kept_features: list of retained feature names.
    """
    booster = model.get_booster()

    # Per-feature importance measured as total split gain. Features the
    # booster never split on are absent from this map.
    importance = booster.get_score(importance_type='gain')

    # Normalize so importances sum to 1. Guard against an empty or
    # all-zero importance map (e.g. a model with no splits), which
    # previously raised ZeroDivisionError.
    total_gain = sum(importance.values())
    if total_gain > 0:
        normalized_importance = {k: v / total_gain for k, v in importance.items()}
    else:
        normalized_importance = {}

    # Missing features default to importance 0 and are therefore dropped
    # for any positive threshold.
    kept_features = [
        f for f in feature_columns
        if normalized_importance.get(f, 0) >= importance_threshold
    ]

    dataframe_pruned = dataframe[kept_features].fillna(0)

    print(f"⚡ Features conservées ({len(kept_features)} / {len(feature_columns)}): {kept_features}")

    return dataframe_pruned, kept_features
|