diff --git a/Frictrade.py b/Frictrade.py
index d59a03e..bb2b5ec 100644
--- a/Frictrade.py
+++ b/Frictrade.py
@@ -511,15 +511,14 @@ class Frictrade(IStrategy):
dataframe.loc[
(
# (dataframe['sma5_inv'] == 1)
- (
- (dataframe['pct180'] < 0.5) |
- (
- (dataframe['close'] < dataframe['sma60'] )
- & (dataframe['sma24_deriv1'] > 0)
- )
- )
- & (dataframe['hapercent'] > 0)
- & (dataframe['sma24_deriv1'] > - 0.03)
+ # (
+ # ((dataframe['pct180'] < 0.5) | (dataframe['sma24_deriv1'] > 0))
+ # |((dataframe['close'] < dataframe['sma24'] ) & (dataframe['sma24_deriv1'] > 0))
+ #
+ # )
+ (dataframe['hapercent'] > 0)
+ & (dataframe['rsi'] < 85)
+ & (dataframe['sma24'] > dataframe['sma60'])
# & (
# (dataframe['percent3'] <= -0.003)
# | (dataframe['percent12'] <= -0.003)
@@ -578,7 +577,7 @@ class Frictrade(IStrategy):
# base_size = montant de base que tu veux utiliser (ex: stake_amount ou autre)
base_size = base_stake # exemple fraction du portefeuille; adapte selon ton code
# new stake proportionnel à mult
- new_stake = base_size * mult
+ new_stake = base_size #* mult
return new_stake
def adjust_trade_position(self, trade: Trade, current_time: datetime,
@@ -817,11 +816,11 @@ class Frictrade(IStrategy):
stake=0
)
- if last_candle['sma24_deriv1'] > 0 : #and minutes < 180 and baisse < 30: # and last_candle['sma5_deriv1'] > -0.15:
- if (minutes < 180):
- return None
- if (minutes > 1440 and last_candle['sma60_deriv1'] > 0) :
- return None
+ # if last_candle['sma24_deriv1'] > 0 : #and minutes < 180 and baisse < 30: # and last_candle['sma5_deriv1'] > -0.15:
+ # if (minutes < 180):
+ # return None
+ # if (minutes > 1440 and last_candle['sma60_deriv1'] > 0) :
+ # return None
# ----- 4) OFFSET : faut-il attendre de dépasser trailing_stop_positive_offset ? -----
diff --git a/FrictradeLearning.py b/FrictradeLearning.py
new file mode 100644
index 0000000..3a39bb5
--- /dev/null
+++ b/FrictradeLearning.py
@@ -0,0 +1,2051 @@
+# Zeus Strategy: First Generation of GodStra Strategy with maximum
+# AVG/MID profit in USDT
+# Author: @Mablue (Masoud Azizi)
+# github: https://github.com/mablue/
+# IMPORTANT: INSTALL TA BEFOUR RUN(pip install ta)
+# freqtrade hyperopt --hyperopt-loss SharpeHyperOptLoss --spaces buy sell roi --strategy Zeus
+# --- Do not remove these libs ---
+from datetime import timedelta, datetime
+from freqtrade.persistence import Trade
+from freqtrade.strategy import (BooleanParameter, CategoricalParameter, DecimalParameter, stoploss_from_open,
+ IntParameter, IStrategy, merge_informative_pair, informative, stoploss_from_absolute)
+import pandas as pd
+import numpy as np
+import os
+import json
+import csv
+from pandas import DataFrame
+from typing import Optional, Union, Tuple
+import math
+import logging
+from pathlib import Path
+
+# --------------------------------
+
+# Add your lib to import here test git
+import ta
+import talib.abstract as talib
+import freqtrade.vendor.qtpylib.indicators as qtpylib
+from datetime import timezone, timedelta
+
+# Machine Learning
+from sklearn.ensemble import RandomForestClassifier,RandomForestRegressor
+from sklearn.model_selection import train_test_split
+from sklearn.metrics import r2_score, mean_absolute_error, mean_squared_error
+from sklearn.metrics import accuracy_score
+import joblib
+import matplotlib.pyplot as plt
+from sklearn.metrics import (
+ classification_report,
+ confusion_matrix,
+ accuracy_score,
+ roc_auc_score,
+ roc_curve,
+ precision_score, recall_score, precision_recall_curve,
+ f1_score
+)
+from sklearn.tree import export_text
+import inspect
+from sklearn.feature_selection import mutual_info_classif
+from sklearn.inspection import permutation_importance
+from lightgbm import LGBMClassifier
+from sklearn.calibration import CalibratedClassifierCV
+from sklearn.feature_selection import SelectFromModel
+from tabulate import tabulate
+from sklearn.model_selection import GridSearchCV
+from sklearn.feature_selection import VarianceThreshold
+import seaborn as sns
+from xgboost import XGBClassifier
+import optuna
+from optuna.visualization import plot_optimization_history
+from optuna.visualization import plot_slice
+from optuna.visualization import plot_param_importances
+from optuna.visualization import plot_parallel_coordinate
+import shap
+from sklearn.inspection import PartialDependenceDisplay
+
+from sklearn.model_selection import train_test_split
+from sklearn.metrics import f1_score
+from xgboost import XGBClassifier
+
+logger = logging.getLogger(__name__)
+
+# Couleurs ANSI de base
+RED = "\033[31m"
+GREEN = "\033[32m"
+YELLOW = "\033[33m"
+BLUE = "\033[34m"
+MAGENTA = "\033[35m"
+CYAN = "\033[36m"
+RESET = "\033[0m"
+
+
+class FrictradeLearning(IStrategy):
+ startup_candle_count = 180
+
+ model_indicators = []
+ DEFAULT_PARAMS = {
+ "rsi_buy": 30,
+ "rsi_sell": 70,
+ "ema_period": 21,
+ "sma_short": 20,
+ "sma_long": 100,
+ "atr_period": 14,
+ "atr_multiplier": 1.5,
+ "stake_amount": None, # use exchange default
+ "stoploss": -0.10,
+ "minimal_roi": {"0": 0.10}
+ }
+
+ # ROI table:
+ minimal_roi = {
+ "0": 10
+ }
+
+ # Stoploss:
+ stoploss = -1 # 0.256
+ # Custom stoploss
+ use_custom_stoploss = False
+
+ trailing_stop = False
+ trailing_stop_positive = 0.15
+ trailing_stop_positive_offset = 0.5
+ trailing_only_offset_is_reached = True
+
+ # Buy hypers
+ timeframe = '1m'
+ max_open_trades = 5
+ max_amount = 40
+
+ parameters = {}
+ # DCA config
+ position_adjustment_enable = True
+
+ columns_logged = False
+ pairs = {
+ pair: {
+ "first_buy": 0,
+ "last_buy": 0.0,
+ "last_min": 999999999999999.5,
+ "last_max": 0,
+ "trade_info": {},
+ "max_touch": 0.0,
+ "last_sell": 0.0,
+ 'count_of_buys': 0,
+ 'current_profit': 0,
+ 'expected_profit': 0,
+ 'previous_profit': 0,
+ "last_candle": {},
+ "last_count_of_buys": 0,
+ 'base_stake_amount': 0,
+ 'stop_buy': False,
+ 'last_date': 0,
+ 'stop': False,
+ 'max_profit': 0,
+ 'first_amount': 0,
+ 'total_amount': 0,
+ 'has_gain': 0,
+ 'force_sell': False,
+ 'force_buy': False
+ }
+ for pair in ["BTC/USDC", "ETH/USDC", "DOGE/USDC", "XRP/USDC", "SOL/USDC",
+ "BTC/USDT", "ETH/USDT", "DOGE/USDT", "XRP/USDT", "SOL/USDT"]
+ }
+ trades = list()
+ max_profit_pairs = {}
+
+ btc_ath_history = [
+ {"date": "2011-06-09", "price_usd": 26.15, "note": "pic 2011 (early breakout)"},
+ {"date": "2013-11-29", "price_usd": 1132.00, "note": "bull run fin 2013"},
+ {"date": "2017-12-17", "price_usd": 19783.00, "note": "ATH décembre 2017 (crypto bubble)"},
+ {"date": "2020-12-31", "price_usd": 29001.72, "note": "fin 2020, nouveau record après accumulation)"},
+ {"date": "2021-11-10", "price_usd": 68742.00, "note": "record novembre 2021 (institutional demand)"},
+ {"date": "2024-03-05", "price_usd": 69000.00,
+ "note": "nouveau pic début 2024 (source presse, valeur indicative)"},
+ {"date": "2025-07-11", "price_usd": 118755.00, "note": "pic juillet 2025 (valeur rapportée par la presse)"},
+ {"date": "2025-10-06", "price_usd": 126198.07,
+ "note": "pic oct. 2025 (source agrégée, à vérifier selon l'exchange)"}
+ ]
+
+ def confirm_trade_entry(self, pair: str, order_type: str, amount: float, rate: float, time_in_force: str,
+ current_time: datetime, entry_tag: Optional[str], **kwargs) -> bool:
+
+ minutes = 0
+ if self.pairs[pair]['last_date'] != 0:
+ minutes = round(int((current_time - self.pairs[pair]['last_date']).total_seconds() / 60))
+
+ dataframe, _ = self.dp.get_analyzed_dataframe(pair, self.timeframe)
+ last_candle = dataframe.iloc[-1].squeeze()
+ last_candle_2 = dataframe.iloc[-2].squeeze()
+ last_candle_3 = dataframe.iloc[-3].squeeze()
+
+ condition = True #(last_candle[f"{indic_5m}_deriv1"] >= indic_deriv1_5m) and (last_candle[f"{indic_5m}_deriv2"] >= indic_deriv2_5m)
+
+ allow_to_buy = True #(condition and not self.pairs[pair]['stop']) | (entry_tag == 'force_entry')
+
+ if allow_to_buy:
+ self.trades = list()
+ self.pairs[pair]['first_buy'] = rate
+ self.pairs[pair]['last_buy'] = rate
+ self.pairs[pair]['max_touch'] = last_candle['close']
+ self.pairs[pair]['last_candle'] = last_candle
+ self.pairs[pair]['count_of_buys'] = 1
+ self.pairs[pair]['current_profit'] = 0
+ self.pairs[pair]['last_max'] = max(last_candle['close'], self.pairs[pair]['last_max'])
+ self.pairs[pair]['last_min'] = min(last_candle['close'], self.pairs[pair]['last_min'])
+
+
+ dispo = round(self.wallets.get_available_stake_amount())
+ self.printLineLog()
+
+ stake_amount = self.adjust_stake_amount(pair, last_candle)
+
+ self.pairs[pair]['total_amount'] = stake_amount
+ self.pairs[pair]['first_amount'] = stake_amount
+
+ self.log_trade(
+ last_candle=last_candle,
+ date=current_time,
+ action=("🟩Buy" if allow_to_buy else "Canceled") + " " + str(minutes),
+ pair=pair,
+ rate=rate,
+ dispo=dispo,
+ profit=0,
+ trade_type=entry_tag,
+ buys=1,
+ stake=round(stake_amount, 2)
+ )
+
+ return allow_to_buy
+
+ def confirm_trade_exit(self, pair: str, trade: Trade, order_type: str, amount: float, rate: float,
+ time_in_force: str,
+ exit_reason: str, current_time, **kwargs, ) -> bool:
+
+ # allow_to_sell = (minutes > 30)
+ dataframe, _ = self.dp.get_analyzed_dataframe(pair, self.timeframe)
+ last_candle = dataframe.iloc[-1].squeeze()
+
+ minutes = int(round((current_time - trade.open_date_utc).seconds / 60, 0))
+ profit =trade.calc_profit(rate)
+ force = self.pairs[pair]['force_sell']
+ allow_to_sell = minutes > 30 and (last_candle['hapercent'] < 0 ) or force or (exit_reason == 'force_exit') or (exit_reason == 'stop_loss')
+
+ if allow_to_sell:
+ self.trades = list()
+ self.pairs[pair]['last_count_of_buys'] = trade.nr_of_successful_entries # self.pairs[pair]['count_of_buys']
+ self.pairs[pair]['last_sell'] = rate
+ self.pairs[pair]['last_candle'] = last_candle
+ self.pairs[pair]['previous_profit'] = 0
+ self.trades = list()
+ dispo = round(self.wallets.get_available_stake_amount())
+ # print(f"Sell {pair} {current_time} {exit_reason} dispo={dispo} amount={amount} rate={rate} open_rate={trade.open_rate}")
+ self.log_trade(
+ last_candle=last_candle,
+ date=current_time,
+ action="🟥Sell " + str(minutes),
+ pair=pair,
+ trade_type=exit_reason,
+ rate=last_candle['close'],
+ dispo=dispo,
+ profit=round(profit, 2)
+ )
+ self.pairs[pair]['max_profit'] = 0
+ self.pairs[pair]['force_sell'] = False
+ self.pairs[pair]['has_gain'] = 0
+ self.pairs[pair]['current_profit'] = 0
+ self.pairs[pair]['total_amount'] = 0
+ self.pairs[pair]['count_of_buys'] = 0
+ self.pairs[pair]['max_touch'] = 0
+ self.pairs[pair]['last_buy'] = 0
+ self.pairs[pair]['last_date'] = current_time
+ self.pairs[pair]['current_trade'] = None
+ # else:
+ # self.printLog(f"{current_time} SELL triggered for {pair} ({exit_reason} profit={profit} minutes={minutes} percent={last_candle['hapercent']}) but condition blocked")
+ return (allow_to_sell) | (exit_reason == 'force_exit') | (exit_reason == 'stop_loss')
+
+ # def custom_exit(self, pair: str, trade: Trade, current_time, current_rate, current_profit, **kwargs):
+ #
+ # dataframe, _ = self.dp.get_analyzed_dataframe(pair, self.timeframe)
+ # last_candle = dataframe.iloc[-1].squeeze()
+ # last_candle_1h = dataframe.iloc[-13].squeeze()
+ # before_last_candle = dataframe.iloc[-2].squeeze()
+ # before_last_candle_2 = dataframe.iloc[-3].squeeze()
+ # before_last_candle_12 = dataframe.iloc[-13].squeeze()
+ #
+ # expected_profit = self.expectedProfit(pair, last_candle)
+ # # print(f"current_time={current_time} current_profit={current_profit} expected_profit={expected_profit}")
+ #
+ # max_touch_before = self.pairs[pair]['max_touch']
+ # self.pairs[pair]['last_max'] = max(last_candle['close'], self.pairs[pair]['last_max'])
+ # self.pairs[pair]['last_min'] = min(last_candle['close'], self.pairs[pair]['last_min'])
+ # self.pairs[pair]['current_trade'] = trade
+ #
+ # count_of_buys = trade.nr_of_successful_entries
+ #
+ # profit = trade.calc_profit(current_rate) #round(current_profit * trade.stake_amount, 1)
+ # self.pairs[pair]['max_profit'] = max(self.pairs[pair]['max_profit'], profit)
+ # max_profit = last_candle['max5'] #self.pairs[pair]['max_profit']
+ # baisse = 0
+ # if profit > 0:
+ # baisse = 1 - (profit / max_profit)
+ # mx = max_profit / 5
+ # self.pairs[pair]['count_of_buys'] = count_of_buys
+ # self.pairs[pair]['current_profit'] = profit
+ #
+ # dispo = round(self.wallets.get_available_stake_amount())
+ # hours_since_first_buy = (current_time - trade.open_date_utc).seconds / 3600.0
+ # days_since_first_buy = (current_time - trade.open_date_utc).days
+ # hours = (current_time - trade.date_last_filled_utc).total_seconds() / 3600.0
+ # minutes = (current_time - trade.date_last_filled_utc).total_seconds() / 60.0
+ #
+ # if minutes % 4 == 0:
+ # self.log_trade(
+ # last_candle=last_candle,
+ # date=current_time,
+ # action="🟢 CURRENT", #🔴 CURRENT" if self.pairs[pair]['stop'] or last_candle['stop_buying'] else "
+ # dispo=dispo,
+ # pair=pair,
+ # rate=last_candle['close'],
+ # trade_type='',
+ # profit=round(profit, 2),
+ # buys=count_of_buys,
+ # stake=0
+ # )
+ #
+ # if (last_candle['close'] > last_candle['mid']) or (last_candle['sma5_deriv1'] > 0):
+ # return None
+ #
+ # pair_name = self.getShortName(pair)
+ #
+ # if profit > 0.003 * count_of_buys and baisse > 0.30:
+ # self.pairs[pair]['force_sell'] = False
+ # self.pairs[pair]['force_buy'] = (self.pairs[pair]['count_of_buys'] - self.pairs[pair]['has_gain'] > 3)
+ # return str(count_of_buys) + '_' + 'B30_' + pair_name + '_' + str(self.pairs[pair]['has_gain'])
+ #
+ # self.pairs[pair]['max_touch'] = max(last_candle['close'], self.pairs[pair]['max_touch'])
+
+ def getShortName(self, pair):
+ return pair.replace("/USDT", '').replace("/USDC", '').replace("_USDC", '').replace("_USDT", '')
+
+ def getLastLost(self, last_candle, pair):
+ last_lost = round((last_candle['close'] - self.pairs[pair]['max_touch']) / self.pairs[pair]['max_touch'], 3)
+ return last_lost
+ def getPctFirstBuy(self, pair, last_candle):
+ return round((last_candle['close'] - self.pairs[pair]['first_buy']) / self.pairs[pair]['first_buy'], 3)
+
+ def getPctLastBuy(self, pair, last_candle):
+ return round((last_candle['close'] - self.pairs[pair]['last_buy']) / self.pairs[pair]['last_buy'], 4)
+
+ def expectedProfit(self, pair: str, last_candle: DataFrame):
+ lim = 0.01
+ pct = 0.002
+ if (self.getShortName(pair) == 'BTC'):
+ lim = 0.005
+ pct = 0.001
+ pct_to_max = lim + pct * self.pairs[pair]['count_of_buys']
+ expected_profit = lim * self.pairs[pair]['total_amount'] # min(3 * lim, max(lim, pct_to_max)) # 0.004 + 0.002 * self.pairs[pair]['count_of_buys'] #min(0.01, first_max)
+
+ self.pairs[pair]['expected_profit'] = expected_profit
+
+ return expected_profit
+
+ def log_trade(self, action, pair, date, trade_type=None, rate=None, dispo=None, profit=None, buys=None, stake=None,
+ last_candle=None):
+ # Afficher les colonnes une seule fois
+ if self.config.get('runmode') == 'hyperopt' or self.dp.runmode.value in ('hyperopt'):
+ return
+ if self.columns_logged % 10 == 0:
+ self.printLog(
+ f"| {'Date':<16} | {'Action':<10} |{'Pair':<5}| {'Trade Type':<18} |{'Rate':>8} | {'Dispo':>6} | {'Profit':>8} "
+ f"| {'Pct':>6} | {'max_touch':>11} | {'last_lost':>12} | {'last_max':>7}| {'last_min':>7}|{'Buys':>5}| {'Stake':>5} |"
+ f"{'rsi':>6}" #|Distmax|s201d|s5_1d|s5_2d|s51h|s52h|smt1h|smt2h|tdc1d|tdc1h"
+ )
+ self.printLineLog()
+ df = pd.DataFrame.from_dict(self.pairs, orient='index')
+ colonnes_a_exclure = ['last_candle',
+ 'trade_info', 'last_date', 'last_count_of_buys', 'base_stake_amount', 'stop_buy']
+ df_filtered = df[df['count_of_buys'] > 0].drop(columns=colonnes_a_exclure)
+ # df_filtered = df_filtered["first_buy", "last_max", "max_touch", "last_sell","last_buy", 'count_of_buys', 'current_profit']
+
+ self.printLog(df_filtered)
+
+ self.columns_logged += 1
+ date = str(date)[:16] if date else "-"
+ limit = None
+ rsi = ''
+ rsi_pct = ''
+ sma5_1d = ''
+ sma5_1h = ''
+
+ sma5 = str(sma5_1d) + ' ' + str(sma5_1h)
+
+ last_lost = self.getLastLost(last_candle, pair)
+
+ if buys is None:
+ buys = ''
+
+ max_touch = ''
+ pct_max = self.getPctFirstBuy(pair, last_candle)
+
+ total_counts = str(buys) + '/' + str(sum(pair_data['count_of_buys'] for pair_data in self.pairs.values()))
+
+ dist_max = ''
+
+ last_max = int(self.pairs[pair]['last_max']) if self.pairs[pair]['last_max'] > 1 else round(
+ self.pairs[pair]['last_max'], 3)
+ last_min = int(self.pairs[pair]['last_min']) if self.pairs[pair]['last_min'] > 1 else round(
+ self.pairs[pair]['last_min'], 3)
+
+ color = GREEN if profit > 0 else RED
+
+ profit = str(profit) + '/' + str(round(self.pairs[pair]['max_profit'], 2))
+
+ # 🟢 Dérivée 1 > 0 et dérivée 2 > 0: tendance haussière qui s’accélère.
+ # 🟡 Dérivée 1 > 0 et dérivée 2 < 0: tendance haussière qui ralentit → essoufflement potentiel.
+ # 🔴 Dérivée 1 < 0 et dérivée 2 < 0: tendance baissière qui s’accélère.
+ # 🟠 Dérivée 1 < 0 et dérivée 2 > 0: tendance baissière qui ralentit → possible bottom.
+ self.printLog(
+ f"| {date:<16} |{action:<10} | {pair[0:3]:<3} | {trade_type or '-':<18} |{rate or '-':>9}| {dispo or '-':>6} "
+ f"|{color}{profit or '-':>10}{RESET}| {pct_max or '-':>6} | {round(self.pairs[pair]['max_touch'], 2) or '-':>11} | {last_lost or '-':>12} "
+ f"| {last_max or '-':>7} | {last_min or '-':>7} |{total_counts or '-':>5}|{stake or '-':>7}"
+ f"{round(last_candle['max_rsi_24'], 1) or '-' :>6}|{round(last_candle['rsi_1h'], 1) or '-' :>6}|{round(last_candle['rsi_1d'], 1) or '-' :>6}|"
+ f"{round(last_candle['rtp_1h'] * 100, 0) or '-' :>6}|{round(last_candle['rtp_1d'] * 100, 0) or '-' :>6}|"
+
+ )
+
+ def printLineLog(self):
+ # f"sum1h|sum1d|Tdc|Tdh|Tdd| drv1 |drv|drv_1d|"
+ self.printLog(
+ f"+{'-' * 18}+{'-' * 12}+{'-' * 5}+{'-' * 20}+{'-' * 9}+{'-' * 8}+{'-' * 12}+{'-' * 8}+{'-' * 13}+{'-' * 14}+{'-' * 9}{'-' * 9}+{'-' * 5}+{'-' * 7}+"
+ f"+{'-' * 6}+{'-' * 7}+{'-' * 5}+{'-' * 5}+{'-' * 5}+{'-' * 5}+{'-' * 5}+{'-' * 5}+"
+ )
+
+ def printLog(self, str):
+ if self.config.get('runmode') == 'hyperopt' or self.dp.runmode.value in ('hyperopt'):
+ return;
+ if not self.dp.runmode.value in ('backtest', 'hyperopt', 'lookahead-analysis'):
+ logger.info(str)
+ else:
+ if not self.dp.runmode.value in ('hyperopt'):
+ print(str)
+
+ def populate_indicators(self, dataframe: DataFrame, metadata: dict) -> DataFrame:
+ # Add all ta features
+ pair = metadata['pair']
+ short_pair = self.getShortName(pair)
+ self.path = f"user_data/plots/{short_pair}/" + ("valide/" if not self.dp.runmode.value in ('backtest') else '')
+
+ heikinashi = qtpylib.heikinashi(dataframe)
+ dataframe['haopen'] = heikinashi['open']
+ dataframe['haclose'] = heikinashi['close']
+ dataframe['hapercent'] = (dataframe['haclose'] - dataframe['haopen']) / dataframe['haclose']
+
+ dataframe['mid'] = dataframe['open'] + (dataframe['close'] - dataframe['open']) / 2
+ dataframe['sma5'] = dataframe['mid'].ewm(span=5, adjust=False).mean() #dataframe["mid"].rolling(window=5).mean()
+ dataframe['sma5_deriv1'] = 1000 * (dataframe['sma5'] - dataframe['sma5'].shift(1)) / dataframe['sma5'].shift(1)
+
+ dataframe['sma24'] = dataframe['mid'].ewm(span=24, adjust=False).mean()
+ dataframe['sma24_deriv1'] = 1000 * (dataframe['sma24'] - dataframe['sma24'].shift(1)) / dataframe['sma24'].shift(1)
+
+ dataframe['sma60'] = dataframe['mid'].ewm(span=60, adjust=False).mean()
+ dataframe['sma60_deriv1'] = 1000 * (dataframe['sma60'] - dataframe['sma60'].shift(1)) / dataframe['sma60'].shift(1)
+
+ # dataframe[f"sma5_inv"] = (dataframe[f"sma5"].shift(2) >= dataframe[f"sma5"].shift(1)) \
+ # & (dataframe[f"sma5"].shift(1) <= dataframe[f"sma5"])
+
+ dataframe["sma5_sqrt"] = (
+ np.sqrt(np.abs(dataframe["sma5"] - dataframe["sma5"].shift(1)))
+ + np.sqrt(np.abs(dataframe["sma5"].shift(3) - dataframe["sma5"].shift(1)))
+ )
+ dataframe["sma5_inv"] = (
+ (dataframe["sma5"].shift(2) >= dataframe["sma5"].shift(1))
+ & (dataframe["sma5"].shift(1) <= dataframe["sma5"])
+ & (dataframe["sma5_sqrt"] > 5)
+ )
+
+ dataframe["percent"] = dataframe['mid'].pct_change()
+ dataframe["percent3"] = dataframe['mid'].pct_change(3).rolling(3).mean()
+ dataframe["percent12"] = dataframe['mid'].pct_change(12).rolling(12).mean()
+ dataframe["percent24"] = dataframe['mid'].pct_change(24).rolling(24).mean()
+
+ dataframe['rsi'] = talib.RSI(dataframe['mid'], timeperiod=14)
+ self.calculeDerivees(dataframe, 'rsi', ema_period=12)
+ dataframe['max_rsi_12'] = talib.MAX(dataframe['rsi'], timeperiod=12)
+ dataframe['max_rsi_24'] = talib.MAX(dataframe['rsi'], timeperiod=24)
+ dataframe['max5'] = talib.MAX(dataframe['mid'], timeperiod=5)
+ dataframe['min180'] = talib.MIN(dataframe['mid'], timeperiod=180)
+ dataframe['max180'] = talib.MAX(dataframe['mid'], timeperiod=180)
+ dataframe['pct180'] = ((dataframe["mid"] - dataframe['min180'] ) / (dataframe['max180'] - dataframe['min180'] ))
+
+ dataframe = self.rsi_trend_probability(dataframe, short=60, long=360)
+
+ # ################### INFORMATIVE 1h
+ informative = self.dp.get_pair_dataframe(pair=metadata['pair'], timeframe='1h')
+ informative['mid'] = informative['open'] + (informative['close'] - informative['open']) / 2
+ # informative = self.populate1hIndicators(df=informative, metadata=metadata)
+ informative['rsi'] = talib.RSI(informative['mid'], timeperiod=14)
+ self.calculeDerivees(informative, 'rsi', ema_period=12)
+ informative = self.rsi_trend_probability(informative)
+ # informative = self.calculateRegression(informative, 'mid', lookback=15)
+ dataframe = merge_informative_pair(dataframe, informative, '1m', '1h', ffill=True)
+
+ # ################### INFORMATIVE 1d
+ informative = self.dp.get_pair_dataframe(pair=metadata['pair'], timeframe='1d')
+ informative['mid'] = informative['open'] + (informative['close'] - informative['open']) / 2
+ informative['rsi'] = talib.RSI(informative['mid'], timeperiod=5)
+ informative = self.rsi_trend_probability(informative)
+ # informative = self.calculateRegression(informative, 'mid', lookback=15)
+ dataframe = merge_informative_pair(dataframe, informative, '1m', '1d', ffill=True)
+
+ dataframe['last_price'] = dataframe['close']
+ dataframe['first_price'] = dataframe['close']
+ if self.dp:
+ if self.dp.runmode.value in ('live', 'dry_run'):
+ self.getOpenTrades()
+
+ for trade in self.trades:
+ if trade.pair != pair:
+ continue
+ filled_buys = trade.select_filled_orders('buy')
+ count = 0
+ amount = 0
+ for buy in filled_buys:
+ if count == 0:
+ dataframe['first_price'] = buy.price
+ self.pairs[pair]['first_buy'] = buy.price
+ self.pairs[pair]['first_amount'] = buy.price * buy.filled
+ # dataframe['close01'] = buy.price * 1.01
+
+ # Order(id=2396, trade=1019, order_id=29870026652, side=buy, filled=0.00078, price=63921.01,
+ # status=closed, date=2024-08-26 02:20:11)
+ dataframe['last_price'] = buy.price
+ self.pairs[pair]['last_buy'] = buy.price
+ count = count + 1
+ amount += buy.price * buy.filled
+ count_buys = count
+ self.pairs[pair]['total_amount'] = amount
+
+ dataframe['absolute_min'] = dataframe['mid'].rolling(1440, min_periods=1).min()
+ dataframe['absolute_max'] = dataframe['mid'].rolling(1440, min_periods=1).max()
+ # steps = (dataframe['absolute_max'] - dataframe['absolute_min']) / (dataframe['absolute_min'] * 0.01)
+ # levels = [dataframe['absolute_min'] * (1 + i / 100) for i in range(1, steps + 1)]
+ #
+ # print(levels)
+
+ ###########################################################
+ # Bollinger Bands
+ bollinger = qtpylib.bollinger_bands(qtpylib.typical_price(dataframe), window=20, stds=2)
+ dataframe['bb_lowerband'] = bollinger['lower']
+ dataframe['bb_middleband'] = bollinger['mid']
+ dataframe['bb_upperband'] = bollinger['upper']
+ dataframe["bb_percent"] = (
+ (dataframe["close"] - dataframe["bb_lowerband"]) /
+ (dataframe["bb_upperband"] - dataframe["bb_lowerband"])
+ )
+ dataframe["bb_width"] = (dataframe["bb_upperband"] - dataframe["bb_lowerband"]) / dataframe["sma24"]
+
+
+ # Calcul MACD
+ macd, macdsignal, macdhist = talib.MACD(
+ dataframe['close'],
+ fastperiod=12,
+ slowperiod=26,
+ signalperiod=9
+ )
+
+ # | Nom | Formule / définition | Signification |
+ # | ---------------------------- | ------------------------------------------- | ----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- |
+ # | **MACD** (`macd`) | `EMA_fast - EMA_slow` (ex : 12-26 périodes) | Montre l’écart entre la moyenne courte et la moyenne longue.
- Positive → tendance haussière
- Négative → tendance baissière |
+ # | **Signal** (`macdsignal`) | `EMA_9(MACD)` | Sert de ligne de **signal de déclenchement**.
- Croisement du MACD au-dessus → signal d’achat
- Croisement du MACD en dessous → signal de vente |
+ # | **Histogramme** (`macdhist`) | `MACD - Signal` | Montre la **force et l’accélération** de la tendance.
- Positif et croissant → tendance haussière qui s’accélère
- Positif mais décroissant → ralentissement de la hausse
- Négatif et décroissant → baisse qui s’accélère
- Négatif mais croissant → ralentissement de la baisse |
+
+ # Ajouter dans le dataframe
+ dataframe['macd'] = macd
+ dataframe['macdsignal'] = macdsignal
+ dataframe['macdhist'] = macdhist
+
+ # Regarde dans le futur
+ # # --- Rendre relatif sur chaque série (-1 → 1) ---
+ # for col in ['macd', 'macdsignal', 'macdhist']:
+ # series = dataframe[col]
+ # valid = series[~np.isnan(series)] # ignorer NaN
+ # min_val = valid.min()
+ # max_val = valid.max()
+ # span = max_val - min_val if max_val != min_val else 1
+ # dataframe[f'{col}_rel'] = 2 * ((series - min_val) / span) - 1
+ #
+ # dataframe['tdc_macd'] = self.macd_tendance_int(
+ # dataframe,
+ # macd_col='macd_rel',
+ # signal_col='macdsignal_rel',
+ # hist_col='macdhist_rel'
+ # )
+
+ # ------------------------------------------------------------------------------------
+ # rolling SMA indicators (used for trend detection too)
+ s_short = self.DEFAULT_PARAMS['sma_short']
+ s_long = self.DEFAULT_PARAMS['sma_long']
+
+ dataframe[f'sma_{s_short}'] = dataframe['close'].rolling(window=s_short).mean()
+ dataframe[f'sma_{s_long}'] = dataframe['close'].rolling(window=s_long).mean()
+
+ # --- pente brute ---
+ dataframe['slope'] = dataframe['sma24'].diff()
+
+ # --- lissage EMA ---
+ dataframe['slope_smooth'] = dataframe['slope'].ewm(span=10, adjust=False).mean()
+
+ # # RSI
+ # window = 14
+ # delta = dataframe['close'].diff()
+ # up = delta.clip(lower=0)
+ # down = -1 * delta.clip(upper=0)
+ # ma_up = up.rolling(window=window).mean()
+ # ma_down = down.rolling(window=window).mean()
+ # rs = ma_up / ma_down.replace(0, 1e-9)
+ # dataframe['rsi'] = 100 - (100 / (1 + rs))
+ #
+ # # EMA example
+ # dataframe['ema'] = dataframe['close'].ewm(span=self.DEFAULT_PARAMS['ema_period'], adjust=False).mean()
+ #
+ # # ATR (simple implementation)
+ # high_low = dataframe['high'] - dataframe['low']
+ # high_close = (dataframe['high'] - dataframe['close'].shift()).abs()
+ # low_close = (dataframe['low'] - dataframe['close'].shift()).abs()
+ # tr = DataFrame({'hl': high_low, 'hc': high_close, 'lc': low_close}).max(axis=1)
+ # dataframe['atr'] = tr.rolling(window=self.DEFAULT_PARAMS['atr_period']).mean()
+
+ ###########################
+ # df = ton DataFrame OHLCV avec colonnes: open, high, low, close, volume
+ # Assure-toi qu'il est trié par date croissante
+ timeframe = self.timeframe
+ # --- Volatilité normalisée ---
+ dataframe['atr'] = ta.volatility.AverageTrueRange(
+ high=dataframe['high'], low=dataframe['low'], close=dataframe['close'], window=14
+ ).average_true_range()
+ dataframe['atr_norm'] = dataframe['atr'] / dataframe['close']
+
+ # --- Force de tendance ---
+ dataframe['adx'] = ta.trend.ADXIndicator(
+ high=dataframe['high'], low=dataframe['low'], close=dataframe['close'], window=14
+ ).adx()
+
+ # --- Volume directionnel (On Balance Volume) ---
+ dataframe['obv'] = ta.volume.OnBalanceVolumeIndicator(
+ close=dataframe['close'], volume=dataframe['volume']
+ ).on_balance_volume()
+ self.calculeDerivees(dataframe, 'obv', ema_period=1)
+
+ dataframe['obv5'] = ta.volume.OnBalanceVolumeIndicator(
+ close=dataframe['sma5'], volume=dataframe['volume'].rolling(5).sum()
+ ).on_balance_volume()
+ self.calculeDerivees(dataframe, 'obv5', ema_period=5)
+
+ # --- Volatilité récente (écart-type des rendements) ---
+ dataframe['vol_24'] = dataframe['percent'].rolling(24).std()
+
+ # Compter les baisses / hausses consécutives
+ # self.calculateDownAndUp(dataframe, limit=0.0001)
+
+ # df : ton dataframe OHLCV + indicateurs existants
+ # Assurez-vous que les colonnes suivantes existent :
+ # 'max_rsi_12', 'roc_24', 'bb_percent_1h'
+
+ # --- Filtrage des NaN initiaux ---
+ # dataframe = dataframe.dropna()
+
+ dataframe['rsi_slope'] = dataframe['rsi'].diff(3) / 3 # vitesse moyenne du RSI
+ dataframe['adx_change'] = dataframe['adx'] - dataframe['adx'].shift(12) # évolution de la tendance
+ dataframe['volatility_ratio'] = dataframe['atr_norm'] / dataframe['bb_width']
+
+ dataframe["rsi_diff"] = dataframe["rsi"] - dataframe["rsi"].shift(3)
+ dataframe["slope_ratio"] = dataframe["sma5_deriv1"] / (dataframe["sma60_deriv1"] + 1e-9)
+ dataframe["divergence"] = (dataframe["rsi_deriv1"] * dataframe["sma5_deriv1"]) < 0
+
+ ###########################################################
+ # print(f"min={dataframe['absolute_min'].min()} max={dataframe['absolute_max'].max()}")
+ for i in [0, 1, 2, 3]:
+ dataframe[f"lvl_{i}_pct"] = dataframe['absolute_min'] * (1 + 0.01 * i)
+
+ self.model_indicators = self.listUsableColumns(dataframe)
+
+ if False and self.dp.runmode.value in ('backtest'):
+ self.trainModel(dataframe, metadata)
+
+ short_pair = self.getShortName(pair)
+
+ self.model = joblib.load(f"{short_pair}_rf_model.pkl")
+
+ # Préparer les features pour la prédiction
+ features = dataframe[self.model_indicators].fillna(0)
+
+ # Prédiction : probabilité que le prix monte
+ probs = self.model.predict_proba(features)[:, 1]
+
+ # Sauvegarder la probabilité pour l’analyse
+ dataframe['ml_prob'] = probs
+
+ if False and self.dp.runmode.value in ('backtest'):
+ self.inspect_model(self.model)
+
+ #
+ # absolute_min = dataframe['absolute_min'].min()
+ # absolute_max = dataframe['absolute_max'].max()
+ #
+ # # Écart total
+ # diff = absolute_max - absolute_min
+ #
+ # # Nombre de lignes intermédiaires (1% steps)
+ # steps = int((absolute_max - absolute_min) / (absolute_min * 0.01))
+ #
+ # # Niveaux de prix à 1%, 2%, ..., steps%
+ # levels = [absolute_min * (1 + i / 100) for i in range(1, steps + 1)]
+ # levels = [lvl for lvl in levels if lvl < absolute_max] # évite le dernier niveau exact
+ #
+ # # ajout dans le DataFrame
+ # for i, lvl in enumerate(levels, start=1):
+ # dataframe[f"lvl_{i}_pct"] = lvl
+
+ # # Indices correspondants
+ # indices = [(dataframe['mid'] - lvl).abs().idxmin() for lvl in levels]
+
+ return dataframe
+
+ def getOpenTrades(self):
+ # if len(self.trades) == 0:
+ self.trades = Trade.get_open_trades()
+ return self.trades
+
+ # def populate_buy_trend(self, dataframe: DataFrame, metadata: dict) -> DataFrame:
+ # dataframe.loc[
+ # (
+ # # (dataframe['sma5_inv'] == 1)
+ # (
+ # (dataframe['pct180'] < 0.5) |
+ # (
+ # (dataframe['close'] < dataframe['sma60'] )
+ # & (dataframe['sma24_deriv1'] > 0)
+ # )
+ # )
+ # # & (dataframe['hapercent'] > 0)
+ # # & (dataframe['sma24_deriv1'] > - 0.03)
+ # & (dataframe['ml_prob'] > 0.1)
+ # # & (
+ # # (dataframe['percent3'] <= -0.003)
+ # # | (dataframe['percent12'] <= -0.003)
+ # # | (dataframe['percent24'] <= -0.003)
+ # # )
+ # ), ['enter_long', 'enter_tag']] = (1, f"future")
+ #
+ # dataframe['test'] = np.where(dataframe['enter_long'] == 1, dataframe['close'] * 1.003, np.nan)
+ #
+ # return dataframe
+
+ def populate_buy_trend(self, dataframe: DataFrame, metadata: dict) -> DataFrame:
+ """
+ Buy when the model predicts a high upside probability/value.
+ This method loads the ML model, generates predictions, and
+ triggers a buy if the predicted value exceeds a learned threshold.
+ """
+
+ # # Ensure prediction column exists
+ # if "ml_prediction" not in dataframe.columns:
+ # # Generate predictions on the fly
+ # # (your model must already be loaded in self.model)
+ # features = self.ml_features # list of feature column names
+ # dataframe["ml_prediction"] = self.model.predict(dataframe[features].fillna(0))
+
+ # Choose threshold automatically based on training statistics
+ # or a fixed value discovered by SHAP / PDP
+ # threshold = 0.4 #self.buy_threshold # ex: 0.80 or 1.10 depending on your model
+
+ # 20% des signaux les plus forts
+ threshold = np.percentile(dataframe["ml_prob"], 80)
+
+ # Buy = prediction > threshold
+ dataframe["buy"] = 0
+ dataframe.loc[dataframe["ml_prob"] > threshold, ['enter_long', 'enter_tag']] = (1, f"future")
+ dataframe['test'] = np.where(dataframe['enter_long'] == 1, dataframe['close'] * 1.003, np.nan)
+
+ return dataframe
+
+ # def populate_buy_trend(self, dataframe: DataFrame, metadata: dict) -> DataFrame:
+ # """
+ # Populate buy signals based on SHAP/PDP insights:
+ # - strong momentum: macdhist high and macd > macdsignal
+ # - rsi elevated (but not extreme)
+ # - positive sma24 derivative above threshold
+ # - price above sma60 (trend context)
+ # - price in upper region of Bollinger (bb_percent high)
+ # - volume/obv filter and volatility guard (obv_dist, atr)
+ # Returns dataframe with column 'buy' (1 = buy signal).
+ # """
+ #
+ # # Ensure column existence (fallback to zeros if missing)
+ # cols = [
+ # "macdhist", "macd", "macdsignal", "rsi", "rsi_short",
+ # "sma24_deriv1", "sma60", "bb_percent",
+ # "obv_dist", "atr", "percent", "open_1h", "absolute_min"
+ # ]
+ # for c in cols:
+ # if c not in dataframe.columns:
+ # dataframe[c] = 0.0
+ #
+ # # Thresholds (tune these)
+ # TH_MACDHIST = 8.0 # macdhist considered "strong" (example)
+ # TH_MACD_POS = 0.0 # macd must be > 0 (positive momentum)
+ # TH_SMA24_DERIV = 0.05 # sma24 derivative threshold where effect appears
+ # TH_RSI_LOW = 52.0 # lower bound to consider bullish RSI
+ # TH_RSI_HIGH = 85.0 # upper bound to avoid extreme overbought (optional)
+ # TH_BB_PERCENT = 0.7 # in upper band (0..1)
+ # TH_OBV_DIST = -40.0 # accept small negative OBV distance, reject very negative
+ # MAX_ATR = None # optional: maximum ATR to avoid extreme volatility (None = off)
+ # MIN_PRICE_ABOVE_SMA60 = 0.0 # require price > sma60 (price - sma60 > 0)
+ #
+ # price = dataframe["close"]
+ #
+ # # Momentum conditions
+ # cond_macdhist = dataframe["macdhist"] >= TH_MACDHIST
+ # cond_macd_pos = dataframe["macd"] > TH_MACD_POS
+ # cond_macd_vs_signal = dataframe["macd"] > dataframe["macdsignal"]
+ #
+ # # RSI condition (accept moderate-high RSI)
+ # cond_rsi = (dataframe["rsi"] >= TH_RSI_LOW) & (dataframe["rsi"] <= TH_RSI_HIGH)
+ #
+ # # SMA24 derivative: require momentum above threshold
+ # cond_sma24 = dataframe["sma24_deriv1"] >= TH_SMA24_DERIV
+ #
+ # # Price above SMA60 (trend filter)
+ # cond_above_sma60 = (price - dataframe["sma60"]) > MIN_PRICE_ABOVE_SMA60
+ #
+ # # Bollinger band percent (price in upper region)
+ # cond_bb = dataframe["bb_percent"] >= TH_BB_PERCENT
+ #
+ # # Volume/OBV prudence filter
+ # cond_obv = dataframe["obv_dist"] >= TH_OBV_DIST
+ #
+ # # Optional ATR guard
+ # if MAX_ATR is not None:
+ # cond_atr = dataframe["atr"] <= MAX_ATR
+ # else:
+ # cond_atr = np.ones_like(dataframe["atr"], dtype=bool)
+ #
+ # # Optional additional guards (avoid tiny percent moves or weird opens)
+ # cond_percent = np.abs(dataframe["percent"]) > 0.0005 # ignore almost-no-move bars
+ # cond_open = True # keep as placeholder; you can add open_1h relative checks
+ #
+ # # Combine into a buy signal
+ # buy_condition = (
+ # cond_macdhist &
+ # cond_macd_pos &
+ # cond_macd_vs_signal &
+ # cond_rsi &
+ # cond_sma24 &
+ # cond_above_sma60 &
+ # cond_bb &
+ # cond_obv &
+ # cond_atr &
+ # cond_percent
+ # )
+ #
+ # # Finalize: set buy column (0/1)
+ # dataframe.loc[buy_condition, ['enter_long', 'enter_tag']] = (1, f"future")
+ # # dataframe.loc[~buy_condition, "buy"] = 0
+ #
+ # dataframe['test'] = np.where(dataframe['enter_long'] == 1, dataframe['close'] * 1.003, np.nan)
+ #
+ # return dataframe
+
+ def populate_sell_trend(self, dataframe: DataFrame, metadata: dict) -> DataFrame:
+
+ return dataframe
+
+ # def adjust_stake_amount(self, pair: str, last_candle: DataFrame):
+ # # Calculer le minimum des 14 derniers jours
+ # nb_pairs = len(self.dp.current_whitelist())
+ #
+ # base_stake_amount = self.config.get('stake_amount')
+ #
+ # if True : #self.pairs[pair]['count_of_buys'] == 0:
+ # factor = 1 #65 / min(65, last_candle['rsi_1d'])
+ # # if last_candle['min_max_60'] > 0.04:
+ # # factor = 2
+ #
+ # adjusted_stake_amount = base_stake_amount #max(base_stake_amount / 5, base_stake_amount * factor)
+ # else:
+ # adjusted_stake_amount = self.pairs[pair]['first_amount']
+ #
+ # if self.pairs[pair]['count_of_buys'] == 0:
+ # self.pairs[pair]['first_amount'] = adjusted_stake_amount
+ #
+ # return adjusted_stake_amount
+
+ def adjust_stake_amount(self, pair: str, last_candle: DataFrame):
+
+ ath = max(self.pairs[pair]['last_max'], self.get_last_ath_before_candle(last_candle))
+
+ ath_dist = 100 * (ath - last_candle["mid"]) / ath
+
+ # ath_dist
+ # 0 ==> 1
+ # 20 ==> 1.5
+ # 40 ==> 2
+ # 50 * (1 + (ath_dist / 40))
+ base_stake = self.config.get('stake_amount') * (1 + (ath_dist / 40))
+
+ # Calcule max/min 180
+ low180 = last_candle["min180"]
+ high180 = last_candle["max180"]
+
+ mult = 1 - ((last_candle["mid"] - low180) / (high180 - low180))
+
+ print(f"low={low180} mid={last_candle['mid']} high={high180} mult={mult} ath={ath} ath_dist={round(ath_dist, 2)}" )
+ # base_size = montant de base que tu veux utiliser (ex: stake_amount ou autre)
+ base_size = base_stake # exemple fraction du portefeuille; adapte selon ton code
+ # new stake proportionnel à mult
+ new_stake = base_size #* mult
+ return new_stake
+
+ def adjust_trade_position(self, trade: Trade, current_time: datetime,
+ current_rate: float, current_profit: float, min_stake: float,
+ max_stake: float, **kwargs):
+ # ne rien faire si ordre deja en cours
+ if trade.has_open_orders:
+ # self.printLog("skip open orders")
+ return None
+ if (self.wallets.get_available_stake_amount() < 10): # or trade.stake_amount >= max_stake:
+ return 0
+
+ dataframe, _ = self.dp.get_analyzed_dataframe(trade.pair, self.timeframe)
+ last_candle = dataframe.iloc[-1].squeeze()
+ before_last_candle = dataframe.iloc[-2].squeeze()
+ # prépare les données
+ current_time = current_time.astimezone(timezone.utc)
+ open_date = trade.open_date.astimezone(timezone.utc)
+ dispo = round(self.wallets.get_available_stake_amount())
+ hours_since_first_buy = (current_time - trade.open_date_utc).seconds / 3600.0
+ days_since_first_buy = (current_time - trade.open_date_utc).days
+ hours = (current_time - trade.date_last_filled_utc).total_seconds() / 3600.0
+ count_of_buys = trade.nr_of_successful_entries
+ current_time_utc = current_time.astimezone(timezone.utc)
+ open_date = trade.open_date.astimezone(timezone.utc)
+ days_since_open = (current_time_utc - open_date).days
+ pair = trade.pair
+ profit = trade.calc_profit(current_rate) #round(current_profit * trade.stake_amount, 1)
+ last_lost = self.getLastLost(last_candle, pair)
+ pct_first = 0
+
+ total_counts = sum(
+ pair_data['count_of_buys'] for pair_data in self.pairs.values() if not self.getShortName(pair) == 'BTC')
+
+ if self.pairs[pair]['first_buy']:
+ pct_first = self.getPctFirstBuy(pair, last_candle)
+
+ lim = 0.3
+ if (len(dataframe) < 1):
+ # self.printLog("skip dataframe")
+ return None
+
+ # Dernier prix d'achat réel (pas le prix moyen)
+ last_fill_price = self.pairs[trade.pair]['last_buy'] #trade.open_rate # remplacé juste après ↓
+
+ # if len(trade.orders) > 0:
+ # # On cherche le dernier BUY exécuté
+ # buy_orders = [o for o in trade.orders if o.is_buy and o.status == "closed"]
+ # if buy_orders:
+ # last_fill_price = buy_orders[-1].price
+
+ # baisse relative
+ dca_threshold = 0.0025 * count_of_buys
+ decline = (last_fill_price - current_rate) / last_fill_price
+ increase = - decline
+
+ # if decline >= self.dca_threshold:
+ # # Exemple : on achète 50% du montant du dernier trade
+ # last_amount = buy_orders[-1].amount if buy_orders else 0
+ # stake_amount = last_amount * current_rate * 0.5
+ # return stake_amount
+
+ condition = last_candle['hapercent'] > 0 and last_candle['sma24_deriv1'] > 0
+ limit_buy = 40
+ # or (last_candle['close'] <= last_candle['min180'] and hours > 3)
+ if (decline >= dca_threshold) and condition:
+ try:
+ if self.pairs[pair]['has_gain'] and profit > 0:
+ self.pairs[pair]['force_sell'] = True
+ self.pairs[pair]['previous_profit'] = profit
+ return None
+
+ max_amount = self.config.get('stake_amount') * 2.5
+ stake_amount = min(min(max_amount, self.wallets.get_available_stake_amount()),
+ self.adjust_stake_amount(pair, last_candle))
+ # print(f"profit={profit} previous={self.pairs[pair]['previous_profit']} count_of_buys={trade.nr_of_successful_entries}")
+ if stake_amount > 0:
+ self.pairs[pair]['previous_profit'] = profit
+ trade_type = "Loss " + (last_candle['enter_tag'] if last_candle['enter_long'] == 1 else '')
+ self.pairs[trade.pair]['count_of_buys'] += 1
+ self.pairs[pair]['total_amount'] += stake_amount
+ self.log_trade(
+ last_candle=last_candle,
+ date=current_time,
+ action="🟧 Loss -",
+ dispo=dispo,
+ pair=trade.pair,
+ rate=current_rate,
+ trade_type=trade_type,
+ profit=round(profit, 1),
+ buys=trade.nr_of_successful_entries + 1,
+ stake=round(stake_amount, 2)
+ )
+
+ self.pairs[trade.pair]['last_buy'] = current_rate
+ self.pairs[trade.pair]['max_touch'] = last_candle['close']
+ self.pairs[trade.pair]['last_candle'] = last_candle
+
+ # df = pd.DataFrame.from_dict(self.pairs, orient='index')
+ # colonnes_a_exclure = ['last_candle', 'stop',
+ # 'trade_info', 'last_date', 'expected_profit', 'last_count_of_buys', 'base_stake_amount', 'stop_buy']
+ # df_filtered = df[df['count_of_buys'] > 0].drop(columns=colonnes_a_exclure)
+ # # df_filtered = df_filtered["first_buy", "last_max", "max_touch", "last_sell","last_buy", 'count_of_buys', 'current_profit']
+ #
+ # self.printLog(df_filtered)
+
+ return stake_amount
+ return None
+ except Exception as exception:
+ self.printLog(exception)
+ return None
+
+ if current_profit > dca_threshold and (increase >= dca_threshold and self.wallets.get_available_stake_amount() > 0):
+ try:
+ self.pairs[pair]['previous_profit'] = profit
+ stake_amount = max(20, min(self.wallets.get_available_stake_amount(), self.adjust_stake_amount(pair, last_candle)))
+ if stake_amount > 0:
+ self.pairs[pair]['has_gain'] += 1
+
+ trade_type = 'Gain +' + (last_candle['enter_tag'] if last_candle['enter_long'] == 1 else '')
+ self.pairs[trade.pair]['count_of_buys'] += 1
+ self.pairs[pair]['total_amount'] += stake_amount
+ self.log_trade(
+ last_candle=last_candle,
+ date=current_time,
+ action="🟡 Gain +",
+ dispo=dispo,
+ pair=trade.pair,
+ rate=current_rate,
+ trade_type='Gain',
+ profit=round(profit, 1),
+ buys=trade.nr_of_successful_entries + 1,
+ stake=round(stake_amount, 2)
+ )
+ self.pairs[trade.pair]['last_buy'] = current_rate
+ self.pairs[trade.pair]['max_touch'] = last_candle['close']
+ self.pairs[trade.pair]['last_candle'] = last_candle
+ return stake_amount
+ return None
+ except Exception as exception:
+ self.printLog(exception)
+ return None
+
+ return None
+
+ def custom_exit(self, pair, trade, current_time, current_rate, current_profit, **kwargs):
+
+ dataframe, _ = self.dp.get_analyzed_dataframe(pair, self.timeframe)
+ last_candle = dataframe.iloc[-1].squeeze()
+ last_candle_1h = dataframe.iloc[-13].squeeze()
+ before_last_candle = dataframe.iloc[-2].squeeze()
+ before_last_candle_2 = dataframe.iloc[-3].squeeze()
+ before_last_candle_12 = dataframe.iloc[-13].squeeze()
+
+ expected_profit = self.expectedProfit(pair, last_candle)
+ # print(f"current_time={current_time} current_profit={current_profit} expected_profit={expected_profit}")
+
+ # ----- 1) Charger les variables de trailing pour ce trade -----
+ max_price = self.pairs[pair]['max_touch']
+
+ self.pairs[pair]['last_max'] = max(last_candle['close'], self.pairs[pair]['last_max'])
+ self.pairs[pair]['last_min'] = min(last_candle['close'], self.pairs[pair]['last_min'])
+ self.pairs[pair]['current_trade'] = trade
+
+ count_of_buys = trade.nr_of_successful_entries
+
+ profit = trade.calc_profit(current_rate) #round(current_profit * trade.stake_amount, 1)
+
+ if current_profit > 0:
+ self.pairs[pair]['max_profit'] = max(self.pairs[pair]['max_profit'], profit)
+ # else:
+ # self.pairs[pair]['max_profit'] = 0
+
+ max_profit = self.pairs[pair]['max_profit']
+
+ # if current_profit > 0:
+ # print(f"profit={profit} max_profit={max_profit} current_profit={current_profit}")
+
+ baisse = 0
+ if profit > 0:
+ baisse = 1 - (profit / max_profit)
+ mx = max_profit / 5
+ self.pairs[pair]['count_of_buys'] = count_of_buys
+ self.pairs[pair]['current_profit'] = profit
+
+ dispo = round(self.wallets.get_available_stake_amount())
+ hours_since_first_buy = (current_time - trade.open_date_utc).seconds / 3600.0
+ days_since_first_buy = (current_time - trade.open_date_utc).days
+ hours = (current_time - trade.date_last_filled_utc).total_seconds() / 3600.0
+ minutes = (current_time - trade.date_last_filled_utc).total_seconds() / 60.0
+
+ # ----- 2) Mise à jour du max_price -----
+ self.pairs[pair]['max_touch'] = max(last_candle['close'], self.pairs[pair]['max_touch'])
+
+ # ----- 3) Calcul du profit max atteint -----
+ # profit_max = (max_price - trade.open_rate) / trade.open_rate
+
+ current_trailing_stop_positive = self.trailing_stop_positive
+ current_trailing_only_offset_is_reached = self.trailing_only_offset_is_reached
+ current_trailing_stop_positive_offset = self.trailing_stop_positive_offset
+
+ max_ = last_candle['max180']
+ min_ = last_candle['min180']
+ mid = last_candle['mid']
+ # éviter division par zéro
+ position = (mid - min_) / (max_ - min_)
+ zone = int(position * 3) # 0 à 2
+
+ if zone == 0:
+ current_trailing_stop_positive = self.trailing_stop_positive
+ current_trailing_stop_positive_offset = self.trailing_stop_positive_offset * 2
+ if minutes > 1440:
+ current_trailing_only_offset_is_reached = False
+ current_trailing_stop_positive_offset = self.trailing_stop_positive_offset
+ # if zone == 1:
+
+ # ----- 5) Calcul du trailing stop dynamique -----
+ # Exemple : offset=0.321 => stop à +24.8%
+
+ trailing_stop = max_profit * (1.0 - current_trailing_stop_positive)
+ baisse = 0
+ if max_profit:
+ baisse = (max_profit - profit) / max_profit
+
+ if minutes % 12 == 0:
+ self.log_trade(
+ last_candle=last_candle,
+ date=current_time,
+ action="🟢 CURRENT", #🔴 CURRENT" if self.pairs[pair]['stop'] or last_candle['stop_buying'] else "
+ dispo=dispo,
+ pair=pair,
+ rate=last_candle['close'],
+ trade_type=f"{round(profit, 2)} {round(max_profit, 2)} {round(trailing_stop,2)} {minutes}",
+ profit=round(profit, 2),
+ buys=count_of_buys,
+ stake=0
+ )
+
+ if last_candle['ml_prob'] > 0.5:
+ return None
+ # if last_candle['sma24_deriv1'] > 0 : #and minutes < 180 and baisse < 30: # and last_candle['sma5_deriv1'] > -0.15:
+ # if (minutes < 180):
+ # return None
+ # if (minutes > 1440 and last_candle['sma60_deriv1'] > 0) :
+ # return None
+
+
+ # ----- 4) OFFSET : faut-il attendre de dépasser trailing_stop_positive_offset ? -----
+ if current_trailing_only_offset_is_reached:
+ # Max profit pas atteint ET perte < 2 * current_trailing_stop_positive
+ if max_profit < min(2, current_trailing_stop_positive_offset * (count_of_buys - self.pairs[pair]['has_gain']))\
+ and (max_profit > current_trailing_stop_positive_offset): #2 * current_trailing_stop_positive:
+ return None # ne pas activer le trailing encore
+ # Sinon : trailing actif dès le début
+
+ # ----- 6) Condition de vente -----
+ if 0 < profit <= trailing_stop and last_candle['mid'] < last_candle['sma5']:
+ return f"stop_{count_of_buys}_{self.pairs[pair]['has_gain']}"
+ return None
+
+ def informative_pairs(self):
+ # get access to all pairs available in whitelist.
+ pairs = self.dp.current_whitelist()
+ informative_pairs = [(pair, '1h') for pair in pairs]
+ informative_pairs += [(pair, '1d') for pair in pairs]
+
+ return informative_pairs
+
+ def populate1hIndicators(self, df: pd.DataFrame, metadata: dict) -> pd.DataFrame:
+
+ # --- WEEKLY LEVELS ---
+ # semaine précédente = semaine ISO différente
+ df["week"] = df.index.isocalendar().week
+ df["year"] = df.index.year
+
+ df["weekly_low"] = (
+ df.groupby(["year", "week"])["low"]
+ .transform("min")
+ .shift(1) # décalé -> pas regarder la semaine en cours
+ )
+ df["weekly_high"] = (
+ df.groupby(["year", "week"])["high"]
+ .transform("max")
+ .shift(1)
+ )
+
+ # Définition simple d'une zone de demande hebdo :
+ # bas + 25% de la bougie => modifiable
+ df["weekly_demand_zone_low"] = df["weekly_low"]
+ df["weekly_demand_zone_high"] = df["weekly_low"] * 1.025
+
+ # --- MONTHLY LEVELS ---
+ df["month"] = df.index.month
+
+ df["monthly_low"] = (
+ df.groupby(["year", "month"])["low"]
+ .transform("min")
+ .shift(1) # mois précédent uniquement
+ )
+ df["monthly_high"] = (
+ df.groupby(["year", "month"])["high"]
+ .transform("max")
+ .shift(1)
+ )
+
+ df["monthly_demand_zone_low"] = df["monthly_low"]
+ df["monthly_demand_zone_high"] = df["monthly_low"] * 1.03
+
+ return df
+
+ # ----- SIGNALS SIMPLES POUR EXEMPLE -----
+
+ # def populate_buy_trend(self, df: pd.DataFrame, metadata: dict) -> pd.DataFrame:
+ # df["buy"] = 0
+ #
+ # # Exemple : acheter si le prix tape la zone de demande hebdomadaire
+ # df.loc[
+ # (df["close"] <= df["weekly_demand_zone_high"]) &
+ # (df["close"] >= df["weekly_demand_zone_low"]),
+ # "buy"
+ # ] = 1
+ #
+ # return df
+ #
+ # def populate_sell_trend(self, df: pd.DataFrame, metadata: dict) -> pd.DataFrame:
+ # df["sell"] = 0
+ #
+ # # Exemple : vendre sur retour au weekly_high précédent
+ # df.loc[df["close"] >= df["weekly_high"], "sell"] = 1
+ #
+ # return df
+
+
+ def rsi_trend_probability(self, dataframe, short=6, long=12):
+ dataframe = dataframe.copy()
+
+ dataframe['rsi_short'] = talib.RSI(dataframe['mid'], short)
+ dataframe['rsi_long'] = talib.RSI(dataframe['mid'], long)
+
+ dataframe['cross_soft'] = np.tanh((dataframe['rsi_short'] - dataframe['rsi_long']) / 7)
+
+ dataframe['gap'] = (dataframe['rsi_short'] - dataframe['rsi_long']) / 100
+ dataframe['trend'] = (dataframe['rsi_long'] - 50) / 50
+
+ dataframe['rtp'] = (
+ 0.6 * dataframe['cross_soft'] +
+ 0.25 * dataframe['gap'] +
+ 0.15 * dataframe['trend']
+ ).clip(-1, 1)
+
+ return dataframe
+
+ import pandas as pd
+
+ def to_utc_ts(self, x):
+ return pd.to_datetime(x, utc=True)
+
+ # suppose self.btc_ath_history exists (liste de dict)
+ def get_last_ath_before_candle(self, last_candle):
+ candle_date = self.to_utc_ts(last_candle['date']) # ou to_utc_ts(last_candle.name)
+ best = None
+ for a in self.btc_ath_history: #getattr(self, "btc_ath_history", []):
+ ath_date = self.to_utc_ts(a["date"])
+ if ath_date <= candle_date:
+ if best is None or ath_date > best[0]:
+ best = (ath_date, a["price_usd"])
+ return best[1] if best is not None else None
+
+ def trainModel(self, dataframe: DataFrame, metadata: dict):
+ pair = self.getShortName(metadata['pair'])
+ pd.set_option('display.max_rows', None)
+ pd.set_option('display.max_columns', None)
+ pd.set_option("display.width", 200)
+ path=f"user_data/plots/{pair}/"
+ os.makedirs(path, exist_ok=True)
+
+ # # Étape 1 : sélectionner numériques
+ # numeric_cols = dataframe.select_dtypes(include=['int64', 'float64']).columns
+ #
+ # # Étape 2 : enlever constantes
+ # usable_cols = [c for c in numeric_cols if dataframe[c].nunique() > 1
+ # and (not c.endswith("_state") and not c.endswith("_1h") and not c.endswith("_1d")
+ # and not c.endswith("_class") and not c.endswith("_price")
+ # and not c.startswith('stop_buying'))]
+ #
+ # # Étape 3 : remplacer inf et NaN par 0
+ # dataframe[usable_cols] = dataframe[usable_cols].replace([np.inf, -np.inf], 0).fillna(0)
+ #
+ # print("Colonnes utilisables pour le modèle :")
+ # print(usable_cols)
+ #
+ # self.model_indicators = usable_cols
+ #
+ df = dataframe[self.model_indicators].copy()
+
+ # Corrélations des colonnes
+ corr = df.corr(numeric_only=True)
+ print("Corrélation des colonnes")
+ print(corr)
+
+ # 3️⃣ Créer la cible : 1 si le prix monte dans les prochaines bougies
+ # df['target'] = (df['sma24'].shift(-24) > df['sma24']).astype(int)
+ df['target'] = ((df["sma24"].shift(-13) - df["sma24"]) > 0).astype(int)
+ df['target'] = df['target'].fillna(0).astype(int)
+
+ # Corrélations triées par importance avec une colonne cible
+ target_corr = df.corr(numeric_only=True)["target"].sort_values(ascending=False)
+ print("Corrélations triées par importance avec une colonne cible")
+ print(target_corr)
+
+ # Corrélations triées par importance avec une colonne cible
+ corr = df.corr(numeric_only=True)
+ corr_unstacked = (
+ corr.unstack()
+ .reset_index()
+ .rename(columns={"level_0": "col1", "level_1": "col2", 0: "corr"})
+ )
+ # Supprimer les doublons col1/col2 inversés et soi-même
+ corr_unstacked = corr_unstacked[corr_unstacked["col1"] < corr_unstacked["col2"]]
+
+ # Trier par valeur absolue de corrélation
+ corr_sorted = corr_unstacked.reindex(corr_unstacked["corr"].abs().sort_values(ascending=False).index)
+ print("Trier par valeur absolue de corrélation")
+ print(corr_sorted.head(20))
+
+ # --- Calcul de la corrélation ---
+ corr = df.corr(numeric_only=True) # évite les colonnes non numériques
+ corr = corr * 100 # passage en pourcentage
+
+ # --- Masque pour n’afficher que le triangle supérieur (optionnel) ---
+ mask = np.triu(np.ones_like(corr, dtype=bool))
+
+ # --- Création de la figure ---
+ fig, ax = plt.subplots(figsize=(96, 36))
+
+ # --- Heatmap avec un effet “température” ---
+ sns.heatmap(
+ corr,
+ mask=mask,
+ cmap="coolwarm", # palette bleu → rouge
+ center=0, # 0 au centre
+ annot=True, # affiche les valeurs dans chaque case
+ fmt=".0f", # format entier (pas de décimale)
+ cbar_kws={"label": "Corrélation (%)"}, # légende à droite
+ linewidths=0.5, # petites lignes entre les cases
+ ax=ax
+ )
+
+ # --- Personnalisation ---
+ ax.set_title("Matrice de corrélation (en %)", fontsize=20, pad=20)
+ plt.xticks(rotation=45, ha="right")
+ plt.yticks(rotation=0)
+
+ # --- Sauvegarde ---
+ output_path = f"{path}/Matrice_de_correlation_temperature.png"
+ plt.savefig(output_path, bbox_inches="tight", dpi=150)
+ plt.close(fig)
+
+ print(f"✅ Matrice enregistrée : {output_path}")
+
+ # Exemple d'utilisation :
+ selected_corr = self.select_uncorrelated_features(df, target="target", top_n=30, corr_threshold=0.7)
+ print("===== 🎯 FEATURES SÉLECTIONNÉES =====")
+ print(selected_corr)
+
+ # Nettoyage
+ df = df.dropna()
+
+ X = df[self.model_indicators]
+ y = df['target'] # ta colonne cible binaire ou numérique
+ print("===== 🎯 FEATURES SCORES =====")
+ print(self.feature_auc_scores(X, y))
+
+ # 4️⃣ Split train/test
+ X = df[self.model_indicators]
+ y = df['target']
+ # Séparation temporelle (train = 80 %, valid = 20 %)
+ X_train, X_valid, y_train, y_valid = train_test_split(X, y, test_size=0.2, shuffle=False)
+
+ # Nettoyage des valeurs invalides
+
+ selector = VarianceThreshold(threshold=0.0001)
+ selector.fit(X_train)
+ selected = X_train.columns[selector.get_support()]
+ print("Colonnes conservées :", list(selected))
+
+ # 5️⃣ Entraînement du modèle
+ # self.train_model = RandomForestClassifier(n_estimators=200, random_state=42)
+
+ # def objective(trial):
+ # self.train_model = XGBClassifier(
+ # n_estimators=trial.suggest_int("n_estimators", 200, 300),
+ # max_depth=trial.suggest_int("max_depth", 3, 6),
+ # learning_rate=trial.suggest_float("learning_rate", 0.01, 0.3),
+ # subsample=trial.suggest_float("subsample", 0.7, 1.0),
+ # colsample_bytree=trial.suggest_float("colsample_bytree", 0.7, 1.0),
+ # scale_pos_weight=1, # tu mettras balance_ratio ici si tu veux
+ # objective="binary:logistic",
+ # eval_metric="logloss",
+ # n_jobs=-1
+ # )
+ #
+ # self.train_model.fit(X_train, y_train)
+ #
+ # y_pred = self.train_model.predict(X_valid) # <-- validation = test split
+ # return f1_score(y_valid, y_pred)
+ #
+ # study = optuna.create_study(direction="maximize")
+ # study.optimize(objective, n_trials=50)
+
+ def objective(trial):
+ self.train_model = XGBClassifier(
+ n_estimators=trial.suggest_int("n_estimators", 200, 800),
+ max_depth=trial.suggest_int("max_depth", 3, 10),
+ learning_rate=trial.suggest_float("learning_rate", 0.005, 0.3, log=True),
+ subsample=trial.suggest_float("subsample", 0.6, 1.0),
+ colsample_bytree=trial.suggest_float("colsample_bytree", 0.6, 1.0),
+ scale_pos_weight=1,
+ objective="binary:logistic",
+ eval_metric="logloss",
+ n_jobs=-1
+ )
+
+ self.train_model.fit(
+ X_train,
+ y_train,
+ eval_set=[(X_valid, y_valid)],
+ # early_stopping_rounds=50,
+ verbose=False
+ )
+
+ proba = self.train_model.predict_proba(X_valid)[:, 1]
+ thresholds = np.linspace(0.1, 0.9, 50)
+ best_f1 = max(f1_score(y_valid, (proba > t)) for t in thresholds)
+
+ return best_f1
+ study = optuna.create_study(direction="maximize")
+ study.optimize(objective, n_trials=50)
+
+ # ---- après avoir exécuté la study ------
+
+ print("Best value (F1):", study.best_value)
+ print("Best params:", study.best_params)
+
+ best_trial = study.best_trial
+ print("\n=== BEST TRIAL ===")
+ print("Number:", best_trial.number)
+ print("Value:", best_trial.value)
+ print("Params:")
+ for k, v in best_trial.params.items():
+ print(f" - {k}: {v}")
+
+ # All trials summary
+ print("\n=== ALL TRIALS ===")
+ for t in study.trials:
+ print(f"Trial {t.number}: f1 = {t.value}, params = {t.params}")
+
+ # DataFrame of trials
+ df = study.trials_dataframe()
+ print(df.head())
+
+ # Graphs
+ fig = plot_optimization_history(study)
+ fig.write_html(f"{path}/optimization_history.html")
+ fig = plot_param_importances(study)
+ fig.write_html(f"{path}/param_importances.html")
+ fig = plot_slice(study)
+ fig.write_html(f"{path}/slice.html")
+ fig = plot_parallel_coordinate(study)
+ fig.write_html(f"{path}/parallel_coordinates.html")
+
+
+ # 2️⃣ Sélection des features AVANT calibration
+ sfm = SelectFromModel(self.train_model, threshold="median", prefit=True)
+ selected_features = X_train.columns[sfm.get_support()]
+ print(selected_features)
+
+ # 3️⃣ Calibration ensuite (facultative)
+ calibrated = CalibratedClassifierCV(self.train_model, method='sigmoid', cv=5)
+ calibrated.fit(X_train[selected_features], y_train)
+ print(calibrated)
+
+ # # # calibration
+ # self.train_model = CalibratedClassifierCV(self.train_model, method='sigmoid', cv=5)
+ # # Sélection
+ # sfm = SelectFromModel(self.train_model, threshold="median")
+ # sfm.fit(X_train, y_train)
+ # selected_features = X_train.columns[sfm.get_support()]
+ # print(selected_features)
+
+ # self.train_model.fit(X_train, y_train)
+
+ y_pred = self.train_model.predict(X_valid)
+ y_proba = self.train_model.predict_proba(X_valid)[:, 1]
+ # print(classification_report(y_valid, y_pred))
+ # print(confusion_matrix(y_valid, y_pred))
+ print("\nRapport de classification :\n", classification_report(y_valid, y_pred))
+ print("\nMatrice de confusion :\n", confusion_matrix(y_valid, y_pred))
+
+ # # Importances
+ # importances = pd.DataFrame({
+ # "feature": self.train_model.feature_name_,
+ # "importance": self.train_model.feature_importances_
+ # }).sort_values("importance", ascending=False)
+ # print("\n===== 🔍 IMPORTANCE DES FEATURES =====")
+ # print(importances)
+
+ # Feature importance
+ importances = self.train_model.feature_importances_
+ feat_imp = pd.Series(importances, index=X_train.columns).sort_values(ascending=False)
+
+ # Affichage
+ feat_imp.plot(kind='bar', figsize=(12, 6))
+ plt.title("Feature importances")
+ # plt.show()
+ plt.savefig(f"{path}/Feature importances.png", bbox_inches='tight')
+
+ result = permutation_importance(self.train_model, X_valid, y_valid, scoring='f1', n_repeats=10, random_state=42)
+ perm_imp = pd.Series(result.importances_mean, index=X_valid.columns).sort_values(ascending=False)
+ perm_imp.plot(kind='bar', figsize=(12, 6))
+ plt.title("Permutation feature importance")
+ # plt.show()
+ plt.savefig(f"{path}/Permutation feature importance.png", bbox_inches='tight')
+
+ # Shap
+ explainer = shap.TreeExplainer(self.train_model)
+ shap_values = explainer.shap_values(X_valid)
+
+ # Résumé global
+ shap.summary_plot(shap_values, X_valid)
+
+ # Force plot pour une observation
+ force_plot = shap.force_plot(explainer.expected_value, shap_values[0, :], X_valid.iloc[0, :])
+ shap.save_html(f"{path}/shap_force_plot.html", force_plot)
+
+ PartialDependenceDisplay.from_estimator(self.train_model, X_valid, selected_features, kind='average')
+ plt.figure(figsize=(24, 24))
+ plt.savefig(f"{path}/PartialDependenceDisplay.png", bbox_inches='tight')
+
+ best_f1 = 0
+ best_t = 0.5
+ for t in [0.3, 0.4, 0.5, 0.6, 0.7]:
+ y_pred_thresh = (y_proba > t).astype(int)
+ score = f1_score(y_valid, y_pred_thresh)
+ print(f"Seuil {t:.1f} → F1: {score:.3f}")
+ if score > best_f1:
+ best_f1 = score
+ best_t = t
+
+ print(f"✅ Meilleur seuil trouvé: {best_t} avec F1={best_f1:.3f}")
+
+ # 6️⃣ Évaluer la précision (facultatif)
+ preds = self.train_model.predict(X_valid)
+ acc = accuracy_score(y_valid, preds)
+ print(f"Accuracy: {acc:.3f}")
+
+ # 7️⃣ Sauvegarde du modèle
+ joblib.dump(self.train_model, f"{pair}_rf_model.pkl")
+ print(f"✅ Modèle sauvegardé sous {pair}_rf_model.pkl")
+
+ # X = dataframe des features (après shift/rolling/indicators)
+ # y = target binaire ou décimale
+ # model = ton modèle entraîné (RandomForestClassifier ou Regressor)
+
+ # # --- 1️⃣ Mutual Information (MI) ---
+ # mi_scores = mutual_info_classif(X.fillna(0), y)
+ # mi_series = pd.Series(mi_scores, index=X.columns, name='MI')
+ #
+ # # --- 2️⃣ Permutation Importance (PI) ---
+ # pi_result = permutation_importance(self.train_model, X, y, n_repeats=10, random_state=42, n_jobs=-1)
+ # pi_series = pd.Series(pi_result.importances_mean, index=X.columns, name='PI')
+ #
+ # # --- 3️⃣ Combinaison dans un seul dataframe ---
+ # importance_df = pd.concat([mi_series, pi_series], axis=1)
+ # importance_df = importance_df.sort_values(by='PI', ascending=False) # tri par importance modèle
+ # print(importance_df)
+ #
+ # importance_df.plot(kind='bar', figsize=(10, 5))
+ # plt.title("Mutual Info vs Permutation Importance")
+ # plt.ylabel("Score")
+ # plt.show()
+
+ self.analyze_model(pair, self.train_model, X_train, X_valid, y_train, y_valid)
+
+ def inspect_model(self, model):
+ """
+ Affiche les informations d'un modèle ML déjà entraîné.
+ Compatible avec scikit-learn, xgboost, lightgbm, catboost...
+ """
+
+ print("===== 🔍 INFORMATIONS DU MODÈLE =====")
+
+ # Type de modèle
+ print(f"Type : {type(model).__name__}")
+ print(f"Module : {model.__class__.__module__}")
+
+ # Hyperparamètres
+ if hasattr(model, "get_params"):
+ params = model.get_params()
+ print(f"\n===== ⚙️ HYPERPARAMÈTRES ({len(params)}) =====")
+ for k, v in params.items():
+ print(f"{k}: {v}")
+
+ # Nombre d’estimateurs
+ if hasattr(model, "n_estimators"):
+ print(f"\nNombre d’estimateurs : {model.n_estimators}")
+
+ # Importance des features
+ if hasattr(model, "feature_importances_"):
+ print("\n===== 📊 IMPORTANCE DES FEATURES =====")
+
+ # Correction ici :
+ feature_names = getattr(model, "feature_names_in_", None)
+ if isinstance(feature_names, np.ndarray):
+ feature_names = feature_names.tolist()
+ elif feature_names is None:
+ feature_names = [f"feature_{i}" for i in range(len(model.feature_importances_))]
+
+ fi = pd.DataFrame({
+ "feature": feature_names,
+ "importance": model.feature_importances_
+ }).sort_values(by="importance", ascending=False)
+
+ print(fi)
+
+ # Coefficients (modèles linéaires)
+ if hasattr(model, "coef_"):
+ print("\n===== ➗ COEFFICIENTS =====")
+ coef = np.array(model.coef_)
+ if coef.ndim == 1:
+ for i, c in enumerate(coef):
+ print(f"Feature {i}: {c:.6f}")
+ else:
+ print(coef)
+
+ # Intercept
+ if hasattr(model, "intercept_"):
+ print("\nIntercept :", model.intercept_)
+
+ # Classes connues
+ if hasattr(model, "classes_"):
+ print("\n===== 🎯 CLASSES =====")
+ print(model.classes_)
+
+ # Scores internes
+ for attr in ["best_score_", "best_iteration_", "best_ntree_limit", "score_"]:
+ if hasattr(model, attr):
+ print(f"\n{attr} = {getattr(model, attr)}")
+
+ # Méthodes disponibles
+ print("\n===== 🧩 MÉTHODES DISPONIBLES =====")
+ methods = [m for m, _ in inspect.getmembers(model, predicate=inspect.ismethod)]
+ print(", ".join(methods[:15]) + ("..." if len(methods) > 15 else ""))
+
+ print("\n===== ✅ FIN DE L’INSPECTION =====")
+
+ def analyze_model(self, pair, model, X_train, X_valid, y_train, y_valid):
+ """
+ Analyse complète d'un modèle ML supervisé (classification binaire).
+ Affiche performances, importance des features, matrices, seuils, etc.
+ """
+ output_dir = f"user_data/plots/{pair}/"
+ os.makedirs(output_dir, exist_ok=True)
+
+ # ---- Prédictions ----
+ preds = model.predict(X_valid)
+ probs = model.predict_proba(X_valid)[:, 1] if hasattr(model, "predict_proba") else preds
+
+ # ---- Performances globales ----
+ print("===== 📊 ÉVALUATION DU MODÈLE =====")
+ print("Colonnes du modèle :", model.feature_names_in_)
+ print("Colonnes X_valid :", list(X_valid.columns))
+ print(f"Accuracy: {accuracy_score(y_valid, preds):.3f}")
+ print(f"ROC AUC : {roc_auc_score(y_valid, probs):.3f}")
+
+ print("TN (True Negative) / FP (False Positive)")
+ print("FN (False Negative) / TP (True Positive)")
+ print("\nRapport de classification :\n", classification_report(y_valid, preds))
+
+ # | Élément | Valeur | Signification |
+ # | ------------------- | ------ | ----------------------------------------------------------- |
+ # | TN (True Negative) | 983 | Modèle a correctement prédit 0 (pas d’achat) |
+ # | FP (False Positive) | 43 | Modèle a prédit 1 alors que c’était 0 (faux signal d’achat) |
+ # | FN (False Negative) | 108 | Modèle a prédit 0 alors que c’était 1 (manqué un achat) |
+ # | TP (True Positive) | 19 | Modèle a correctement prédit 1 (bon signal d’achat) |
+
+ # ---- Matrice de confusion ----
+ cm = confusion_matrix(y_valid, preds)
+ print("Matrice de confusion :\n", cm)
+
+ plt.figure(figsize=(4, 4))
+ plt.imshow(cm, cmap="Blues")
+ plt.title("Matrice de confusion")
+ plt.xlabel("Prédit")
+ plt.ylabel("Réel")
+ for i in range(2):
+ for j in range(2):
+ plt.text(j, i, cm[i, j], ha="center", va="center", color="black")
+ # plt.show()
+ plt.savefig(os.path.join(output_dir, "Matrice de confusion.png"), bbox_inches="tight")
+ plt.close()
+
+ # ---- Importance des features ----
+ if hasattr(model, "feature_importances_"):
+ print("\n===== 🔍 IMPORTANCE DES FEATURES =====")
+ importance = pd.DataFrame({
+ "feature": X_train.columns,
+ "importance": model.feature_importances_
+ }).sort_values(by="importance", ascending=False)
+ print(importance)
+
+ # Crée une figure plus grande
+ fig, ax = plt.subplots(figsize=(24, 8)) # largeur=24 pouces, hauteur=8 pouces
+
+ # Trace le bar plot sur cet axe
+ importance.plot.bar(x="feature", y="importance", legend=False, ax=ax)
+
+ # Tourner les labels pour plus de lisibilité
+ ax.set_xticklabels(ax.get_xticklabels(), rotation=45, ha='right')
+
+ plt.title("Importance des features")
+ # plt.show()
+ plt.savefig(os.path.join(output_dir, "Importance des features.png"), bbox_inches="tight")
+ plt.close()
+
+ # ---- Arbre de décision (extrait) ----
+ if hasattr(model, "estimators_"):
+ print("\n===== 🌳 EXTRAIT D’UN ARBRE =====")
+ print(export_text(model.estimators_[0], feature_names=list(X_train.columns))[:800])
+
+ # ---- Précision selon le seuil ----
+ thresholds = np.linspace(0.1, 0.9, 9)
+ print("\n===== ⚙️ PERFORMANCE SELON SEUIL =====")
+ for t in thresholds:
+ preds_t = (probs > t).astype(int)
+ acc = accuracy_score(y_valid, preds_t)
+ print(f"Seuil {t:.1f} → précision {acc:.3f}")
+
+ # ---- ROC Curve ----
+ fpr, tpr, _ = roc_curve(y_valid, probs)
+ plt.figure(figsize=(5, 4))
+ plt.plot(fpr, tpr, label="ROC curve")
+ plt.plot([0, 1], [0, 1], linestyle="--", color="gray")
+ plt.xlabel("Taux de faux positifs")
+ plt.ylabel("Taux de vrais positifs")
+ plt.title("Courbe ROC")
+ plt.legend()
+ # plt.show()
+ plt.savefig(os.path.join(output_dir, "Courbe ROC.png"), bbox_inches="tight")
+ plt.close()
+
+ # # ---- Interprétation SHAP (optionnelle) ----
+ # try:
+ # import shap
+ #
+ # print("\n===== 💡 ANALYSE SHAP =====")
+ # explainer = shap.TreeExplainer(model)
+ # shap_values = explainer.shap_values(X_valid)
+ # # shap.summary_plot(shap_values[1], X_valid)
+ # # Vérifie le type de sortie de shap_values
+ # if isinstance(shap_values, list):
+ # # Cas des modèles de classification (plusieurs classes)
+ # shap_values_to_plot = shap_values[0] if len(shap_values) == 1 else shap_values[1]
+ # else:
+ # shap_values_to_plot = shap_values
+ #
+ # # Ajustement des dimensions au besoin
+ # if shap_values_to_plot.shape[1] != X_valid.shape[1]:
+ # print(f"⚠️ Mismatch dimensions SHAP ({shap_values_to_plot.shape[1]}) vs X_valid ({X_valid.shape[1]})")
+ # min_dim = min(shap_values_to_plot.shape[1], X_valid.shape[1])
+ # shap_values_to_plot = shap_values_to_plot[:, :min_dim]
+ # X_to_plot = X_valid.iloc[:, :min_dim]
+ # else:
+ # X_to_plot = X_valid
+ #
+ # plt.figure(figsize=(12, 4))
+ # shap.summary_plot(shap_values_to_plot, X_to_plot, show=False)
+ # plt.savefig(os.path.join(output_dir, "shap_summary.png"), bbox_inches="tight")
+ # plt.close()
+ # except ImportError:
+ # print("\n(SHAP non installé — `pip install shap` pour activer l’analyse SHAP.)")
+
+ y_proba = model.predict_proba(X_valid)[:, 1]
+
+ # Trace ou enregistre le graphique
+ self.plot_threshold_analysis(y_valid, y_proba, step=0.05,
+ save_path=f"{output_dir}/threshold_analysis.png")
+
+ # y_valid : vraies classes (0 / 1)
+ # y_proba : probabilités de la classe 1 prédites par ton modèle
+ # Exemple : y_proba = model.predict_proba(X_valid)[:, 1]
+
+ seuils = np.arange(0.0, 1.01, 0.05)
+ precisions, recalls, f1s = [], [], []
+
+ for seuil in seuils:
+ y_pred = (y_proba >= seuil).astype(int)
+ precisions.append(precision_score(y_valid, y_pred))
+ recalls.append(recall_score(y_valid, y_pred))
+ f1s.append(f1_score(y_valid, y_pred))
+
+ plt.figure(figsize=(10, 6))
+ plt.plot(seuils, precisions, label='Précision', marker='o')
+ plt.plot(seuils, recalls, label='Rappel', marker='o')
+ plt.plot(seuils, f1s, label='F1-score', marker='o')
+
+ # Ajoute un point pour le meilleur F1
+ best_idx = np.argmax(f1s)
+ plt.scatter(seuils[best_idx], f1s[best_idx], color='red', s=80, label=f'Max F1 ({seuils[best_idx]:.2f})')
+
+ plt.title("Performance du modèle selon le seuil de probabilité")
+ plt.xlabel("Seuil de probabilité (classe 1)")
+ plt.ylabel("Score")
+ plt.grid(True, alpha=0.3)
+ plt.legend()
+ plt.savefig(f"{output_dir}/seuil_de_probabilite.png", bbox_inches='tight')
+ # plt.show()
+
+ print(f"✅ Meilleur F1 : {f1s[best_idx]:.3f} au seuil {seuils[best_idx]:.2f}")
+
+ print("\n===== ✅ FIN DE L’ANALYSE =====")
+
+ def plot_threshold_analysis(self, y_true, y_proba, step=0.05, save_path=None):
+ """
+ Affiche la précision, le rappel et le F1-score selon le seuil de décision.
+ y_true : labels réels (0 ou 1)
+ y_proba : probabilités prédites (P(hausse))
+ step : pas entre les seuils testés
+ save_path : si renseigné, enregistre l'image au lieu d'afficher
+ """
+
+ # Le graphique généré affichera trois courbes :
+ #
+ # 🔵 Precision — la fiabilité de tes signaux haussiers.
+ #
+ # 🟢 Recall — la proportion de hausses que ton modèle détecte.
+ #
+ # 🟣 F1-score — le compromis optimal entre les deux.
+
+ thresholds = np.arange(0, 1.01, step)
+ precisions, recalls, f1s = [], [], []
+
+ for thr in thresholds:
+ preds = (y_proba >= thr).astype(int)
+ precisions.append(precision_score(y_true, preds))
+ recalls.append(recall_score(y_true, preds))
+ f1s.append(f1_score(y_true, preds))
+
+ plt.figure(figsize=(10, 6))
+ plt.plot(thresholds, precisions, label="Precision", linewidth=2)
+ plt.plot(thresholds, recalls, label="Recall", linewidth=2)
+ plt.plot(thresholds, f1s, label="F1-score", linewidth=2, linestyle="--")
+ plt.axvline(0.5, color='gray', linestyle=':', label="Seuil 0.5")
+ plt.title("📊 Performance selon le seuil de probabilité", fontsize=14)
+ plt.xlabel("Seuil de décision (threshold)")
+ plt.ylabel("Score")
+ plt.legend()
+ plt.grid(True, alpha=0.3)
+
+ if save_path:
+ plt.savefig(save_path, bbox_inches='tight')
+ print(f"✅ Graphique enregistré : {save_path}")
+ else:
+ plt.show()
+
+ def feature_auc_scores(self, X, y):
+ aucs = {}
+ for col in X.columns:
+ try:
+ aucs[col] = roc_auc_score(y, X[col].ffill().fillna(0))
+ except Exception:
+ aucs[col] = np.nan
+ return pd.Series(aucs).sort_values(ascending=False)
+
+ def listUsableColumns(self, dataframe):
+ # Étape 1 : sélectionner numériques
+ numeric_cols = dataframe.select_dtypes(include=['int64', 'float64']).columns
+ # Étape 2 : enlever constantes
+ usable_cols = [c for c in numeric_cols if dataframe[c].nunique() > 1
+ and not c.endswith("_state")
+ and not c.endswith("_1d")
+ # and not c.endswith("_1h")
+ # and not c.startswith("open") and not c.startswith("close")
+ # and not c.startswith("low") and not c.startswith("high")
+ # and not c.startswith("haopen") and not c.startswith("haclose")
+ # and not c.startswith("bb_lower") and not c.startswith("bb_upper")
+ # and not c.startswith("bb_middle")
+ and not c.endswith("_count")
+ and not c.endswith("_class") and not c.endswith("_price")
+ and not c.startswith('stop_buying')
+ and not c.startswith('lvl')
+ ]
+ # Étape 3 : remplacer inf et NaN par 0
+ dataframe[usable_cols] = dataframe[usable_cols].replace([np.inf, -np.inf], 0).fillna(0)
+ print("Colonnes utilisables pour le modèle :")
+ print(usable_cols)
+ self.model_indicators = usable_cols
+ return self.model_indicators
+
+
+ def select_uncorrelated_features(self, df, target, top_n=20, corr_threshold=0.7):
+ """
+ Sélectionne les features les plus corrélées avec target,
+ tout en supprimant celles trop corrélées entre elles.
+ """
+ # 1️⃣ Calcul des corrélations absolues avec la cible
+ corr = df.corr(numeric_only=True)
+ corr_target = corr[target].abs().sort_values(ascending=False)
+
+ # 2️⃣ Prend les N features les plus corrélées avec la cible (hors target)
+ features = corr_target.drop(target).head(top_n).index.tolist()
+
+ # 3️⃣ Évite les features trop corrélées entre elles
+ selected = []
+ for feat in features:
+ too_correlated = False
+ for sel in selected:
+ if abs(corr.loc[feat, sel]) > corr_threshold:
+ too_correlated = True
+ break
+ if not too_correlated:
+ selected.append(feat)
+
+ # 4️⃣ Retourne un DataFrame propre avec les valeurs de corrélation
+ selected_corr = pd.DataFrame({
+ "feature": selected,
+ "corr_with_target": [corr.loc[f, target] for f in selected]
+ }).sort_values(by="corr_with_target", key=np.abs, ascending=False)
+
+ return selected_corr
+
+ def calculateDerivation(self, dataframe, window=12, suffixe='', timeframe='5m'):
+ dataframe[f"mid_smooth{suffixe}"] = dataframe['mid'].rolling(window).mean()
+ dataframe = self.calculeDerivees(dataframe, f"mid_smooth{suffixe}", ema_period=window)
+ return dataframe
+
+ def calculeDerivees(
+ self,
+ dataframe: pd.DataFrame,
+ name: str,
+ suffixe: str = '',
+ window: int = 100,
+ coef: float = 0.15,
+ ema_period: int = 10,
+ verbose: bool = True,
+ ) -> pd.DataFrame:
+ """
+ Calcule deriv1/deriv2 (relative simple), applique EMA, calcule tendency
+ avec epsilon adaptatif basé sur rolling percentiles.
+ """
+
+ d1_col = f"{name}{suffixe}_deriv1"
+ d2_col = f"{name}{suffixe}_deriv2"
+ # d1s_col = f"{name}{suffixe}_deriv1_smooth"
+ # d2s_col = f"{name}{suffixe}_deriv2_smooth"
+ tendency_col = f"{name}{suffixe}_state"
+
+ factor1 = 100 * (ema_period / 5)
+ factor2 = 10 * (ema_period / 5)
+
+ dataframe[f"{name}{suffixe}_inv"] = (dataframe[f"{name}{suffixe}"].shift(2) >= dataframe[f"{name}{suffixe}"].shift(1)) \
+ & (dataframe[f"{name}{suffixe}"].shift(1) <= dataframe[f"{name}{suffixe}"])
+ # --- Distance à la moyenne mobile ---
+ dataframe[f"{name}{suffixe}_dist"] = (dataframe['close'] - dataframe[f"{name}{suffixe}"]) / dataframe[f"{name}{suffixe}"]
+
+
+ # dérivée relative simple
+ dataframe[d1_col] = (dataframe[name] - dataframe[name].shift(1)) / dataframe[name].shift(1)
+ # lissage EMA
+ dataframe[d1_col] = factor1 * dataframe[d1_col].ewm(span=ema_period, adjust=False).mean()
+
+ # dataframe[d1_col] = dataframe[d1_col].rolling(window=ema_period, center=True).median()
+
+ dataframe[d2_col] = dataframe[d1_col] - dataframe[d1_col].shift(1)
+ dataframe[d2_col] = factor2 * dataframe[d2_col].ewm(span=ema_period, adjust=False).mean()
+
+ # epsilon adaptatif via rolling percentile
+ p_low_d1 = dataframe[d1_col].rolling(window=window, min_periods=1).quantile(0.05)
+ p_high_d1 = dataframe[d1_col].rolling(window=window, min_periods=1).quantile(0.95)
+ p_low_d2 = dataframe[d2_col].rolling(window=window, min_periods=1).quantile(0.05)
+ p_high_d2 = dataframe[d2_col].rolling(window=window, min_periods=1).quantile(0.95)
+
+ eps_d1_series = ((p_low_d1.abs() + p_high_d1.abs()) / 2) * coef
+ eps_d2_series = ((p_low_d2.abs() + p_high_d2.abs()) / 2) * coef
+
+ # fallback global eps
+ global_eps_d1 = (abs(dataframe[d1_col].quantile(0.05)) + abs(dataframe[d1_col].quantile(0.95))) / 2 * coef
+ global_eps_d2 = (abs(dataframe[d2_col].quantile(0.05)) + abs(dataframe[d2_col].quantile(0.95))) / 2 * coef
+
+ eps_d1_series = eps_d1_series.fillna(global_eps_d1).replace(0, global_eps_d1)
+ eps_d2_series = eps_d2_series.fillna(global_eps_d2).replace(0, global_eps_d2)
+
+ # if verbose and self.dp.runmode.value in ('backtest'):
+ # stats = dataframe[[d1_col, d2_col]].agg(['min', 'max']).T
+ # stats['abs_max'] = dataframe[[d1_col, d2_col]].abs().max(axis=0)
+ # print(f"---- Derivatives stats {timeframe}----")
+ # print(stats)
+ # print(f"rolling window = {window}, coef = {coef}, ema_period = {ema_period}")
+ # print("---------------------------")
+
+ # mapping tendency
+ def tag_by_derivatives(row):
+ idx = int(row.name)
+ d1v = float(row[d1_col])
+ d2v = float(row[d2_col])
+ eps1 = float(eps_d1_series.iloc[idx])
+ eps2 = float(eps_d2_series.iloc[idx])
+
+ # # mapping état → codes 3 lettres explicites
+ # # | Ancien état | Nouveau code 3 lettres | Interprétation |
+ # # | ----------- | ---------------------- | --------------------- |
+ # # | 4 | HAU | Hausse Accélérée |
+ # # | 3 | HSR | Hausse Ralentissement |
+ # # | 2 | HST | Hausse Stable |
+ # # | 1 | DHB | Départ Hausse |
+ # # | 0 | PAL | Palier / neutre |
+ # # | -1 | DBD | Départ Baisse |
+ # # | -2 | BSR | Baisse Ralentissement |
+ # # | -3 | BST | Baisse Stable |
+ # # | -4 | BAS | Baisse Accélérée |
+
+ # Palier strict
+ if abs(d1v) <= eps1 and abs(d2v) <= eps2:
+ return 0
+ # Départ si d1 ~ 0 mais d2 signale direction
+ if abs(d1v) <= eps1:
+ return 1 if d2v > eps2 else -1 if d2v < -eps2 else 0
+ # Hausse
+ if d1v > eps1:
+ return 4 if d2v > eps2 else 3
+ # Baisse
+ if d1v < -eps1:
+ return -4 if d2v < -eps2 else -2
+ return 0
+
+ dataframe[tendency_col] = dataframe.apply(tag_by_derivatives, axis=1)
+
+ # if timeframe == '1h' and verbose and self.dp.runmode.value in ('backtest'):
+ # print("##################")
+ # print(f"# STAT {timeframe} {name}{suffixe}")
+ # print("##################")
+ # self.calculateProbabilite2Index(dataframe, futur_cols=['futur_percent'], indic_1=f"{name}{suffixe}_deriv1", indic_2=f"{name}{suffixe}_deriv2")
+
+ return dataframe