# Zeus Strategy: First Generation of GodStra Strategy with maximum
# AVG/MID profit in USDT
# Author: @Mablue (Masoud Azizi)
# github: https://github.com/mablue/
# IMPORTANT: INSTALL TA BEFOUR RUN(pip install ta)
# freqtrade hyperopt --hyperopt-loss SharpeHyperOptLoss --spaces buy sell roi --strategy Zeus
#
# NOTE(review): the source of this file arrived with its physical line structure
# collapsed; the indentation below was reconstructed from Python syntax.  Spots
# where the reconstruction is ambiguous are flagged with NOTE(review) comments.

# --- Do not remove these libs ---
from datetime import timedelta, datetime
from freqtrade.persistence import Trade
from freqtrade.strategy import (BooleanParameter, CategoricalParameter, DecimalParameter, stoploss_from_open,
                                IntParameter, IStrategy, merge_informative_pair, informative,
                                stoploss_from_absolute)
import pandas as pd
import numpy as np
import os
import json
import csv
from pandas import DataFrame
from typing import Optional, Union, Tuple
import math
import logging
from pathlib import Path
# --------------------------------
# Add your lib to import here test git
import ta
import talib.abstract as talib
import freqtrade.vendor.qtpylib.indicators as qtpylib
from datetime import timezone, timedelta

# Machine Learning
# NOTE(review): several imports below are duplicated (train_test_split,
# XGBClassifier, CalibratedClassifierCV, f1_score, roc_auc_score) — harmless
# but worth cleaning up in a follow-up.
from sklearn.ensemble import RandomForestClassifier, RandomForestRegressor
from sklearn.model_selection import train_test_split
from sklearn.metrics import r2_score, mean_absolute_error, mean_squared_error
from sklearn.metrics import accuracy_score
import joblib
import matplotlib.pyplot as plt
from sklearn.metrics import (
    classification_report, confusion_matrix, accuracy_score,
    roc_auc_score, roc_curve, precision_score, recall_score,
    precision_recall_curve, f1_score
)
from sklearn.tree import export_text
import inspect
from sklearn.feature_selection import mutual_info_classif
from sklearn.inspection import permutation_importance
from lightgbm import LGBMClassifier
from sklearn.calibration import CalibratedClassifierCV
from sklearn.feature_selection import SelectFromModel
from tabulate import tabulate
from sklearn.model_selection import GridSearchCV
from sklearn.feature_selection import VarianceThreshold
import seaborn as sns
from xgboost import XGBClassifier
import optuna
from optuna.visualization import plot_optimization_history
from optuna.visualization import plot_slice
from optuna.visualization import plot_param_importances
from optuna.visualization import plot_parallel_coordinate
import shap
from sklearn.inspection import PartialDependenceDisplay
from sklearn.model_selection import train_test_split
from sklearn.metrics import f1_score
from xgboost import XGBClassifier
from sklearn.model_selection import train_test_split
from sklearn.linear_model import LogisticRegression
from sklearn.calibration import CalibratedClassifierCV
from sklearn.metrics import brier_score_loss, roc_auc_score
from sklearn.preprocessing import StandardScaler
from sklearn.pipeline import Pipeline

logger = logging.getLogger(__name__)

# Basic ANSI colors (used to colorize console trade logs).
RED = "\033[31m"
GREEN = "\033[32m"
YELLOW = "\033[33m"
BLUE = "\033[34m"
MAGENTA = "\033[35m"
CYAN = "\033[36m"
RESET = "\033[0m"


class FrictradeLearning(IStrategy):
    """Freqtrade strategy driven by a pre-trained ML classifier.

    `populate_indicators` builds 1m/1h/1d features, loads a RandomForest model
    from disk and writes its up-probability into `ml_prob`; entries/exits are
    then gated in `populate_buy_trend`, `confirm_trade_entry` and
    `confirm_trade_exit`.  Per-pair runtime state is tracked in the shared
    class-level `pairs` dict.

    NOTE(review): several helpers used below (`adjust_stake_amount`,
    `approx_value`, `get_last_ath_before_candle`, `calculeDerivees`,
    `rsi_trend_probability`, `calculModelInformative`, `listUsableColumns`,
    `prune_features`, `inspect_model`) are defined elsewhere in this file/project.
    """

    startup_candle_count = 180
    train_model = None
    # Feature column names fed to the ML model; filled in populate_indicators.
    model_indicators = []
    # Default indicator/risk parameters (some only used via sma_short/sma_long).
    DEFAULT_PARAMS = {
        "rsi_buy": 30,
        "rsi_sell": 70,
        "ema_period": 21,
        "sma_short": 20,
        "sma_long": 100,
        "atr_period": 14,
        "atr_multiplier": 1.5,
        "stake_amount": None,  # use exchange default
        "stoploss": -0.10,
        "minimal_roi": {"0": 0.10}
    }
    # DCA ladder: drawdown % -> extra stake fraction (not referenced in this chunk).
    dca_levels = {
        0: 0.00,
        -2: 0.05,
        -4: 0.07,
        -6: 0.10,
        -8: 0.12,
        -10: 0.15,
        -12: 0.18,
        -14: 0.22,
        -16: 0.26,
        -18: 0.30,
    }
    # Max tolerated decrease from ATH used when sizing DCA thresholds (40%).
    allow_decrease_rate = 0.4

    # ROI table: effectively disabled (1000% target).
    minimal_roi = {
        "0": 10
    }

    # Stoploss: effectively disabled (-100%).
    stoploss = -1  # 0.256

    # Custom stoploss
    use_custom_stoploss = False
    trailing_stop = False
    trailing_stop_positive = 0.15
    trailing_stop_positive_offset = 0.5
    trailing_only_offset_is_reached = True

    # Buy hypers
    timeframe = '1m'
    max_open_trades = 5
    max_amount = 40
    parameters = {}
    # DCA config
    position_adjustment_enable = True
    # NOTE(review): initialized as a bool but used as an int counter in
    # log_trade (`% 10`, `+= 1`) — works because False == 0, but should be 0.
    columns_logged = False
    # Per-pair mutable runtime state, shared by all callbacks.
    # NOTE(review): class-level mutable dict — shared across instances.
    pairs = {
        pair: {
            "first_buy": 0,
            "last_buy": 0.0,
            "last_min": 999999999999999.5,
            "last_max": 0,
            "trade_info": {},
            "max_touch": 0.0,
            "last_sell": 0.0,
            'count_of_buys': 0,
            'current_profit': 0,
            'expected_profit': 0,
            'previous_profit': 0,
            "last_candle": {},
            "last_count_of_buys": 0,
            'base_stake_amount': 0,
            'stop_buy': False,
            'last_date': 0,
            'stop': False,
            'max_profit': 0,
            'first_amount': 0,
            'total_amount': 0,
            'has_gain': 0,
            'force_sell': False,
            'force_buy': False,
            'last_ath': 0,
            'dca_thresholds': {}
        }
        for pair in ["BTC/USDC", "ETH/USDC", "DOGE/USDC", "XRP/USDC", "SOL/USDC",
                     "BTC/USDT", "ETH/USDT", "DOGE/USDT", "XRP/USDT", "SOL/USDT"]
    }
    trades = list()
    max_profit_pairs = {}
    # Historic BTC all-time-highs, used as a fallback ATH reference
    # (presumably consumed by get_last_ath_before_candle — defined elsewhere).
    btc_ath_history = [
        {"date": "2011-06-09", "price_usd": 26.15, "note": "pic 2011 (early breakout)"},
        {"date": "2013-11-29", "price_usd": 1132.00, "note": "bull run fin 2013"},
        {"date": "2017-12-17", "price_usd": 19783.00, "note": "ATH décembre 2017 (crypto bubble)"},
        {"date": "2020-12-31", "price_usd": 29001.72, "note": "fin 2020, nouveau record après accumulation)"},
        {"date": "2021-11-10", "price_usd": 68742.00, "note": "record novembre 2021 (institutional demand)"},
        {"date": "2024-03-05", "price_usd": 69000.00, "note": "nouveau pic début 2024 (source presse, valeur indicative)"},
        {"date": "2025-07-11", "price_usd": 118755.00, "note": "pic juillet 2025 (valeur rapportée par la presse)"},
        {"date": "2025-10-06", "price_usd": 126198.07, "note": "pic oct. 2025 (source agrégée, à vérifier selon l'exchange)"}
    ]

    def confirm_trade_entry(self, pair: str, order_type: str, amount: float, rate: float,
                            time_in_force: str, current_time: datetime, entry_tag: Optional[str],
                            **kwargs) -> bool:
        """Freqtrade entry hook: record per-pair state for a new trade and log it.

        Currently always allows the entry (`allow_to_buy = True`); the original
        gating conditions are kept as comments.  Side effects: resets `self.trades`,
        seeds `self.pairs[pair]` bookkeeping and computes the DCA thresholds.
        """
        minutes = 0
        if self.pairs[pair]['last_date'] != 0:
            # Minutes elapsed since the previous exit on this pair (for the log line).
            minutes = round(int((current_time - self.pairs[pair]['last_date']).total_seconds() / 60))
        dataframe, _ = self.dp.get_analyzed_dataframe(pair, self.timeframe)
        last_candle = dataframe.iloc[-1].squeeze()
        last_candle_2 = dataframe.iloc[-2].squeeze()
        last_candle_3 = dataframe.iloc[-3].squeeze()
        condition = True  # (last_candle[f"{indic_5m}_deriv1"] >= indic_deriv1_5m) and (last_candle[f"{indic_5m}_deriv2"] >= indic_deriv2_5m)
        allow_to_buy = True  # (condition and not self.pairs[pair]['stop']) | (entry_tag == 'force_entry')
        if allow_to_buy:
            # Drop the cached open-trades list so it is refreshed next time.
            self.trades = list()
            self.pairs[pair]['first_buy'] = rate
            self.pairs[pair]['last_buy'] = rate
            self.pairs[pair]['max_touch'] = last_candle['close']
            self.pairs[pair]['last_candle'] = last_candle
            self.pairs[pair]['count_of_buys'] = 1
            self.pairs[pair]['current_profit'] = 0
            self.pairs[pair]['last_max'] = max(last_candle['close'], self.pairs[pair]['last_max'])
            self.pairs[pair]['last_min'] = min(last_candle['close'], self.pairs[pair]['last_min'])
            dispo = round(self.wallets.get_available_stake_amount())
            self.printLineLog()
            stake_amount = self.adjust_stake_amount(pair, last_candle)
            self.pairs[pair]['total_amount'] = stake_amount
            self.pairs[pair]['first_amount'] = stake_amount
            self.calculateStepsDcaThresholds(last_candle, pair)
            self.log_trade(
                last_candle=last_candle,
                date=current_time,
                action=("🟩Buy" if allow_to_buy else "Canceled") + " " + str(minutes),
                pair=pair,
                rate=rate,
                dispo=dispo,
                profit=0,
                trade_type=entry_tag,
                buys=1,
                stake=round(stake_amount, 2)
            )
        return allow_to_buy

    def calculateStepsDcaThresholds(self, last_candle, pair):
        """Compute the per-pair ladder of DCA drawdown thresholds.

        The total acceptable drop (distance from current mid price to
        ATH * (1 - allow_decrease_rate), relative to mid) is split into
        `steps` increasing slices via `split_ratio_one_third` and stored in
        `self.pairs[pair]['dca_thresholds']`.
        """
        def split_ratio_one_third(n, p):
            # Splits total n into p increasing parts (arithmetic progression
            # summing to n): first value a, increment d.
            a = n / (2 * p)  # first value
            d = n / (p * (p - 1))  # increment
            return [round(a + i * d, 3) for i in range(p)]

        if self.pairs[pair]['last_ath'] == 0:
            # Seed the ATH reference once per pair from history/current mid.
            ath = max(last_candle['mid'], self.get_last_ath_before_candle(last_candle))
            self.pairs[pair]['last_ath'] = ath
        steps = self.approx_value(last_candle['mid'], self.pairs[pair]['last_ath'])
        self.pairs[pair]['dca_thresholds'] = split_ratio_one_third(
            (last_candle['mid'] - (self.pairs[pair]['last_ath'] * (1 - self.allow_decrease_rate))) / last_candle['mid'],
            steps)
        print(f"val={last_candle['mid']} steps={steps} pct={(last_candle['mid'] - (self.pairs[pair]['last_ath'] * (1 - self.allow_decrease_rate))) / last_candle['mid']}")
        print(self.pairs[pair]['dca_thresholds'])

    def confirm_trade_exit(self, pair: str, trade: Trade, order_type: str, amount: float,
                           rate: float, time_in_force: str, exit_reason: str,
                           current_time, **kwargs, ) -> bool:
        """Freqtrade exit hook: allow exits after 30 min on a red Heikin-Ashi
        candle, or when forced (`force_sell` flag, `force_exit`, `stop_loss`).

        On an accepted exit, resets the per-pair bookkeeping in `self.pairs`.
        """
        # allow_to_sell = (minutes > 30)
        dataframe, _ = self.dp.get_analyzed_dataframe(pair, self.timeframe)
        last_candle = dataframe.iloc[-1].squeeze()
        # NOTE(review): `.seconds` ignores full days — for trades open >24h the
        # minutes wrap around; `.total_seconds()` is probably intended.
        minutes = int(round((current_time - trade.open_date_utc).seconds / 60, 0))
        profit = trade.calc_profit(rate)
        force = self.pairs[pair]['force_sell']
        # NOTE(review): `and` binds tighter than `or` — the 30-min guard only
        # applies to the red-candle condition, not to the force/stoploss cases
        # (which appears intended).
        allow_to_sell = minutes > 30 and (last_candle['hapercent'] < 0) or force or (exit_reason == 'force_exit') or (exit_reason == 'stop_loss')
        if allow_to_sell:
            self.trades = list()
            self.pairs[pair]['last_count_of_buys'] = trade.nr_of_successful_entries  # self.pairs[pair]['count_of_buys']
            self.pairs[pair]['last_sell'] = rate
            self.pairs[pair]['last_candle'] = last_candle
            self.pairs[pair]['previous_profit'] = 0
            self.trades = list()
            dispo = round(self.wallets.get_available_stake_amount())
            # print(f"Sell {pair} {current_time} {exit_reason} dispo={dispo} amount={amount} rate={rate} open_rate={trade.open_rate}")
            self.log_trade(
                last_candle=last_candle,
                date=current_time,
                action="🟥Sell " + str(minutes),
                pair=pair,
                trade_type=exit_reason,
                rate=last_candle['close'],
                dispo=dispo,
                profit=round(profit, 2)
            )
            # Reset per-pair trade state for the next cycle.
            self.pairs[pair]['max_profit'] = 0
            self.pairs[pair]['force_sell'] = False
            self.pairs[pair]['has_gain'] = 0
            self.pairs[pair]['current_profit'] = 0
            self.pairs[pair]['total_amount'] = 0
            self.pairs[pair]['count_of_buys'] = 0
            self.pairs[pair]['max_touch'] = 0
            self.pairs[pair]['last_buy'] = 0
            self.pairs[pair]['last_date'] = current_time
            self.pairs[pair]['current_trade'] = None
        # else:
        #     self.printLog(f"{current_time} SELL triggered for {pair} ({exit_reason} profit={profit} minutes={minutes} percent={last_candle['hapercent']}) but condition blocked")
        return (allow_to_sell) | (exit_reason == 'force_exit') | (exit_reason == 'stop_loss')

    # def custom_exit(self, pair: str, trade: Trade, current_time, current_rate, current_profit, **kwargs):
    #
    #     dataframe, _ = self.dp.get_analyzed_dataframe(pair, self.timeframe)
    #     last_candle = dataframe.iloc[-1].squeeze()
    #     last_candle_1h = dataframe.iloc[-13].squeeze()
    #     before_last_candle = dataframe.iloc[-2].squeeze()
    #     before_last_candle_2 = dataframe.iloc[-3].squeeze()
    #     before_last_candle_12 = dataframe.iloc[-13].squeeze()
    #
    #     expected_profit = self.expectedProfit(pair, last_candle)
    #
    #     # print(f"current_time={current_time} current_profit={current_profit} expected_profit={expected_profit}")
    #
    #     max_touch_before = self.pairs[pair]['max_touch']
    #     self.pairs[pair]['last_max'] = max(last_candle['close'], self.pairs[pair]['last_max'])
    #     self.pairs[pair]['last_min'] = min(last_candle['close'], self.pairs[pair]['last_min'])
    #     self.pairs[pair]['current_trade'] = trade
    #
    #     count_of_buys = trade.nr_of_successful_entries
    #
    #     profit = trade.calc_profit(current_rate)  # round(current_profit * trade.stake_amount, 1)
    #     self.pairs[pair]['max_profit'] = max(self.pairs[pair]['max_profit'], profit)
    #     max_profit = last_candle['max5']  # self.pairs[pair]['max_profit']
    #     baisse = 0
    #     if profit > 0:
    #         baisse = 1 - (profit / max_profit)
    #     mx = max_profit / 5
    #     self.pairs[pair]['count_of_buys'] = count_of_buys
    #     self.pairs[pair]['current_profit'] = profit
    #
    #     dispo = round(self.wallets.get_available_stake_amount())
    #     hours_since_first_buy = (current_time - trade.open_date_utc).seconds / 3600.0
    #     days_since_first_buy = (current_time - trade.open_date_utc).days
    #     hours = (current_time - trade.date_last_filled_utc).total_seconds() / 3600.0
    #     minutes = (current_time - trade.date_last_filled_utc).total_seconds() / 60.0
    #
    #     if minutes % 4 == 0:
    #         self.log_trade(
    #             last_candle=last_candle,
    #             date=current_time,
    #             action="🟢 CURRENT",  # 🔴 CURRENT" if self.pairs[pair]['stop'] or last_candle['stop_buying'] else "
    #             dispo=dispo,
    #             pair=pair,
    #             rate=last_candle['close'],
    #             trade_type='',
    #             profit=round(profit, 2),
    #             buys=count_of_buys,
    #             stake=0
    #         )
    #
    #     if (last_candle['close'] > last_candle['mid']) or (last_candle['sma5_deriv1'] > 0):
    #         return None
    #
    #     pair_name = self.getShortName(pair)
    #
    #     if profit > 0.003 * count_of_buys and baisse > 0.30:
    #         self.pairs[pair]['force_sell'] = False
    #         self.pairs[pair]['force_buy'] = (self.pairs[pair]['count_of_buys'] - self.pairs[pair]['has_gain'] > 3)
    #         return str(count_of_buys) + '_' + 'B30_' + pair_name + '_' + str(self.pairs[pair]['has_gain'])
    #
    #     self.pairs[pair]['max_touch'] = max(last_candle['close'], self.pairs[pair]['max_touch'])

    def getShortName(self, pair: str) -> str:
        """Strip the USDT/USDC quote suffix from a pair name, e.g. 'BTC/USDT' -> 'BTC'."""
        return pair.replace("/USDT", '').replace("/USDC", '').replace("_USDC", '').replace("_USDT", '')

    def getLastLost(self, last_candle, pair):
        """Relative drop of the current close vs the highest price touched this trade."""
        last_lost = round((last_candle['close'] - self.pairs[pair]['max_touch']) / self.pairs[pair]['max_touch'], 3)
        return last_lost

    def getPctFirstBuy(self, pair, last_candle):
        """Relative price change since the first buy of the current trade."""
        return round((last_candle['close'] - self.pairs[pair]['first_buy']) / self.pairs[pair]['first_buy'], 3)

    def getPctLastBuy(self, pair, last_candle):
        """Relative price change since the most recent buy of the current trade."""
        return round((last_candle['close'] - self.pairs[pair]['last_buy']) / self.pairs[pair]['last_buy'], 4)

    def expectedProfit(self, pair: str, last_candle: DataFrame):
        """Return the absolute profit target for the pair (fraction of total stake).

        BTC gets a tighter limit; the per-buy scaling (`pct_to_max`) is computed
        but currently unused — see the trailing comment.
        """
        lim = 0.01
        pct = 0.002
        if (self.getShortName(pair) == 'BTC'):
            lim = 0.005
            pct = 0.001
        pct_to_max = lim + pct * self.pairs[pair]['count_of_buys']
        expected_profit = lim * self.pairs[pair]['total_amount']  # min(3 * lim, max(lim, pct_to_max)) # 0.004 + 0.002 * self.pairs[pair]['count_of_buys'] #min(0.01, first_max)
        self.pairs[pair]['expected_profit'] = expected_profit
        return expected_profit

    def log_trade(self, action, pair, date, trade_type=None, rate=None, dispo=None, profit=None,
                  buys=None, stake=None, last_candle=None):
        """Print one formatted trade-log row (skipped entirely under hyperopt).

        Every 10th call also re-prints the column header and a dump of the
        per-pair state table for pairs with open buys.
        """
        # Show the column header only periodically.
        # NOTE(review): `in ('hyperopt')` is a substring test on a plain string,
        # not a tuple membership test — works here only by coincidence.
        if self.config.get('runmode') == 'hyperopt' or self.dp.runmode.value in ('hyperopt'):
            return
        if self.columns_logged % 10 == 0:
            self.printLog(
                f"| {'Date':<16} | {'Action':<10} |{'Pair':<5}| {'Trade Type':<18} |{'Rate':>8} | {'Dispo':>6} | {'Profit':>8} "
                f"| {'Pct':>6} | {'max_touch':>11} | {'last_lost':>12} | {'last_max':>7}| {'last_min':>7}|{'Buys':>5}| {'Stake':>5} |"
                f"{'rsi':>6}|{'rsi_1h':>6}|{'rsi_1d':>6}|{'mlprob':>6}"
                # |Distmax|s201d|s5_1d|s5_2d|s51h|s52h|smt1h|smt2h|tdc1d|tdc1h"
            )
            self.printLineLog()
            df = pd.DataFrame.from_dict(self.pairs, orient='index')
            # Columns excluded from the state dump (non-scalar / bookkeeping).
            colonnes_a_exclure = ['last_candle', 'trade_info', 'last_date', 'last_count_of_buys',
                                  'base_stake_amount', 'stop_buy']
            df_filtered = df[df['count_of_buys'] > 0].drop(columns=colonnes_a_exclure)
            # df_filtered = df_filtered["first_buy", "last_max", "max_touch", "last_sell","last_buy", 'count_of_buys', 'current_profit']
            self.printLog(df_filtered)
        # NOTE(review): increment placed at method level so the header repeats
        # every 10 rows — reconstructed indentation, confirm against original.
        self.columns_logged += 1
        date = str(date)[:16] if date else "-"
        limit = None
        rsi = ''
        rsi_pct = ''
        sma5_1d = ''
        sma5_1h = ''
        sma5 = str(sma5_1d) + ' ' + str(sma5_1h)
        last_lost = self.getLastLost(last_candle, pair)
        if buys is None:
            buys = ''
        max_touch = ''
        pct_max = self.getPctFirstBuy(pair, last_candle)
        total_counts = str(buys) + '/' + str(sum(pair_data['count_of_buys'] for pair_data in self.pairs.values()))
        dist_max = ''
        # Print large prices as ints, small ones with 3 decimals.
        last_max = int(self.pairs[pair]['last_max']) if self.pairs[pair]['last_max'] > 1 else round(
            self.pairs[pair]['last_max'], 3)
        last_min = int(self.pairs[pair]['last_min']) if self.pairs[pair]['last_min'] > 1 else round(
            self.pairs[pair]['last_min'], 3)
        color = GREEN if profit > 0 else RED
        profit = str(profit) + '/' + str(round(self.pairs[pair]['max_profit'], 2))
        # 🟢 deriv1 > 0 and deriv2 > 0: accelerating uptrend.
        # 🟡 deriv1 > 0 and deriv2 < 0: uptrend slowing -> potential exhaustion.
        # 🔴 deriv1 < 0 and deriv2 < 0: accelerating downtrend.
        # 🟠 deriv1 < 0 and deriv2 > 0: downtrend slowing -> possible bottom.
        self.printLog(
            f"| {date:<16} |{action:<10} | {pair[0:3]:<3} | {trade_type or '-':<18} |{rate or '-':>9}| {dispo or '-':>6} "
            f"|{color}{profit or '-':>10}{RESET}| {pct_max or '-':>6} | {round(self.pairs[pair]['max_touch'], 2) or '-':>11} | {last_lost or '-':>12} "
            f"| {last_max or '-':>7} | {last_min or '-':>7} |{total_counts or '-':>5}|{stake or '-':>7}"
            f"{round(last_candle['max_rsi_24'], 1) or '-' :>6}|{round(last_candle['rsi_1h'], 1) or '-' :>6}|{round(last_candle['rsi_1d'], 1) or '-' :>6}|"
            # f"{round(last_candle['rtp_1h'] * 100, 0) or '-' :>6}|{round(last_candle['rtp_1d'] * 100, 0) or '-' :>6}|"
            f"{round(last_candle['ml_prob'], 1) or '-' :>6}|"
        )

    def printLineLog(self):
        """Print the horizontal separator row matching the log_trade header."""
        # f"sum1h|sum1d|Tdc|Tdh|Tdd| drv1 |drv|drv_1d|"
        self.printLog(
            f"+{'-' * 18}+{'-' * 12}+{'-' * 5}+{'-' * 20}+{'-' * 9}+{'-' * 8}+{'-' * 12}+{'-' * 8}+{'-' * 13}+{'-' * 14}+{'-' * 9}{'-' * 9}+{'-' * 5}+{'-' * 7}+"
            f"+{'-' * 6}+{'-' * 7}+{'-' * 5}+{'-' * 5}+{'-' * 5}+{'-' * 5}+{'-' * 5}+{'-' * 5}+"
        )

    def printLog(self, str):
        """Route a message to logger.info (live) or print (backtest); silent in hyperopt.

        NOTE(review): the parameter shadows the builtin `str`, and
        `in ('hyperopt')` is a substring check on a string, not tuple membership.
        """
        if self.config.get('runmode') == 'hyperopt' or self.dp.runmode.value in ('hyperopt'):
            return;
        if not self.dp.runmode.value in ('backtest', 'hyperopt', 'lookahead-analysis'):
            logger.info(str)
        else:
            if not self.dp.runmode.value in ('hyperopt'):
                print(str)

    def populate_indicators(self, dataframe: DataFrame, metadata: dict) -> DataFrame:
        """Build all 1m features plus merged 1h/1d informative features, then
        score every candle with the pre-trained RandomForest (`ml_prob`).

        Also refreshes per-pair buy bookkeeping from open trades in live/dry-run.
        """
        # Add all ta features
        pair = metadata['pair']
        short_pair = self.getShortName(pair)
        self.path = f"user_data/strategies/plots/{short_pair}/"  # + ("valide/" if not self.dp.runmode.value in ('backtest') else '')
        # Heikin-Ashi candles; hapercent is the signed HA body size.
        heikinashi = qtpylib.heikinashi(dataframe)
        dataframe['haopen'] = heikinashi['open']
        dataframe['haclose'] = heikinashi['close']
        dataframe['hapercent'] = (dataframe['haclose'] - dataframe['haopen']) / dataframe['haclose']
        # Midpoint of the candle body (equivalent to (open + close) / 2).
        dataframe['mid'] = dataframe['open'] + (dataframe['close'] - dataframe['open']) / 2
        # "smaN" columns are actually EMAs of mid; derivN are per-mille 1-step slopes.
        dataframe['sma5'] = dataframe['mid'].ewm(span=5, adjust=False).mean()  # dataframe["mid"].rolling(window=5).mean()
        dataframe['sma5_deriv1'] = 1000 * (dataframe['sma5'] - dataframe['sma5'].shift(1)) / dataframe['sma5'].shift(1)
        dataframe['sma12'] = dataframe['mid'].ewm(span=12, adjust=False).mean()
        dataframe['sma12_deriv1'] = 1000 * (dataframe['sma12'] - dataframe['sma12'].shift(1)) / dataframe[
            'sma12'].shift(1)
        dataframe['sma24'] = dataframe['mid'].ewm(span=24, adjust=False).mean()
        dataframe['sma24_deriv1'] = 1000 * (dataframe['sma24'] - dataframe['sma24'].shift(1)) / dataframe['sma24'].shift(1)
        dataframe['sma60'] = dataframe['mid'].ewm(span=60, adjust=False).mean()
        dataframe['sma60_deriv1'] = 1000 * (dataframe['sma60'] - dataframe['sma60'].shift(1)) / dataframe['sma60'].shift(1)
        # dataframe[f"sma5_inv"] = (dataframe[f"sma5"].shift(2) >= dataframe[f"sma5"].shift(1)) \
        #     & (dataframe[f"sma5"].shift(1) <= dataframe[f"sma5"])
        # Local-minimum ("inversion") detectors: V-shape in the EMA plus a
        # minimum move size measured in sqrt terms.
        dataframe["sma5_sqrt"] = (
            np.sqrt(np.abs(dataframe["sma5"] - dataframe["sma5"].shift(1))) +
            np.sqrt(np.abs(dataframe["sma5"].shift(3) - dataframe["sma5"].shift(1)))
        )
        dataframe["sma5_inv"] = (
            (dataframe["sma5"].shift(2) >= dataframe["sma5"].shift(1)) &
            (dataframe["sma5"].shift(1) <= dataframe["sma5"]) &
            (dataframe["sma5_sqrt"] > 5)
        )
        dataframe["sma12_sqrt"] = (
            np.sqrt(np.abs(dataframe["sma12"] - dataframe["sma12"].shift(1))) +
            np.sqrt(np.abs(dataframe["sma12"].shift(3) - dataframe["sma12"].shift(1)))
        )
        dataframe["sma12_inv"] = (
            (dataframe["sma12"].shift(2) >= dataframe["sma12"].shift(1)) &
            (dataframe["sma12"].shift(1) <= dataframe["sma12"]) &
            (dataframe["sma12_sqrt"] > 5)
        )
        # Smoothed returns over several horizons.
        dataframe["percent"] = dataframe['mid'].pct_change()
        dataframe["percent3"] = dataframe['mid'].pct_change(3).rolling(3).mean()
        dataframe["percent12"] = dataframe['mid'].pct_change(12).rolling(12).mean()
        dataframe["percent24"] = dataframe['mid'].pct_change(24).rolling(24).mean()
        dataframe['rsi'] = talib.RSI(dataframe['mid'], timeperiod=14)
        self.calculeDerivees(dataframe, 'rsi', ema_period=12)
        dataframe['max_rsi_12'] = talib.MAX(dataframe['rsi'], timeperiod=12)
        dataframe['max_rsi_24'] = talib.MAX(dataframe['rsi'], timeperiod=24)
        dataframe['max5'] = talib.MAX(dataframe['mid'], timeperiod=5)
        dataframe['min180'] = talib.MIN(dataframe['mid'], timeperiod=180)
        dataframe['max180'] = talib.MAX(dataframe['mid'], timeperiod=180)
        # Position of mid within the last-180-candle range (0 = low, 1 = high).
        dataframe['pct180'] = ((dataframe["mid"] - dataframe['min180']) / (dataframe['max180'] - dataframe['min180']))
        dataframe = self.rsi_trend_probability(dataframe, short=60, long=360)

        # ################### INFORMATIVE 1h
        # NOTE(review): `informative` shadows the freqtrade decorator imported above.
        informative = self.dp.get_pair_dataframe(pair=metadata['pair'], timeframe='1h')
        informative['mid'] = informative['open'] + (informative['close'] - informative['open']) / 2
        # Compute MACD on the 1h closes.
        macd, macdsignal, macdhist = talib.MACD(
            informative['close'],
            fastperiod=12,
            slowperiod=26,
            signalperiod=9
        )
        informative['macd'] = macd
        informative['macdsignal'] = macdsignal
        informative['macdhist'] = macdhist
        informative['rsi'] = talib.RSI(informative['mid'], timeperiod=14)
        informative['sma24'] = informative['mid'].ewm(span=24, adjust=False).mean()
        informative['sma24_deriv1'] = 1000 * (informative['sma24'] - informative['sma24'].shift(1)) / informative['sma24'].shift(1)
        informative['sma60'] = informative['mid'].ewm(span=60, adjust=False).mean()
        informative['sma60_deriv1'] = 1000 * (informative['sma60'] - informative['sma60'].shift(1)) / informative['sma60'].shift(1)
        # NOTE(review): rsi is computed twice on the 1h frame (duplicate line).
        informative['rsi'] = talib.RSI(informative['mid'], timeperiod=14)
        self.calculeDerivees(informative, 'rsi', ema_period=12)
        # informative = self.rsi_trend_probability(informative)
        probas = self.calculModelInformative(informative)
        # informative = self.populate1hIndicators(df=informative, metadata=metadata)
        # informative = self.calculateRegression(informative, 'mid', lookback=15)
        dataframe = merge_informative_pair(dataframe, informative, '1m', '1h', ffill=True)

        # ################### INFORMATIVE 1d
        informative = self.dp.get_pair_dataframe(pair=metadata['pair'], timeframe='1d')
        informative['mid'] = informative['open'] + (informative['close'] - informative['open']) / 2
        informative['rsi'] = talib.RSI(informative['mid'], timeperiod=5)
        # informative = self.rsi_trend_probability(informative)
        # informative = self.calculateRegression(informative, 'mid', lookback=15)
        dataframe = merge_informative_pair(dataframe, informative, '1m', '1d', ffill=True)

        dataframe['last_price'] = dataframe['close']
        dataframe['first_price'] = dataframe['close']
        if self.dp:
            if self.dp.runmode.value in ('live', 'dry_run'):
                # Rebuild per-pair buy bookkeeping from the actually filled orders.
                self.getOpenTrades()
                for trade in self.trades:
                    if trade.pair != pair:
                        continue
                    filled_buys = trade.select_filled_orders('buy')
                    count = 0
                    amount = 0
                    min_price = 111111111111110;
                    max_price = 0;
                    for buy in filled_buys:
                        # NOTE(review): indentation reconstructed — first_* fields
                        # are assumed to be set only for the first filled buy.
                        if count == 0:
                            min_price = min(min_price, buy.price)
                            max_price = max(max_price, buy.price)
                            dataframe['first_price'] = buy.price
                            self.pairs[pair]['first_buy'] = buy.price
                            self.pairs[pair]['first_amount'] = buy.price * buy.filled
                            # dataframe['close01'] = buy.price * 1.01
                        # Order(id=2396, trade=1019, order_id=29870026652, side=buy, filled=0.00078, price=63921.01,
                        # status=closed, date=2024-08-26 02:20:11)
                        dataframe['last_price'] = buy.price
                        self.pairs[pair]['last_buy'] = buy.price
                        count = count + 1
                        amount += buy.price * buy.filled
                    count_buys = count
                    self.pairs[pair]['total_amount'] = amount
        # 24h (1440 x 1m) rolling extremes of mid.
        dataframe['absolute_min'] = dataframe['mid'].rolling(1440, min_periods=1).min()
        dataframe['absolute_max'] = dataframe['mid'].rolling(1440, min_periods=1).max()
        # steps = (dataframe['absolute_max'] - dataframe['absolute_min']) / (dataframe['absolute_min'] * 0.01)
        # levels = [dataframe['absolute_min'] * (1 + i / 100) for i in range(1, steps + 1)]
        # # print(levels)

        ###########################################################
        # Bollinger Bands
        bollinger = qtpylib.bollinger_bands(qtpylib.typical_price(dataframe), window=20, stds=2)
        dataframe['bb_lowerband'] = bollinger['lower']
        dataframe['bb_middleband'] = bollinger['mid']
        dataframe['bb_upperband'] = bollinger['upper']
        # Position of close within the bands (0 = lower band, 1 = upper band).
        dataframe["bb_percent"] = (
            (dataframe["close"] - dataframe["bb_lowerband"]) /
            (dataframe["bb_upperband"] - dataframe["bb_lowerband"])
        )
        dataframe["bb_width"] = (dataframe["bb_upperband"] - dataframe["bb_lowerband"]) / dataframe["sma24"]
        # Compute MACD on the 1m closes.
        macd, macdsignal, macdhist = talib.MACD(
            dataframe['close'],
            fastperiod=12,
            slowperiod=26,
            signalperiod=9
        )
        # MACD reference (translated from the original French table):
        # - macd: EMA_fast - EMA_slow (12/26). Positive -> uptrend, negative -> downtrend.
        # - macdsignal: EMA_9(macd). Trigger line: MACD crossing above it -> buy
        #   signal; crossing below -> sell signal.
        # - macdhist: macd - macdsignal. Strength/acceleration of the trend:
        #   positive & rising -> accelerating uptrend; positive but falling ->
        #   uptrend losing steam; negative & falling -> accelerating downtrend;
        #   negative but rising -> downtrend slowing.
        # Store in the dataframe.
        dataframe['macd'] = macd
        dataframe['macdsignal'] = macdsignal
        dataframe['macdhist'] = macdhist
        # "Regarde dans le futur" — NOTE(review): author's warning, presumably
        # that the commented rescaling below would introduce lookahead bias.
        #
        # # --- Rescale each series to a relative -1..1 range ---
        # for col in ['macd', 'macdsignal', 'macdhist']:
        #     series = dataframe[col]
        #     valid = series[~np.isnan(series)]  # ignore NaN
        #     min_val = valid.min()
        #     max_val = valid.max()
        #     span = max_val - min_val if max_val != min_val else 1
        #     dataframe[f'{col}_rel'] = 2 * ((series - min_val) / span) - 1
        #
        # dataframe['tdc_macd'] = self.macd_tendance_int(
        #     dataframe,
        #     macd_col='macd_rel',
        #     signal_col='macdsignal_rel',
        #     hist_col='macdhist_rel'
        # )
        # ------------------------------------------------------------------------------------
        # rolling SMA indicators (used for trend detection too)
        s_short = self.DEFAULT_PARAMS['sma_short']
        s_long = self.DEFAULT_PARAMS['sma_long']
        dataframe[f'sma_{s_short}'] = dataframe['close'].rolling(window=s_short).mean()
        dataframe[f'sma_{s_long}'] = dataframe['close'].rolling(window=s_long).mean()
        # --- raw slope ---
        dataframe['slope'] = dataframe['sma24'].diff()
        # --- EMA smoothing ---
        dataframe['slope_smooth'] = dataframe['slope'].ewm(span=10, adjust=False).mean()
        # # RSI
        # window = 14
        # delta = dataframe['close'].diff()
        # up = delta.clip(lower=0)
        # down = -1 * delta.clip(upper=0)
        # ma_up = up.rolling(window=window).mean()
        # ma_down = down.rolling(window=window).mean()
        # rs = ma_up / ma_down.replace(0, 1e-9)
        # dataframe['rsi'] = 100 - (100 / (1 + rs))
        #
        # # EMA example
        # dataframe['ema'] = dataframe['close'].ewm(span=self.DEFAULT_PARAMS['ema_period'], adjust=False).mean()
        #
        # # ATR (simple implementation)
        # high_low = dataframe['high'] - dataframe['low']
        # high_close = (dataframe['high'] - dataframe['close'].shift()).abs()
        # low_close = (dataframe['low'] - dataframe['close'].shift()).abs()
        # tr = DataFrame({'hl': high_low, 'hc': high_close, 'lc': low_close}).max(axis=1)
        # dataframe['atr'] = tr.rolling(window=self.DEFAULT_PARAMS['atr_period']).mean()

        ###########################
        # df = your OHLCV DataFrame with columns: open, high, low, close, volume
        # Make sure it is sorted by ascending date
        timeframe = self.timeframe

        # --- Normalized volatility ---
        dataframe['atr'] = ta.volatility.AverageTrueRange(
            high=dataframe['high'], low=dataframe['low'], close=dataframe['close'], window=14
        ).average_true_range()
        dataframe['atr_norm'] = dataframe['atr'] / dataframe['close']

        # --- Trend strength ---
        dataframe['adx'] = ta.trend.ADXIndicator(
            high=dataframe['high'], low=dataframe['low'], close=dataframe['close'], window=14
        ).adx()

        # --- Directional volume (On Balance Volume) ---
        dataframe['obv'] = ta.volume.OnBalanceVolumeIndicator(
            close=dataframe['close'], volume=dataframe['volume']
        ).on_balance_volume()
        self.calculeDerivees(dataframe, 'obv', ema_period=1)
        # OBV variants fed with smoothed price / summed volume.
        dataframe['obv12'] = ta.volume.OnBalanceVolumeIndicator(
            close=dataframe['sma12'], volume=dataframe['volume'].rolling(12).sum()
        ).on_balance_volume()
        dataframe['obv24'] = ta.volume.OnBalanceVolumeIndicator(
            close=dataframe['sma24'], volume=dataframe['volume'].rolling(24).sum()
        ).on_balance_volume()
        # self.calculeDerivees(dataframe, 'obv5', ema_period=5)

        # --- Recent volatility (std of returns) ---
        dataframe['vol_24'] = dataframe['percent'].rolling(24).std()
        # Count consecutive down / up moves
        # self.calculateDownAndUp(dataframe, limit=0.0001)
        # df: your OHLCV dataframe + existing indicators
        # Make sure the following columns exist:
        # 'max_rsi_12', 'roc_24', 'bb_percent_1h'
        # --- Initial NaN filtering ---
        # dataframe = dataframe.dropna()
        dataframe['rsi_slope'] = dataframe['rsi'].diff(3) / 3  # average RSI speed
        dataframe['adx_change'] = dataframe['adx'] - dataframe['adx'].shift(12)  # trend-strength evolution
        dataframe['volatility_ratio'] = dataframe['atr_norm'] / dataframe['bb_width']
        dataframe["rsi_diff"] = dataframe["rsi"] - dataframe["rsi"].shift(3)
        dataframe["slope_ratio"] = dataframe["sma5_deriv1"] / (dataframe["sma60_deriv1"] + 1e-9)
        # True when short-term price slope and RSI slope disagree in sign.
        dataframe["divergence"] = (dataframe["rsi_deriv1"] * dataframe["sma5_deriv1"]) < 0

        ###########################################################
        # print(f"min={dataframe['absolute_min'].min()} max={dataframe['absolute_max'].max()}")
        # Price levels at 0..3% above the 24h minimum.
        for i in [0, 1, 2, 3]:
            dataframe[f"lvl_{i}_pct"] = dataframe['absolute_min'] * (1 + 0.01 * i)
        self.model_indicators = self.listUsableColumns(dataframe)
        # Training is disabled (dead branch kept deliberately by the author).
        if False and self.dp.runmode.value in ('backtest'):
            self.trainModel(dataframe, metadata)
        short_pair = self.getShortName(pair)
        # path=f"user_data/strategies/plots/{short_pair}/"
        # NOTE(review): model re-loaded from disk on every populate_indicators call.
        self.model = joblib.load(f"{self.path}/{short_pair}_rf_model.pkl")
        # Prepare the features for prediction.
        features = dataframe[self.model_indicators].fillna(0)
        # Prediction: probability that the price goes up.
        # Show the interesting columns of the model.
        features_pruned, kept_features = self.prune_features(
            model=self.model,
            dataframe=dataframe,
            feature_columns=self.model_indicators,
            importance_threshold=0.005  # drop features below this importance
        )
        probs = self.model.predict_proba(features)[:, 1]
        # Store the probability for analysis / the entry logic.
        dataframe['ml_prob'] = probs
        if False and self.dp.runmode.value in ('backtest'):
            self.inspect_model(self.model)
        #
        # absolute_min = dataframe['absolute_min'].min()
        # absolute_max = dataframe['absolute_max'].max()
        #
        # # Total spread
        # diff = absolute_max - absolute_min
        #
        # # Number of intermediate rows (1% steps)
        # steps = int((absolute_max - absolute_min) / (absolute_min * 0.01))
        #
        # # Price levels at 1%, 2%, ..., steps%
        # levels = [absolute_min * (1 + i / 100) for i in range(1, steps + 1)]
        # levels = [lvl for lvl in levels if lvl < absolute_max]  # skip the exact last level
        #
        # # add into the DataFrame
        # for i, lvl in enumerate(levels, start=1):
        #     dataframe[f"lvl_{i}_pct"] = lvl
        # # Matching indices
        # indices = [(dataframe['mid'] - lvl).abs().idxmin() for lvl in levels]
        # Not used in the model
        dataframe['min60'] = talib.MIN(dataframe['mid'], timeperiod=60)
        # Example outputs of the DCA-threshold helper for various price levels:
        # val = 90000
        # steps = 12
        # [0.018, 0.022, 0.025, 0.028, 0.032, 0.035, 0.038, 0.042, 0.045, 0.048, 0.052, 0.055]
        # val = 100000
        # steps = 20
        # [0.012, 0.014, 0.015, 0.016, 0.018, 0.019, 0.02, 0.022, 0.023, 0.024, 0.025, 0.027, 0.028, 0.029, 0.031, 0.032,
        #  0.033, 0.035, 0.036, 0.037]
        # val = 110000
        # steps = 28
        # [0.01, 0.01, 0.011, 0.012, 0.013, 0.013, 0.014, 0.015, 0.015, 0.016, 0.017, 0.018, 0.018, 0.019, 0.02, 0.02,
        #  0.021, 0.022, 0.023, 0.023, 0.024, 0.025, 0.025, 0.026, 0.027, 0.028, 0.028, 0.029]
        # val = 120000
        # steps = 35
        # [0.008, 0.009, 0.009, 0.01, 0.01, 0.011, 0.011, 0.012, 0.012, 0.013, 0.013, 0.014, 0.014, 0.015, 0.015, 0.016,
        #  0.016, 0.017, 0.017, 0.018, 0.018, 0.019, 0.019, 0.019, 0.02, 0.02, 0.021, 0.021, 0.022, 0.022, 0.023, 0.023,
        #  0.024, 0.024, 0.025]
        # def split_ratio_one_third(n, p):
        #     a = n / (2 * p)  # first value
        #     d = n / (p * (p - 1))  # increment
        #     return [round(a + i * d, 3) for i in range(p)]
        #
        # for val in range(90000, 130000, 10000):
        #     steps = self.approx_value(val, 126000)
        #     print(f"val={val} steps={steps} pct={(val - (126000 * (1 - self.allow_decrease_rate))) / val}")
        #     dca = split_ratio_one_third((val - (126000 * (1 - self.allow_decrease_rate))) / 126000, steps)
        #     print(dca)
        return dataframe

    def getOpenTrades(self):
        """Refresh and return the cached list of open trades."""
        # if len(self.trades) == 0:
        self.trades = Trade.get_open_trades()
        return self.trades

    # def populate_buy_trend(self, dataframe: DataFrame, metadata: dict) -> DataFrame:
    #     dataframe.loc[
    #         (
    #             # (dataframe['sma5_inv'] == 1)
    #             (
    #                 (dataframe['pct180'] < 0.5) |
    #                 (
    #                     (dataframe['close'] < dataframe['sma60'])
    #                     & (dataframe['sma24_deriv1'] > 0)
    #                 )
    #             )
    #             # & (dataframe['hapercent'] > 0)
    #             # & (dataframe['sma24_deriv1'] > - 0.03)
    #             & (dataframe['ml_prob'] > 0.1)
    #             # & (
    #             #     (dataframe['percent3'] <= -0.003)
    #             #     | (dataframe['percent12'] <= -0.003)
    #             #     | (dataframe['percent24'] <= -0.003)
    #             # )
    #         ), ['enter_long', 'enter_tag']] = (1, f"future")
    #
    #     dataframe['test'] = np.where(dataframe['enter_long'] == 1, dataframe['close'] * 1.003, np.nan)
    #
    #     return dataframe

    def populate_buy_trend(self, dataframe: DataFrame, metadata: dict) -> DataFrame:
        """
        Buy when the model predicts a high upside probability/value.
        This method loads the ML model, generates predictions, and triggers a buy
        if the predicted value exceeds a learned threshold.

        Actual rule used below: enter when, 3 candles ago, ml_prob was low
        (< 0.2) and the low undercut the 180-candle minimum, and that minimum
        has not moved since — i.e. a fresh local bottom.
        """
        # # Ensure prediction column exists
        # if "ml_prediction" not in dataframe.columns:
        #     # Generate predictions on the fly
        #     # (your model must already be loaded in self.model)
        #     features = self.ml_features  # list of feature column names
        #     dataframe["ml_prediction"] = self.model.predict(dataframe[features].fillna(0))

        # Choose threshold automatically based on training statistics
        # or a fixed value discovered by SHAP / PDP
        # threshold = 0.4  # self.buy_threshold  # ex: 0.80 or 1.10 depending on your model
        # Top 20% strongest signals.
        # NOTE(review): threshold is computed but unused by the entry condition below.
        threshold = np.percentile(dataframe["ml_prob"], 80)
        # Buy = prediction > threshold
        dataframe["buy"] = 0
        dataframe.loc[
            (dataframe["ml_prob"].shift(3) < 0.2)
            & (dataframe['low'].shift(3) < dataframe['min180'].shift(3))
            & (dataframe['min180'].shift(3) == dataframe['min180']),
            ['enter_long', 'enter_tag']
        ] = (1, f"future")
        # Debug/plot column: entry price +0.3% where an entry fired.
        dataframe['test'] = np.where(dataframe['enter_long'] == 1, dataframe['close'] * 1.003, np.nan)
        return dataframe

    # def populate_buy_trend(self, dataframe: DataFrame, metadata: dict) -> DataFrame:
    #     """
    #     Populate buy signals based on SHAP/PDP insights:
    #       - strong momentum: macdhist high and macd > macdsignal
    #       - rsi elevated (but not extreme)
    #       - positive sma24 derivative above threshold
    #       - price above sma60 (trend context)
    #       - price in upper region of Bollinger (bb_percent high)
    #       - volume/obv filter and volatility guard (obv_dist, atr)
    #     Returns dataframe with column 'buy' (1 = buy signal).
    #     """
    #
    #     # Ensure column existence (fallback to zeros if missing)
    #     cols = [
    #         "macdhist", "macd", "macdsignal", "rsi", "rsi_short",
    #         "sma24_deriv1", "sma60", "bb_percent",
    #         "obv_dist", "atr", "percent", "open_1h", "absolute_min"
    #     ]
    #     for c in cols:
    #         if c not in dataframe.columns:
    #             dataframe[c] = 0.0
    #
    #     # Thresholds (tune these)
    #     TH_MACDHIST = 8.0        # macdhist considered "strong" (example)
    #     TH_MACD_POS = 0.0        # macd must be > 0 (positive momentum)
    #     TH_SMA24_DERIV = 0.05    # sma24 derivative threshold where effect appears
    #     TH_RSI_LOW = 52.0        # lower bound to consider bullish RSI
    #     TH_RSI_HIGH = 85.0       # upper bound to avoid extreme overbought (optional)
    #     TH_BB_PERCENT = 0.7      # in upper band (0..1)
    #     TH_OBV_DIST = -40.0      # accept small negative OBV distance, reject very negative
    #     MAX_ATR = None           # optional: maximum ATR to avoid extreme volatility (None = off)
    #     MIN_PRICE_ABOVE_SMA60 = 0.0  # require price > sma60 (price - sma60 > 0)
    #
    #     price = dataframe["close"]
    #
    #     # Momentum conditions
    #     cond_macdhist = dataframe["macdhist"] >= TH_MACDHIST
    #     cond_macd_pos = dataframe["macd"] > TH_MACD_POS
    #     cond_macd_vs_signal = dataframe["macd"] > dataframe["macdsignal"]
    #
    #     # RSI condition (accept moderate-high RSI)
    #     cond_rsi = (dataframe["rsi"] >= TH_RSI_LOW) & (dataframe["rsi"] <= TH_RSI_HIGH)
    #
    #     # SMA24 derivative: require momentum above threshold
    #     cond_sma24 = dataframe["sma24_deriv1"] >= TH_SMA24_DERIV
    #
    #     # Price above SMA60 (trend filter)
    #     cond_above_sma60 = (price - dataframe["sma60"]) > MIN_PRICE_ABOVE_SMA60
    #
    #     # Bollinger band percent (price in upper region)
    #     cond_bb = dataframe["bb_percent"] >= TH_BB_PERCENT
    #
    #     # Volume/OBV prudence filter
    #     cond_obv = dataframe["obv_dist"] >= TH_OBV_DIST
    #
    #     # Optional ATR guard
    #     if MAX_ATR is not None:
    #         cond_atr = dataframe["atr"] <= MAX_ATR
    #     else:
    #         cond_atr = np.ones_like(dataframe["atr"], dtype=bool)
    #
    #     # Optional additional guards (avoid tiny percent moves or weird opens)
    #     cond_percent = np.abs(dataframe["percent"])
> 0.0005 # ignore almost-no-move bars # cond_open = True # keep as placeholder; you can add open_1h relative checks # # # Combine into a buy signal # buy_condition = ( # cond_macdhist & # cond_macd_pos & # cond_macd_vs_signal & # cond_rsi & # cond_sma24 & # cond_above_sma60 & # cond_bb & # cond_obv & # cond_atr & # cond_percent # ) # # # Finalize: set buy column (0/1) # dataframe.loc[buy_condition, ['enter_long', 'enter_tag']] = (1, f"future") # # dataframe.loc[~buy_condition, "buy"] = 0 # # dataframe['test'] = np.where(dataframe['enter_long'] == 1, dataframe['close'] * 1.003, np.nan) # # return dataframe def populate_sell_trend(self, dataframe: DataFrame, metadata: dict) -> DataFrame: return dataframe # def adjust_stake_amount(self, pair: str, last_candle: DataFrame): # # Calculer le minimum des 14 derniers jours # nb_pairs = len(self.dp.current_whitelist()) # # base_stake_amount = self.config.get('stake_amount') # # if True : #self.pairs[pair]['count_of_buys'] == 0: # factor = 1 #65 / min(65, last_candle['rsi_1d']) # # if last_candle['min_max_60'] > 0.04: # # factor = 2 # # adjusted_stake_amount = base_stake_amount #max(base_stake_amount / 5, base_stake_amount * factor) # else: # adjusted_stake_amount = self.pairs[pair]['first_amount'] # # if self.pairs[pair]['count_of_buys'] == 0: # self.pairs[pair]['first_amount'] = adjusted_stake_amount # # return adjusted_stake_amount def approx_value(self, x, X_max): X_min = X_max * (1 - self.allow_decrease_rate) # 126198 * 0.4 = 75718,8 Y_min = 1 Y_max = 40 a = (Y_max - Y_min) / (X_max - X_min) # 39 ÷ (126198 − 126198×0,6) = 0,000772595 b = Y_min - a * X_min # 1 − (0,000772595 × 75718,8) = −38 y = a * x + b # 0,000772595 * 115000 - 38 return max(round(y), 1) # évite les valeurs négatives def adjust_stake_amount(self, pair: str, last_candle: DataFrame): if self.pairs[pair]['first_amount'] > 0: return self.pairs[pair]['first_amount'] ath = max(self.pairs[pair]['last_max'], self.get_last_ath_before_candle(last_candle)) 
def adjust_trade_position(self, trade: Trade, current_time: datetime, current_rate: float,
                          current_profit: float, min_stake: float, max_stake: float, **kwargs):
    """Freqtrade position-adjustment callback (DCA engine).

    Returns:
        * a negative amount to partially sell back one stake,
        * a positive stake amount to buy more (DCA on drawdown, or pyramiding
          on the way up),
        * 0 when the available wallet is (almost) empty,
        * None to do nothing.

    Side effects: heavily mutates the per-pair bookkeeping in ``self.pairs``
    (last_buy, max_touch, count_of_buys, total_amount, previous_profit,
    last_ath, has_gain, force_sell, last_candle).
    """
    # Do nothing while an order is still pending on this trade.
    if trade.has_open_orders:
        return None
    dataframe, _ = self.dp.get_analyzed_dataframe(trade.pair, self.timeframe)
    last_candle = dataframe.iloc[-1].squeeze()
    before_last_candle = dataframe.iloc[-2].squeeze()  # NOTE(review): unused
    # Normalise times to UTC and gather trade ages.
    current_time = current_time.astimezone(timezone.utc)
    open_date = trade.open_date.astimezone(timezone.utc)
    dispo = round(self.wallets.get_available_stake_amount())
    # NOTE(review): `.seconds` ignores full days (use total_seconds()); unused anyway.
    hours_since_first_buy = (current_time - trade.open_date_utc).seconds / 3600.0
    days_since_first_buy = (current_time - trade.open_date_utc).days  # NOTE(review): unused
    hours = (current_time - trade.date_last_filled_utc).total_seconds() / 3600.0
    minutes = (current_time - trade.date_last_filled_utc).total_seconds() / 60.0
    count_of_buys = trade.nr_of_successful_entries
    current_time_utc = current_time.astimezone(timezone.utc)
    open_date = trade.open_date.astimezone(timezone.utc)
    days_since_open = (current_time_utc - open_date).days  # NOTE(review): unused
    pair = trade.pair
    profit = trade.calc_profit(current_rate)  # absolute profit, stake currency
    last_lost = self.getLastLost(last_candle, pair)  # NOTE(review): unused
    pct_first = 0
    # NOTE(review): the filter tests the *current* trade's pair, not pair_data,
    # so this sum is all-or-nothing — confirm the intent. Result unused.
    total_counts = sum(
        pair_data['count_of_buys'] for pair_data in self.pairs.values()
        if not self.getShortName(pair) == 'BTC')
    if self.pairs[pair]['first_buy']:
        pct_first = self.getPctFirstBuy(pair, last_candle)
    # --- Partial exit: many entries, 1h momentum turning down, loss still
    # smaller than the first stake -> sell one stake back.
    if profit > - self.pairs[pair]['first_amount'] and count_of_buys > 15 and last_candle['sma24_deriv1_1h'] < 0:
        stake_amount = trade.stake_amount
        self.pairs[pair]['previous_profit'] = profit
        trade_type = "Sell " + (last_candle['enter_tag'] if last_candle['enter_long'] == 1 else '')
        self.pairs[trade.pair]['count_of_buys'] += 1
        self.pairs[pair]['total_amount'] = stake_amount
        self.log_trade(
            last_candle=last_candle,
            date=current_time,
            action="🟧 Sell +",
            dispo=dispo,
            pair=trade.pair,
            rate=current_rate,
            trade_type=trade_type,
            profit=round(profit, 1),
            buys=trade.nr_of_successful_entries + 1,
            stake=round(stake_amount, 2)
        )
        self.pairs[trade.pair]['last_buy'] = current_rate
        self.pairs[trade.pair]['max_touch'] = last_candle['close']
        self.pairs[trade.pair]['last_candle'] = last_candle
        return -stake_amount  # negative => reduce position
    if (self.wallets.get_available_stake_amount() < 10):
        return 0
    lim = 0.3  # NOTE(review): unused
    if (len(dataframe) < 1):
        return None
    # Last actual fill price (not the averaged open rate).
    last_fill_price = self.pairs[trade.pair]['last_buy']
    # Refresh the cached ATH roughly once per hour.
    # NOTE(review): float modulo — `minutes % 60 == 0` is almost never exactly
    # true, so in practice the cached last_ath is what is used.
    if minutes % 60 == 0:
        ath = max(self.pairs[pair]['last_max'], self.get_last_ath_before_candle(last_candle))
        self.pairs[pair]['last_ath'] = ath
    else:
        ath = self.pairs[pair]['last_ath']
    # Lazily compute the per-pair ladder of DCA drawdown thresholds.
    if len(self.pairs[pair]['dca_thresholds']) == 0:
        self.calculateStepsDcaThresholds(last_candle, pair)
    dca_threshold = self.pairs[pair]['dca_thresholds'][
        min(count_of_buys - 1, len(self.pairs[pair]['dca_thresholds']) - 1)]
    # Relative move since the last fill (positive decline = price dropped).
    decline = (last_fill_price - current_rate) / last_fill_price
    increase = - decline
    # "force" = stale position (>24h since last fill) with rising 1h momentum.
    force = hours > 24 and last_candle['sma60_deriv1_1h'] > 0
    condition = last_candle['percent'] > 0 and last_candle['sma24_deriv1'] > 0 \
        and last_candle['close'] < self.pairs[pair]['first_buy']
    limit_buy = 40  # NOTE(review): unused
    # --- DCA buy on drawdown (or forced re-entry) ---
    if ((force or decline >= dca_threshold) and condition):
        try:
            if self.pairs[pair]['has_gain'] and profit > 0:
                # Already pyramided in profit: request a force-sell instead.
                self.pairs[pair]['force_sell'] = True
                self.pairs[pair]['previous_profit'] = profit
                return None
            stake_amount = min(self.wallets.get_available_stake_amount(),
                               self.adjust_stake_amount(pair, last_candle))
            if stake_amount > 0:
                self.pairs[pair]['previous_profit'] = profit
                trade_type = "Loss " + (last_candle['enter_tag'] if last_candle['enter_long'] == 1 else '')
                self.pairs[trade.pair]['count_of_buys'] += 1
                self.pairs[pair]['total_amount'] += stake_amount
                self.log_trade(
                    last_candle=last_candle,
                    date=current_time,
                    action="🟧 " + ("Force" if force else 'Loss -'),
                    dispo=dispo,
                    pair=trade.pair,
                    rate=current_rate,
                    trade_type=trade_type,
                    profit=round(profit, 1),
                    buys=trade.nr_of_successful_entries + 1,
                    stake=round(stake_amount, 2)
                )
                self.pairs[trade.pair]['last_buy'] = current_rate
                self.pairs[trade.pair]['max_touch'] = last_candle['close']
                self.pairs[trade.pair]['last_candle'] = last_candle
                return stake_amount
            return None
        except Exception as exception:
            # NOTE(review): broad catch — logs and swallows everything.
            self.printLog(exception)
            return None
    # --- Pyramiding buy on the way up (profit above threshold, RSI not hot) ---
    if current_profit > dca_threshold and (increase >= dca_threshold and self.wallets.get_available_stake_amount() > 0)\
            and last_candle['rsi'] < 75:
        try:
            self.pairs[pair]['previous_profit'] = profit
            stake_amount = max(20, min(self.wallets.get_available_stake_amount(),
                                       self.adjust_stake_amount(pair, last_candle)))
            if stake_amount > 0:
                self.pairs[pair]['has_gain'] += 1
                trade_type = 'Gain +' + (last_candle['enter_tag'] if last_candle['enter_long'] == 1 else '')
                self.pairs[trade.pair]['count_of_buys'] += 1
                self.pairs[pair]['total_amount'] += stake_amount
                self.log_trade(
                    last_candle=last_candle,
                    date=current_time,
                    action="🟡 Gain +",
                    dispo=dispo,
                    pair=trade.pair,
                    rate=current_rate,
                    trade_type='Gain',
                    profit=round(profit, 1),
                    buys=trade.nr_of_successful_entries + 1,
                    stake=round(stake_amount, 2)
                )
                self.pairs[trade.pair]['last_buy'] = current_rate
                self.pairs[trade.pair]['max_touch'] = last_candle['close']
                self.pairs[trade.pair]['last_candle'] = last_candle
                return stake_amount
            return None
        except Exception as exception:
            self.printLog(exception)
            return None
    return None
def custom_exit(self, pair, trade, current_time, current_rate, current_profit, **kwargs):
    """Freqtrade custom-exit callback implementing a dynamic trailing stop.

    Returns an exit-reason string ("stop_<buys>_<has_gain>") to close the
    trade, or None to keep holding. Mutates per-pair tracking state
    (last_max, last_min, max_touch, max_profit, current_profit, force_buy).

    NOTE(review): `trailing_stop` compares the absolute profit (stake
    currency, from trade.calc_profit) against max_profit scaled by the
    fraction-style `trailing_stop_positive` config — confirm the units are
    intended to mix like this.
    """
    dataframe, _ = self.dp.get_analyzed_dataframe(pair, self.timeframe)
    last_candle = dataframe.iloc[-1].squeeze()
    last_candle_1h = dataframe.iloc[-13].squeeze()  # NOTE(review): same row as before_last_candle_12; unused
    before_last_candle = dataframe.iloc[-2].squeeze()  # NOTE(review): unused
    before_last_candle_2 = dataframe.iloc[-3].squeeze()  # NOTE(review): unused
    before_last_candle_12 = dataframe.iloc[-13].squeeze()  # NOTE(review): unused
    expected_profit = self.expectedProfit(pair, last_candle)  # NOTE(review): unused
    # ----- 1) Load/refresh per-trade trailing state -----
    max_price = self.pairs[pair]['max_touch']  # NOTE(review): unused
    self.pairs[pair]['last_max'] = max(last_candle['close'], self.pairs[pair]['last_max'])
    self.pairs[pair]['last_min'] = min(last_candle['close'], self.pairs[pair]['last_min'])
    self.pairs[pair]['current_trade'] = trade
    count_of_buys = trade.nr_of_successful_entries
    profit = trade.calc_profit(current_rate)  # absolute profit, stake currency
    # Track the peak absolute profit only while the trade is in the green.
    if current_profit > 0:
        self.pairs[pair]['max_profit'] = max(self.pairs[pair]['max_profit'], profit)
    max_profit = self.pairs[pair]['max_profit']
    baisse = 0
    if profit > 0:
        baisse = 1 - (profit / max_profit)  # retracement from the peak (0..1)
    mx = max_profit / 5  # NOTE(review): unused
    self.pairs[pair]['count_of_buys'] = count_of_buys
    self.pairs[pair]['current_profit'] = profit
    dispo = round(self.wallets.get_available_stake_amount())  # NOTE(review): unused
    # NOTE(review): `.seconds` ignores full days; unused anyway.
    hours_since_first_buy = (current_time - trade.open_date_utc).seconds / 3600.0
    days_since_first_buy = (current_time - trade.open_date_utc).days  # NOTE(review): unused
    hours = (current_time - trade.date_last_filled_utc).total_seconds() / 3600.0  # NOTE(review): unused
    minutes = (current_time - trade.date_last_filled_utc).total_seconds() / 60.0  # NOTE(review): unused
    # ----- 2) Update the highest price touched since entry -----
    self.pairs[pair]['max_touch'] = max(last_candle['close'], self.pairs[pair]['max_touch'])
    # ----- 3) Snapshot the trailing-stop configuration -----
    current_trailing_stop_positive = self.trailing_stop_positive
    current_trailing_only_offset_is_reached = self.trailing_only_offset_is_reached
    current_trailing_stop_positive_offset = self.trailing_stop_positive_offset
    max_ = last_candle['max180']
    min_ = last_candle['min180']
    mid = last_candle['mid']
    # NOTE(review): comment said "avoid division by zero" but there is no
    # guard — a flat 180-bar window (max180 == min180) divides by zero here.
    position = (mid - min_) / (max_ - min_)
    zone = int(position * 3)  # 0..2 thirds of the 180-bar range; NOTE(review): unused
    # ----- 5) Dynamic trailing stop level -----
    # e.g. trailing_stop_positive=0.15 => stop at 85% of the peak profit.
    trailing_stop = max_profit * (1.0 - current_trailing_stop_positive)
    baisse = 0  # NOTE(review): overwrites the earlier computation
    if max_profit:
        baisse = (max_profit - profit) / max_profit
    # Short SMA still above the longer one: uptrend intact, keep holding.
    if last_candle['sma12'] > last_candle['sma24']:
        return None
    # ----- 4) Offset gate: wait until the peak profit clears the offset -----
    if current_trailing_only_offset_is_reached:
        # Peak not yet past the (entry-count-scaled) offset ceiling while
        # already above the base offset: trailing not armed yet.
        if max_profit < min(2, current_trailing_stop_positive_offset * (count_of_buys - self.pairs[pair]['has_gain']))\
                and (max_profit > current_trailing_stop_positive_offset):
            return None
    # ----- 6) Exit condition: profit retraced to the stop, price below SMA5 -----
    if 0 < profit <= trailing_stop and last_candle['mid'] < last_candle['sma5']:
        self.pairs[pair]['force_buy'] = True
        return f"stop_{count_of_buys}_{self.pairs[pair]['has_gain']}"
    return None
pairs = self.dp.current_whitelist() informative_pairs = [(pair, '1h') for pair in pairs] informative_pairs += [(pair, '1d') for pair in pairs] return informative_pairs def populate1hIndicators(self, df: pd.DataFrame, metadata: dict) -> pd.DataFrame: # --- WEEKLY LEVELS --- # semaine précédente = semaine ISO différente df["week"] = df.index.isocalendar().week df["year"] = df.index.year df["weekly_low"] = ( df.groupby(["year", "week"])["low"] .transform("min") .shift(1) # décalé -> pas regarder la semaine en cours ) df["weekly_high"] = ( df.groupby(["year", "week"])["high"] .transform("max") .shift(1) ) # Définition simple d'une zone de demande hebdo : # bas + 25% de la bougie => modifiable df["weekly_demand_zone_low"] = df["weekly_low"] df["weekly_demand_zone_high"] = df["weekly_low"] * 1.025 # --- MONTHLY LEVELS --- df["month"] = df.index.month df["monthly_low"] = ( df.groupby(["year", "month"])["low"] .transform("min") .shift(1) # mois précédent uniquement ) df["monthly_high"] = ( df.groupby(["year", "month"])["high"] .transform("max") .shift(1) ) df["monthly_demand_zone_low"] = df["monthly_low"] df["monthly_demand_zone_high"] = df["monthly_low"] * 1.03 return df # ----- SIGNALS SIMPLES POUR EXEMPLE ----- # def populate_buy_trend(self, df: pd.DataFrame, metadata: dict) -> pd.DataFrame: # df["buy"] = 0 # # # Exemple : acheter si le prix tape la zone de demande hebdomadaire # df.loc[ # (df["close"] <= df["weekly_demand_zone_high"]) & # (df["close"] >= df["weekly_demand_zone_low"]), # "buy" # ] = 1 # # return df # # def populate_sell_trend(self, df: pd.DataFrame, metadata: dict) -> pd.DataFrame: # df["sell"] = 0 # # # Exemple : vendre sur retour au weekly_high précédent # df.loc[df["close"] >= df["weekly_high"], "sell"] = 1 # # return df def rsi_trend_probability(self, dataframe, short=6, long=12): dataframe = dataframe.copy() dataframe['rsi_short'] = talib.RSI(dataframe['mid'], short) dataframe['rsi_long'] = talib.RSI(dataframe['mid'], long) dataframe['cross_soft'] 
def rsi_trend_probability(self, dataframe, short=6, long=12):
    """Return a copy of ``dataframe`` with a soft RSI-trend score.

    Adds: rsi_short / rsi_long (TA-Lib RSI of 'mid'), cross_soft (tanh-squashed
    RSI spread), gap (raw spread / 100), trend (long RSI centred on 50), and
    'rtp' = 0.6*cross_soft + 0.25*gap + 0.15*trend, clipped to [-1, 1].
    """
    dataframe = dataframe.copy()
    dataframe['rsi_short'] = talib.RSI(dataframe['mid'], short)
    dataframe['rsi_long'] = talib.RSI(dataframe['mid'], long)
    # Soft crossover signal: tanh squashes the spread into (-1, 1).
    dataframe['cross_soft'] = np.tanh((dataframe['rsi_short'] - dataframe['rsi_long']) / 7)
    dataframe['gap'] = (dataframe['rsi_short'] - dataframe['rsi_long']) / 100
    dataframe['trend'] = (dataframe['rsi_long'] - 50) / 50
    dataframe['rtp'] = (
        0.6 * dataframe['cross_soft']
        + 0.25 * dataframe['gap']
        + 0.15 * dataframe['trend']
    ).clip(-1, 1)
    return dataframe

# FIX: removed a stray class-level `import pandas as pd` that sat between the
# methods here — it only created a pointless class attribute; pandas is
# already imported at module level.

def to_utc_ts(self, x):
    """Parse ``x`` (str / datetime / timestamp) into a tz-aware UTC Timestamp."""
    return pd.to_datetime(x, utc=True)


def get_last_ath_before_candle(self, last_candle):
    """Return the latest BTC ATH price recorded on or before the candle date.

    Scans ``self.btc_ath_history`` (list of {'date', 'price_usd'} dicts) and
    returns the 'price_usd' of the most recent entry whose date is <= the
    candle's date, or None when no entry predates the candle.
    """
    candle_date = self.to_utc_ts(last_candle['date'])
    best = None
    for entry in self.btc_ath_history:
        entry_date = self.to_utc_ts(entry["date"])
        if entry_date <= candle_date and (best is None or entry_date > best[0]):
            best = (entry_date, entry["price_usd"])
    return best[1] if best is not None else None
def trainModel(self, dataframe: DataFrame, metadata: dict):
    """Offline training + diagnostics pipeline for the per-pair ML model.

    Builds a binary target (sma24 rises by >100 over the next 13 candles),
    tunes an XGBoost classifier with Optuna, fits the best model into
    ``self.train_model``, and dumps a large set of reports/plots (correlation
    heatmap, SHAP, permutation importance, PDP, Optuna HTML reports) under
    ``self.path``. Finally persists the model and runs ``analyze_model``.

    NOTE(review): assumes ``self.model_indicators`` was populated beforehand
    (the column-selection code was commented out upstream) — verify.
    """
    pair = self.getShortName(metadata['pair'])
    pd.set_option('display.max_rows', None)
    pd.set_option('display.max_columns', None)
    pd.set_option("display.width", 200)
    path = self.path
    os.makedirs(path, exist_ok=True)
    df = dataframe[self.model_indicators].copy()
    # Pairwise column correlations.
    corr = df.corr(numeric_only=True)
    print("Corrélation des colonnes")
    print(corr)
    # Binary target: 1 when sma24 climbs by more than 100 within 13 candles.
    df['target'] = ((df["sma24"].shift(-13) - df["sma24"]) > 100).astype(int)
    df['target'] = df['target'].fillna(0).astype(int)
    # Correlations against the target, strongest first.
    target_corr = df.corr(numeric_only=True)["target"].sort_values(ascending=False)
    print("Corrélations triées par importance avec une colonne cible")
    print(target_corr)
    # Pairwise correlations, deduplicated and sorted by absolute value.
    corr = df.corr(numeric_only=True)
    corr_unstacked = (
        corr.unstack()
        .reset_index()
        .rename(columns={"level_0": "col1", "level_1": "col2", 0: "corr"})
    )
    # Drop self-pairs and mirrored duplicates (keep col1 < col2 only).
    corr_unstacked = corr_unstacked[corr_unstacked["col1"] < corr_unstacked["col2"]]
    corr_sorted = corr_unstacked.reindex(corr_unstacked["corr"].abs().sort_values(ascending=False).index)
    print("Trier par valeur absolue de corrélation")
    print(corr_sorted.head(20))
    # Correlation heatmap (percent scale, upper triangle masked).
    corr = df.corr(numeric_only=True)
    corr = corr * 100
    mask = np.triu(np.ones_like(corr, dtype=bool))
    fig, ax = plt.subplots(figsize=(96, 36))
    sns.heatmap(
        corr,
        mask=mask,
        cmap="coolwarm",
        center=0,
        annot=True,
        fmt=".0f",
        cbar_kws={"label": "Corrélation (%)"},
        linewidths=0.5,
        ax=ax
    )
    ax.set_title("Matrice de corrélation (en %)", fontsize=20, pad=20)
    plt.xticks(rotation=45, ha="right")
    plt.yticks(rotation=0)
    output_path = f"{self.path}/Matrice_de_correlation_temperature.png"
    plt.savefig(output_path, bbox_inches="tight", dpi=150)
    plt.close(fig)
    print(f"✅ Matrice enregistrée : {output_path}")
    # Correlation-based feature pre-selection (result printed only).
    selected_corr = self.select_uncorrelated_features(df, target="target", top_n=30, corr_threshold=0.7)
    print("===== 🎯 FEATURES SÉLECTIONNÉES =====")
    print(selected_corr)
    df = df.dropna()
    X = df[self.model_indicators]
    y = df['target']
    print("===== 🎯 FEATURES SCORES =====")
    print(self.feature_auc_scores(X, y))
    # Time-ordered split (no shuffling: train = first 80%, valid = last 20%).
    X = df[self.model_indicators]
    y = df['target']
    X_train, X_valid, y_train, y_valid = train_test_split(X, y, test_size=0.2, shuffle=False)
    # NOTE(review): `selected` is printed but never applied to X_train/X_valid.
    selector = VarianceThreshold(threshold=0.0001)
    selector.fit(X_train)
    selected = X_train.columns[selector.get_support()]
    print("Colonnes conservées :", list(selected))

    def objective(trial):
        # One Optuna trial: fit an XGBoost candidate and score it by the best
        # F1 over a sweep of probability thresholds on the validation split.
        local_model = XGBClassifier(
            n_estimators=trial.suggest_int("n_estimators", 300, 500),
            max_depth=trial.suggest_int("max_depth", 1, 6),
            learning_rate=trial.suggest_float("learning_rate", 0.005, 0.3, log=True),
            subsample=trial.suggest_float("subsample", 0.6, 1.0),
            colsample_bytree=trial.suggest_float("colsample_bytree", 0.6, 1.0),
            scale_pos_weight=1,
            objective="binary:logistic",
            eval_metric="logloss",
            n_jobs=-1
        )
        local_model.fit(
            X_train, y_train,
            eval_set=[(X_valid, y_valid)],
            verbose=False
        )
        proba = local_model.predict_proba(X_valid)[:, 1]
        thresholds = np.linspace(0.1, 0.9, 50)
        best_f1 = max(f1_score(y_valid, (proba > t)) for t in thresholds)
        return best_f1

    study = optuna.create_study(direction="maximize")
    study.optimize(objective, n_trials=20)
    # Refit the final model with the best hyperparameters found.
    best_params = study.best_params
    best_model = XGBClassifier(**best_params)
    best_model.fit(X_train, y_train)
    self.train_model = best_model
    # === SHAP waterfall for one training observation (top-10 features) ===
    explainer = shap.TreeExplainer(self.train_model)
    shap_values = explainer(X_train)
    i = 0
    shap_val = shap_values[i].values
    feature_names = X_train.columns
    feature_values = X_train.iloc[i]
    k = 10
    order = np.argsort(np.abs(shap_val))[::-1][:k]
    plt.ioff()  # do not display interactively; we only save to disk
    shap.plots.waterfall(
        shap.Explanation(
            values=shap_val[order],
            base_values=shap_values.base_values[i],
            data=feature_values.values[order],
            feature_names=feature_names[order]
        ),
        show=False
    )
    output_path = f"{self.path}/shap_waterfall.png"
    plt.savefig(output_path, dpi=200, bbox_inches='tight')
    plt.close()
    print(f"Graphique SHAP enregistré : {output_path}")
    # ---- Optuna study summary ----
    print("Best value (F1):", study.best_value)
    print("Best params:", study.best_params)
    best_trial = study.best_trial
    print("\n=== BEST TRIAL ===")
    print("Number:", best_trial.number)
    print("Value:", best_trial.value)
    print("Params:")
    for k, v in best_trial.params.items():  # NOTE(review): reuses `k` from above (already consumed)
        print(f" - {k}: {v}")
    print("\n=== ALL TRIALS ===")
    for t in study.trials:
        print(f"Trial {t.number}: f1 = {t.value}, params = {t.params}")
    df = study.trials_dataframe()
    print(df.head())
    # Optuna HTML reports.
    fig = plot_optimization_history(study)
    fig.write_html(f"{self.path}/optimization_history.html")
    fig = plot_param_importances(study)
    fig.write_html(f"{self.path}/param_importances.html")
    fig = plot_slice(study)
    fig.write_html(f"{self.path}/slice.html")
    fig = plot_parallel_coordinate(study)
    fig.write_html(f"{self.path}/parallel_coordinates.html")
    # Feature selection from the fitted model (median importance cut-off).
    sfm = SelectFromModel(self.train_model, threshold="median", prefit=True)
    selected_features = X_train.columns[sfm.get_support()]
    print(selected_features)
    # NOTE(review): `calibrated` is fitted and printed but never used —
    # self.train_model stays the uncalibrated best_model.
    calibrated = CalibratedClassifierCV(self.train_model, method='sigmoid', cv=5)
    calibrated.fit(X_train[selected_features], y_train)
    print(calibrated)
    # Validation-set evaluation.
    y_pred = self.train_model.predict(X_valid)
    y_proba = self.train_model.predict_proba(X_valid)[:, 1]
    print("\nRapport de classification :\n", classification_report(y_valid, y_pred))
    print("\nMatrice de confusion :\n", confusion_matrix(y_valid, y_pred))
    # Built-in feature importances.
    importances = self.train_model.feature_importances_
    feat_imp = pd.Series(importances, index=X_train.columns).sort_values(ascending=False)
    feat_imp.plot(kind='bar', figsize=(12, 6))
    plt.title("Feature importances")
    plt.savefig(f"{self.path}/Feature importances.png", bbox_inches='tight')
    # Permutation importance (model-agnostic, F1-scored).
    result = permutation_importance(self.train_model, X_valid, y_valid, scoring='f1', n_repeats=10, random_state=42)
    perm_imp = pd.Series(result.importances_mean, index=X_valid.columns).sort_values(ascending=False)
    perm_imp.plot(kind='bar', figsize=(12, 6))
    plt.title("Permutation feature importance")
    plt.savefig(f"{self.path}/Permutation feature importance.png", bbox_inches='tight')
    # SHAP global summary + single-observation force plot on validation data.
    explainer = shap.TreeExplainer(self.train_model)
    shap_values = explainer.shap_values(X_valid)
    shap.summary_plot(shap_values, X_valid)
    force_plot = shap.force_plot(explainer.expected_value, shap_values[0, :], X_valid.iloc[0, :])
    shap.save_html(f"{self.path}/shap_force_plot.html", force_plot)
    # Partial dependence of the selected features.
    fig, ax = plt.subplots(figsize=(24, 48))
    PartialDependenceDisplay.from_estimator(
        self.train_model,
        X_valid,
        selected_features,
        kind="average",
        ax=ax
    )
    fig.savefig(f"{self.path}/PartialDependenceDisplay.png", bbox_inches="tight")
    plt.close(fig)
    # Coarse decision-threshold sweep on the final model.
    best_f1 = 0
    best_t = 0.5
    for t in [0.3, 0.4, 0.5, 0.6, 0.7]:
        y_pred_thresh = (y_proba > t).astype(int)
        score = f1_score(y_valid, y_pred_thresh)
        print(f"Seuil {t:.1f} → F1: {score:.3f}")
        if score > best_f1:
            best_f1 = score
            best_t = t
    print(f"✅ Meilleur seuil trouvé: {best_t} avec F1={best_f1:.3f}")
    # Plain accuracy (informational).
    preds = self.train_model.predict(X_valid)
    acc = accuracy_score(y_valid, preds)
    print(f"Accuracy: {acc:.3f}")
    # Persist the model.
    # NOTE(review): filename says "rf_model" but the model is an XGBClassifier.
    joblib.dump(self.train_model, f"{self.path}/{pair}_rf_model.pkl")
    print(f"✅ Modèle sauvegardé sous {pair}_rf_model.pkl")
    self.analyze_model(pair, self.train_model, X_train, X_valid, y_train, y_valid)
""" print("===== 🔍 INFORMATIONS DU MODÈLE =====") # Type de modèle print(f"Type : {type(model).__name__}") print(f"Module : {model.__class__.__module__}") # Hyperparamètres if hasattr(model, "get_params"): params = model.get_params() print(f"\n===== ⚙️ HYPERPARAMÈTRES ({len(params)}) =====") for k, v in params.items(): print(f"{k}: {v}") # Nombre d’estimateurs if hasattr(model, "n_estimators"): print(f"\nNombre d’estimateurs : {model.n_estimators}") # Importance des features if hasattr(model, "feature_importances_"): print("\n===== 📊 IMPORTANCE DES FEATURES =====") # Correction ici : feature_names = getattr(model, "feature_names_in_", None) if isinstance(feature_names, np.ndarray): feature_names = feature_names.tolist() elif feature_names is None: feature_names = [f"feature_{i}" for i in range(len(model.feature_importances_))] fi = pd.DataFrame({ "feature": feature_names, "importance": model.feature_importances_ }).sort_values(by="importance", ascending=False) print(fi) # Coefficients (modèles linéaires) if hasattr(model, "coef_"): print("\n===== ➗ COEFFICIENTS =====") coef = np.array(model.coef_) if coef.ndim == 1: for i, c in enumerate(coef): print(f"Feature {i}: {c:.6f}") else: print(coef) # Intercept if hasattr(model, "intercept_"): print("\nIntercept :", model.intercept_) # Classes connues if hasattr(model, "classes_"): print("\n===== 🎯 CLASSES =====") print(model.classes_) # Scores internes for attr in ["best_score_", "best_iteration_", "best_ntree_limit", "score_"]: if hasattr(model, attr): print(f"\n{attr} = {getattr(model, attr)}") # Méthodes disponibles print("\n===== 🧩 MÉTHODES DISPONIBLES =====") methods = [m for m, _ in inspect.getmembers(model, predicate=inspect.ismethod)] print(", ".join(methods[:15]) + ("..." if len(methods) > 15 else "")) print("\n===== ✅ FIN DE L’INSPECTION =====") def analyze_model(self, pair, model, X_train, X_valid, y_train, y_valid): """ Analyse complète d'un modèle ML supervisé (classification binaire). 
Affiche performances, importance des features, matrices, seuils, etc. """ os.makedirs(self.path, exist_ok=True) # ---- Prédictions ---- preds = model.predict(X_valid) probs = model.predict_proba(X_valid)[:, 1] if hasattr(model, "predict_proba") else preds # ---- Performances globales ---- print("===== 📊 ÉVALUATION DU MODÈLE =====") print("Colonnes du modèle :", model.feature_names_in_) print("Colonnes X_valid :", list(X_valid.columns)) print(f"Accuracy: {accuracy_score(y_valid, preds):.3f}") print(f"ROC AUC : {roc_auc_score(y_valid, probs):.3f}") print("TN (True Negative) / FP (False Positive)") print("FN (False Negative) / TP (True Positive)") print("\nRapport de classification :\n", classification_report(y_valid, preds)) # | Élément | Valeur | Signification | # | ------------------- | ------ | ----------------------------------------------------------- | # | TN (True Negative) | 983 | Modèle a correctement prédit 0 (pas d’achat) | # | FP (False Positive) | 43 | Modèle a prédit 1 alors que c’était 0 (faux signal d’achat) | # | FN (False Negative) | 108 | Modèle a prédit 0 alors que c’était 1 (manqué un achat) | # | TP (True Positive) | 19 | Modèle a correctement prédit 1 (bon signal d’achat) | # ---- Matrice de confusion ---- cm = confusion_matrix(y_valid, preds) print("Matrice de confusion :\n", cm) plt.figure(figsize=(4, 4)) plt.imshow(cm, cmap="Blues") plt.title("Matrice de confusion") plt.xlabel("Prédit") plt.ylabel("Réel") for i in range(2): for j in range(2): plt.text(j, i, cm[i, j], ha="center", va="center", color="black") # plt.show() plt.savefig(os.path.join(self.path, "Matrice de confusion.png"), bbox_inches="tight") plt.close() # ---- Importance des features ---- if hasattr(model, "feature_importances_"): print("\n===== 🔍 IMPORTANCE DES FEATURES =====") importance = pd.DataFrame({ "feature": X_train.columns, "importance": model.feature_importances_ }).sort_values(by="importance", ascending=False) print(importance) # Crée une figure plus grande fig, ax = 
plt.subplots(figsize=(24, 8)) # largeur=24 pouces, hauteur=8 pouces # Trace le bar plot sur cet axe importance.plot.bar(x="feature", y="importance", legend=False, ax=ax) # Tourner les labels pour plus de lisibilité ax.set_xticklabels(ax.get_xticklabels(), rotation=45, ha='right') plt.title("Importance des features") # plt.show() plt.savefig(os.path.join(self.path, "Importance des features.png"), bbox_inches="tight") plt.close() # ---- Arbre de décision (extrait) ---- if hasattr(model, "estimators_"): print("\n===== 🌳 EXTRAIT D’UN ARBRE =====") print(export_text(model.estimators_[0], feature_names=list(X_train.columns))[:800]) # ---- Précision selon le seuil ---- thresholds = np.linspace(0.1, 0.9, 9) print("\n===== ⚙️ PERFORMANCE SELON SEUIL =====") for t in thresholds: preds_t = (probs > t).astype(int) acc = accuracy_score(y_valid, preds_t) print(f"Seuil {t:.1f} → précision {acc:.3f}") # ---- ROC Curve ---- fpr, tpr, _ = roc_curve(y_valid, probs) plt.figure(figsize=(5, 4)) plt.plot(fpr, tpr, label="ROC curve") plt.plot([0, 1], [0, 1], linestyle="--", color="gray") plt.xlabel("Taux de faux positifs") plt.ylabel("Taux de vrais positifs") plt.title("Courbe ROC") plt.legend() # plt.show() plt.savefig(os.path.join(self.path, "Courbe ROC.png"), bbox_inches="tight") plt.close() # # ---- Interprétation SHAP (optionnelle) ---- # try: # import shap # # print("\n===== 💡 ANALYSE SHAP =====") # explainer = shap.TreeExplainer(model) # shap_values = explainer.shap_values(X_valid) # # shap.summary_plot(shap_values[1], X_valid) # # Vérifie le type de sortie de shap_values # if isinstance(shap_values, list): # # Cas des modèles de classification (plusieurs classes) # shap_values_to_plot = shap_values[0] if len(shap_values) == 1 else shap_values[1] # else: # shap_values_to_plot = shap_values # # # Ajustement des dimensions au besoin # if shap_values_to_plot.shape[1] != X_valid.shape[1]: # print(f"⚠️ Mismatch dimensions SHAP ({shap_values_to_plot.shape[1]}) vs X_valid 
({X_valid.shape[1]})") # min_dim = min(shap_values_to_plot.shape[1], X_valid.shape[1]) # shap_values_to_plot = shap_values_to_plot[:, :min_dim] # X_to_plot = X_valid.iloc[:, :min_dim] # else: # X_to_plot = X_valid # # plt.figure(figsize=(12, 4)) # shap.summary_plot(shap_values_to_plot, X_to_plot, show=False) # plt.savefig(os.path.join(self.path, "shap_summary.png"), bbox_inches="tight") # plt.close() # except ImportError: # print("\n(SHAP non installé — `pip install shap` pour activer l’analyse SHAP.)") y_proba = model.predict_proba(X_valid)[:, 1] # Trace ou enregistre le graphique self.plot_threshold_analysis(y_valid, y_proba, step=0.05, save_path=f"{self.path}/threshold_analysis.png") # y_valid : vraies classes (0 / 1) # y_proba : probabilités de la classe 1 prédites par ton modèle # Exemple : y_proba = model.predict_proba(X_valid)[:, 1] seuils = np.arange(0.0, 1.01, 0.05) precisions, recalls, f1s = [], [], [] for seuil in seuils: y_pred = (y_proba >= seuil).astype(int) precisions.append(precision_score(y_valid, y_pred)) recalls.append(recall_score(y_valid, y_pred)) f1s.append(f1_score(y_valid, y_pred)) plt.figure(figsize=(10, 6)) plt.plot(seuils, precisions, label='Précision', marker='o') plt.plot(seuils, recalls, label='Rappel', marker='o') plt.plot(seuils, f1s, label='F1-score', marker='o') # Ajoute un point pour le meilleur F1 best_idx = np.argmax(f1s) plt.scatter(seuils[best_idx], f1s[best_idx], color='red', s=80, label=f'Max F1 ({seuils[best_idx]:.2f})') plt.title("Performance du modèle selon le seuil de probabilité") plt.xlabel("Seuil de probabilité (classe 1)") plt.ylabel("Score") plt.grid(True, alpha=0.3) plt.legend() plt.savefig(f"{self.path}/seuil_de_probabilite.png", bbox_inches='tight') # plt.show() print(f"✅ Meilleur F1 : {f1s[best_idx]:.3f} au seuil {seuils[best_idx]:.2f}") print("\n===== ✅ FIN DE L’ANALYSE =====") def plot_threshold_analysis(self, y_true, y_proba, step=0.05, save_path=None): """ Affiche la précision, le rappel et le F1-score selon 
le seuil de décision. y_true : labels réels (0 ou 1) y_proba : probabilités prédites (P(hausse)) step : pas entre les seuils testés save_path : si renseigné, enregistre l'image au lieu d'afficher """ # Le graphique généré affichera trois courbes : # 🔵 Precision — la fiabilité de tes signaux haussiers. # 🟢 Recall — la proportion de hausses que ton modèle détecte. # 🟣 F1-score — le compromis optimal entre les deux. thresholds = np.arange(0, 1.01, step) precisions, recalls, f1s = [], [], [] for thr in thresholds: preds = (y_proba >= thr).astype(int) precisions.append(precision_score(y_true, preds)) recalls.append(recall_score(y_true, preds)) f1s.append(f1_score(y_true, preds)) plt.figure(figsize=(10, 6)) plt.plot(thresholds, precisions, label="Precision", linewidth=2) plt.plot(thresholds, recalls, label="Recall", linewidth=2) plt.plot(thresholds, f1s, label="F1-score", linewidth=2, linestyle="--") plt.axvline(0.5, color='gray', linestyle=':', label="Seuil 0.5") plt.title("📊 Performance selon le seuil de probabilité", fontsize=14) plt.xlabel("Seuil de décision (threshold)") plt.ylabel("Score") plt.legend() plt.grid(True, alpha=0.3) if save_path: plt.savefig(save_path, bbox_inches='tight') print(f"✅ Graphique enregistré : {save_path}") else: plt.show() def feature_auc_scores(self, X, y): aucs = {} for col in X.columns: try: aucs[col] = roc_auc_score(y, X[col].ffill().fillna(0)) except Exception: aucs[col] = np.nan return pd.Series(aucs).sort_values(ascending=False) def listUsableColumns(self, dataframe): # Étape 1 : sélectionner numériques numeric_cols = dataframe.select_dtypes(include=['int64', 'float64']).columns # Étape 2 : enlever constantes usable_cols = [c for c in numeric_cols if dataframe[c].nunique() > 1 and not c.endswith("_state") and not c.endswith("_1d") # and not c.endswith("_1h") and not c.startswith("open") and not c.startswith("close") and not c.startswith("low") and not c.startswith("high") and not c.startswith("haopen") and not c.startswith("haclose") 
# and not c.startswith("bb_lower") and not c.startswith("bb_upper") # and not c.startswith("bb_middle") and not c.endswith("_count") and not c.endswith("_class") and not c.endswith("_price") and not c.startswith('stop_buying') and not c.startswith('target') and not c.startswith('lvl') ] # Étape 3 : remplacer inf et NaN par 0 dataframe[usable_cols] = dataframe[usable_cols].replace([np.inf, -np.inf], 0).fillna(0) print("Colonnes utilisables pour le modèle :") print(usable_cols) # self.model_indicators = usable_cols return usable_cols def select_uncorrelated_features(self, df, target, top_n=20, corr_threshold=0.7): """ Sélectionne les features les plus corrélées avec target, tout en supprimant celles trop corrélées entre elles. """ # 1️⃣ Calcul des corrélations absolues avec la cible corr = df.corr(numeric_only=True) corr_target = corr[target].abs().sort_values(ascending=False) # 2️⃣ Prend les N features les plus corrélées avec la cible (hors target) features = corr_target.drop(target).head(top_n).index.tolist() # 3️⃣ Évite les features trop corrélées entre elles selected = [] for feat in features: too_correlated = False for sel in selected: if abs(corr.loc[feat, sel]) > corr_threshold: too_correlated = True break if not too_correlated: selected.append(feat) # 4️⃣ Retourne un DataFrame propre avec les valeurs de corrélation selected_corr = pd.DataFrame({ "feature": selected, "corr_with_target": [corr.loc[f, target] for f in selected] }).sort_values(by="corr_with_target", key=np.abs, ascending=False) return selected_corr def calculateDerivation(self, dataframe, window=12, suffixe='', timeframe='5m'): dataframe[f"mid_smooth{suffixe}"] = dataframe['mid'].rolling(window).mean() dataframe = self.calculeDerivees(dataframe, f"mid_smooth{suffixe}", ema_period=window) return dataframe def calculeDerivees( self, dataframe: pd.DataFrame, name: str, suffixe: str = '', window: int = 100, coef: float = 0.15, ema_period: int = 10, verbose: bool = True, ) -> pd.DataFrame: """ Calcule 
deriv1/deriv2 (relative simple), applique EMA, calcule tendency avec epsilon adaptatif basé sur rolling percentiles. """ d1_col = f"{name}{suffixe}_deriv1" d2_col = f"{name}{suffixe}_deriv2" # d1s_col = f"{name}{suffixe}_deriv1_smooth" # d2s_col = f"{name}{suffixe}_deriv2_smooth" tendency_col = f"{name}{suffixe}_state" factor1 = 100 * (ema_period / 5) factor2 = 10 * (ema_period / 5) dataframe[f"{name}{suffixe}_inv"] = (dataframe[f"{name}{suffixe}"].shift(2) >= dataframe[f"{name}{suffixe}"].shift(1)) \ & (dataframe[f"{name}{suffixe}"].shift(1) <= dataframe[f"{name}{suffixe}"]) # --- Distance à la moyenne mobile --- dataframe[f"{name}{suffixe}_dist"] = (dataframe['close'] - dataframe[f"{name}{suffixe}"]) / dataframe[f"{name}{suffixe}"] # dérivée relative simple dataframe[d1_col] = (dataframe[name] - dataframe[name].shift(1)) / dataframe[name].shift(1) # lissage EMA dataframe[d1_col] = factor1 * dataframe[d1_col].ewm(span=ema_period, adjust=False).mean() # dataframe[d1_col] = dataframe[d1_col].rolling(window=ema_period, center=True).median() dataframe[d2_col] = dataframe[d1_col] - dataframe[d1_col].shift(1) dataframe[d2_col] = factor2 * dataframe[d2_col].ewm(span=ema_period, adjust=False).mean() # epsilon adaptatif via rolling percentile p_low_d1 = dataframe[d1_col].rolling(window=window, min_periods=1).quantile(0.05) p_high_d1 = dataframe[d1_col].rolling(window=window, min_periods=1).quantile(0.95) p_low_d2 = dataframe[d2_col].rolling(window=window, min_periods=1).quantile(0.05) p_high_d2 = dataframe[d2_col].rolling(window=window, min_periods=1).quantile(0.95) eps_d1_series = ((p_low_d1.abs() + p_high_d1.abs()) / 2) * coef eps_d2_series = ((p_low_d2.abs() + p_high_d2.abs()) / 2) * coef # fallback global eps global_eps_d1 = (abs(dataframe[d1_col].quantile(0.05)) + abs(dataframe[d1_col].quantile(0.95))) / 2 * coef global_eps_d2 = (abs(dataframe[d2_col].quantile(0.05)) + abs(dataframe[d2_col].quantile(0.95))) / 2 * coef eps_d1_series = 
eps_d1_series.fillna(global_eps_d1).replace(0, global_eps_d1) eps_d2_series = eps_d2_series.fillna(global_eps_d2).replace(0, global_eps_d2) # if verbose and self.dp.runmode.value in ('backtest'): # stats = dataframe[[d1_col, d2_col]].agg(['min', 'max']).T # stats['abs_max'] = dataframe[[d1_col, d2_col]].abs().max(axis=0) # print(f"---- Derivatives stats {timeframe}----") # print(stats) # print(f"rolling window = {window}, coef = {coef}, ema_period = {ema_period}") # print("---------------------------") # mapping tendency def tag_by_derivatives(row): idx = int(row.name) d1v = float(row[d1_col]) d2v = float(row[d2_col]) eps1 = float(eps_d1_series.iloc[idx]) eps2 = float(eps_d2_series.iloc[idx]) # # mapping état → codes 3 lettres explicites # # | Ancien état | Nouveau code 3 lettres | Interprétation | # # | ----------- | ---------------------- | --------------------- | # # | 4 | HAU | Hausse Accélérée | # # | 3 | HSR | Hausse Ralentissement | # # | 2 | HST | Hausse Stable | # # | 1 | DHB | Départ Hausse | # # | 0 | PAL | Palier / neutre | # # | -1 | DBD | Départ Baisse | # # | -2 | BSR | Baisse Ralentissement | # # | -3 | BST | Baisse Stable | # # | -4 | BAS | Baisse Accélérée | # Palier strict if abs(d1v) <= eps1 and abs(d2v) <= eps2: return 0 # Départ si d1 ~ 0 mais d2 signale direction if abs(d1v) <= eps1: return 1 if d2v > eps2 else -1 if d2v < -eps2 else 0 # Hausse if d1v > eps1: return 4 if d2v > eps2 else 3 # Baisse if d1v < -eps1: return -4 if d2v < -eps2 else -2 return 0 dataframe[tendency_col] = dataframe.apply(tag_by_derivatives, axis=1) # if timeframe == '1h' and verbose and self.dp.runmode.value in ('backtest'): # print("##################") # print(f"# STAT {timeframe} {name}{suffixe}") # print("##################") # self.calculateProbabilite2Index(dataframe, futur_cols=['futur_percent'], indic_1=f"{name}{suffixe}_deriv1", indic_2=f"{name}{suffixe}_deriv2") return dataframe def calculModelInformative(self, informative): # préparation # print(df) df = 
informative.copy() X = df[self.listUsableColumns(df)] df['target'] = ((df["sma24"].shift(-13) - df["sma24"]) > 0).astype(int) df['target'] = df['target'].fillna(0).astype(int) y = df['target'] # train/test X_train, X_test, y_train, y_test = train_test_split(X, y, shuffle=False, test_size=0.2) # Pipeline normalisé + Logistic Regresson clf = Pipeline([ ("scaler", StandardScaler()), ("logreg", LogisticRegression(max_iter=5000)) ]) # Calibration CV automatique cal = CalibratedClassifierCV(clf, cv=3, method="isotonic") # Entraînement cal.fit(X_train, y_train) # Probabilités calibrées probas = cal.predict_proba(X_test)[:, 1] # Injection propre des probabilités dans le dataframe original (aux bons index) df.loc[X_test.index, 'ml_prob'] = probas print("Brier score:", brier_score_loss(y_test, probas)) print("ROC AUC:", roc_auc_score(y_test, probas)) # joindre probabilités au df (dernières lignes correspondantes) return probas def prune_features(self, model, dataframe, feature_columns, importance_threshold=0.01): """ Supprime les features dont l'importance est inférieure au seuil. 
Args: model: XGBClassifier déjà entraîné dataframe: DataFrame contenant toutes les features feature_columns: liste des colonnes/features utilisées pour la prédiction importance_threshold: seuil minimal pour conserver une feature (en proportion de l'importance totale) Returns: dataframe_pruned: dataframe avec uniquement les features conservées kept_features: liste des features conservées """ booster = model.get_booster() # Récupérer importance des features selon 'gain' importance = booster.get_score(importance_type='gain') # Normaliser pour que la somme soit 1 total_gain = sum(importance.values()) normalized_importance = {k: v / total_gain for k, v in importance.items()} # Features à garder kept_features = [f for f in feature_columns if normalized_importance.get(f, 0) >= importance_threshold] dataframe_pruned = dataframe[kept_features].fillna(0) print(f"⚡ Features conservées ({len(kept_features)} / {len(feature_columns)}): {kept_features}") return dataframe_pruned, kept_features