# --- Hosting-site scraping artifacts (not part of the source; kept as comments) ---
# Files
# Freqtrade/FrictradeLearning.py
#
# 2576 lines
# 114 KiB
# Python
# Raw Blame History
#
# This file contains invisible Unicode characters
# This file contains invisible Unicode characters that are indistinguishable to humans but may be processed differently by a computer. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
# This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
# Zeus Strategy: First Generation of GodStra Strategy with maximum
# AVG/MID profit in USDT
# Author: @Mablue (Masoud Azizi)
# github: https://github.com/mablue/
# IMPORTANT: INSTALL THE `ta` PACKAGE BEFORE RUNNING (pip install ta)
# freqtrade hyperopt --hyperopt-loss SharpeHyperOptLoss --spaces buy sell roi --strategy Zeus
# --- Do not remove these libs ---
import inspect
import logging
import os
from datetime import datetime
from datetime import timezone
from typing import Optional
import freqtrade.vendor.qtpylib.indicators as qtpylib
# Machine Learning
import joblib
import matplotlib.pyplot as plt
import mpmath as mp
import numpy as np
import optuna
import pandas as pd
import seaborn as sns
import shap
# Add your lib to import here test git
import ta
import talib.abstract as talib
from freqtrade.persistence import Trade
from freqtrade.strategy import (CategoricalParameter, DecimalParameter, IntParameter, IStrategy, merge_informative_pair)
from optuna.visualization import plot_optimization_history
from optuna.visualization import plot_parallel_coordinate
from optuna.visualization import plot_param_importances
from optuna.visualization import plot_slice
from pandas import DataFrame
from sklearn.calibration import CalibratedClassifierCV
from sklearn.feature_selection import SelectFromModel
from sklearn.feature_selection import VarianceThreshold
from sklearn.inspection import PartialDependenceDisplay
from sklearn.inspection import permutation_importance
from sklearn.linear_model import LogisticRegression
from sklearn.metrics import brier_score_loss, roc_auc_score
from sklearn.metrics import (
classification_report,
confusion_matrix,
accuracy_score,
roc_curve,
precision_score, recall_score
)
from sklearn.metrics import f1_score
from sklearn.model_selection import train_test_split
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler
from sklearn.tree import export_text
from xgboost import XGBClassifier
# --------------------------------
# Module-level logger for this strategy.
logger = logging.getLogger(__name__)
# Basic ANSI colour escape codes (used to colourise the console trade log).
RED = "\033[31m"
GREEN = "\033[32m"
YELLOW = "\033[33m"
BLUE = "\033[34m"
MAGENTA = "\033[35m"
CYAN = "\033[36m"
RESET = "\033[0m"
class FrictradeLearning(IStrategy):
    # ML-assisted DCA strategy on 1m candles: scales into positions with a
    # geometrically growing ladder of DCA thresholds derived from the
    # distance between the current price and the (BTC) all-time high.

    # Warm-up candles required before signals are produced.
    startup_candle_count = 200
    # Lazily populated machine-learning artefacts.
    train_model = None
    model_indicators = []
    # Default parameter set — NOTE(review): only some of these entries appear
    # to be read by the visible code (sma_short/sma_long in populate_indicators).
    DEFAULT_PARAMS = {
        "rsi_buy": 30,
        "rsi_sell": 70,
        "ema_period": 21,
        "sma_short": 20,
        "sma_long": 100,
        "atr_period": 14,
        "atr_multiplier": 1.5,
        "stake_amount": None,  # use exchange default
        "stoploss": -0.10,
        "minimal_roi": {"0": 0.10}
    }
    # Candidate derivative indicators hyperopt can choose between.
    indicators = {'sma24_deriv1', 'sma60_deriv1', 'sma5_deriv1_1h', 'sma12_deriv1_1h', 'sma24_deriv1_1h',
                  'sma60_deriv1_1h'}
    indic_1h_force_buy = CategoricalParameter(indicators, default="sma60_deriv1", space='buy')
    # Max allowed decrease below the ATH before DCA stops (fraction of ATH).
    allow_decrease_rate = DecimalParameter(0.1, 0.8, decimals=1, default=0.4, space='protection', optimize=False,
                                           load=False)
    # Size of the first DCA threshold (fraction of the reference price).
    first_adjust_param = DecimalParameter(0.001, 0.01, decimals=3, default=0.005, space='protection', optimize=False,
                                          load=False)
    # Cap on the number of DCA ladder steps.
    max_steps = IntParameter(10, 50, default=40, space='protection', optimize=True, load=True)
    hours_force = IntParameter(1, 48, default=24, space='buy', optimize=True, load=True)
    # Bounds of the dynamic trailing offset (see dynamic_trailing_offset):
    # offset_min is the floor; offset_max is the *extra range* above it.
    offset_min = IntParameter(1, 48, default=24, space='sell', optimize=True, load=True)
    offset_max = IntParameter(1, 48, default=24, space='sell', optimize=True, load=True)
    # ROI table: effectively disabled (1000% target — exits are managed manually).
    minimal_roi = {
        "0": 10
    }
    # Stoploss: effectively disabled (-100%).
    stoploss = -1  # 0.256
    # Custom stoploss
    use_custom_stoploss = False
    trailing_stop = False
    trailing_stop_positive = 0.15
    trailing_stop_positive_offset = 1
    trailing_only_offset_is_reached = True
    # Buy hypers
    timeframe = '1m'
    parameters = {}
    # DCA config
    position_adjustment_enable = True
    # Counter used by log_trade to reprint the header every 10 lines.
    # Starts as False (== 0 for the modulo test) and becomes an int after
    # the first log_trade call.
    columns_logged = False
    # Per-pair mutable trade-tracking state.
    # NOTE(review): this is a class-level mutable dict, shared across all
    # instances of the strategy — confirm a single instance is assumed.
    pairs = {
        pair: {
            "first_buy": 0,
            "last_buy": 0.0,
            "last_min": 999999999999999.5,
            "last_max": 0,
            "trade_info": {},
            "max_touch": 0.0,
            "last_sell": 0.0,
            'count_of_buys': 0,
            'current_profit': 0,
            'expected_profit': 0,
            'previous_profit': 0,
            "last_candle": {},
            "last_count_of_buys": 0,
            'base_stake_amount': 0,
            'stop_buy': False,
            'last_date': 0,
            'stop': False,
            'max_profit': 0,
            'first_amount': 0,
            'first_price': 0,
            'total_amount': 0,
            'has_gain': 0,
            'force_sell': False,
            'force_buy': False,
            'last_ath': 0,
            'mises': {},
            'dca_thresholds': {}
        }
        for pair in ["BTC/USDC", "ETH/USDC", "DOGE/USDC", "XRP/USDC", "SOL/USDC",
                     "BTC/USDT", "ETH/USDT", "DOGE/USDT", "XRP/USDT", "SOL/USDT"]
    }
    # In-memory list of open trades (refreshed from persistence in live/dry-run).
    trades = list()
    max_profit_pairs = {}
    # Historical BTC all-time highs used to bound the DCA ladder
    # (see get_last_ath_before_candle / calculateStepsDcaThresholds).
    btc_ath_history = [
        {"date": "2011-06-09", "price_usd": 26.15, "note": "pic 2011 (early breakout)"},
        {"date": "2013-11-29", "price_usd": 1132.00, "note": "bull run fin 2013"},
        {"date": "2017-12-17", "price_usd": 19783.00, "note": "ATH décembre 2017 (crypto bubble)"},
        {"date": "2020-12-31", "price_usd": 29001.72, "note": "fin 2020, nouveau record après accumulation)"},
        {"date": "2021-11-10", "price_usd": 68742.00, "note": "record novembre 2021 (institutional demand)"},
        {"date": "2024-03-05", "price_usd": 69000.00,
         "note": "nouveau pic début 2024 (source presse, valeur indicative)"},
        {"date": "2024-03-14", "price_usd": 73816.00,
         "note": "nouveau pic début 2024 (source presse, valeur indicative)"},
        {"date": "2024-11-12", "price_usd": 90000.00, "note": ""},
        {"date": "2024-12-17", "price_usd": 108363.00, "note": ""},
        {"date": "2025-07-11", "price_usd": 118755.00, "note": "pic juillet 2025 (valeur rapportée par la presse)"},
        {"date": "2025-08-13", "price_usd": 123748.00, "note": ""},
        {"date": "2025-10-06", "price_usd": 126198.07,
         "note": "pic oct. 2025 (source agrégée, à vérifier selon l'exchange)"}
    ]
def dynamic_trailing_offset(self, price, ath, nb_entries, max_dca=5):
    """Return a trailing offset that "breathes" with market conditions.

    A deep drawdown from the all-time high and unused DCA headroom both
    widen the offset; the result is bounded by the hyperopt parameters
    ``offset_min`` (floor) and ``offset_min + offset_max`` (ceiling).
    """
    # Drawdown from the ATH, clamped into [0, 0.5].
    drawdown = min(max((ath - price) / ath, 0.0), 0.5)
    # Fraction of the DCA budget already consumed, in [0, 1].
    dca_usage = min(nb_entries / max_dca, 1.0)
    # Blend: 70% drawdown depth, 30% remaining DCA headroom.
    score = min(max(0.7 * drawdown + 0.3 * (1 - dca_usage), 0.0), 1.0)
    lower = self.offset_min.value
    # NOTE(review): the ceiling is offset_min + offset_max, i.e. offset_max
    # acts as a range *width* on top of the floor — confirm this is intended.
    upper = lower + self.offset_max.value
    return lower + score * (upper - lower)
def confirm_trade_entry(self, pair: str, order_type: str, amount: float, rate: float, time_in_force: str,
                        current_time: datetime, entry_tag: Optional[str], **kwargs) -> bool:
    """Freqtrade hook: confirm (and book-keep) a new entry order.

    The buy is currently always allowed (the real conditions are commented
    out); the method's effective job is to reset the per-pair tracking
    state, pre-compute the DCA threshold ladder and log the entry.
    """
    # Minutes elapsed since the previous exit on this pair (0 if none yet).
    minutes = 0
    if self.pairs[pair]['last_date'] != 0:
        minutes = round(int((current_time - self.pairs[pair]['last_date']).total_seconds() / 60))
    dataframe, _ = self.dp.get_analyzed_dataframe(pair, self.timeframe)
    last_candle = dataframe.iloc[-1].squeeze()
    # NOTE(review): the two candles below are unused (kept for the commented
    # entry conditions); iloc[-2]/[-3] raise IndexError on very short frames.
    last_candle_2 = dataframe.iloc[-2].squeeze()
    last_candle_3 = dataframe.iloc[-3].squeeze()
    condition = True  # (last_candle[f"{indic_5m}_deriv1"] >= indic_deriv1_5m) and (last_candle[f"{indic_5m}_deriv2"] >= indic_deriv2_5m)
    allow_to_buy = True  # (condition and not self.pairs[pair]['stop']) | (entry_tag == 'force_entry')
    if allow_to_buy:
        # Reset the in-memory trade list and seed the per-pair state for
        # the freshly opened position.
        self.trades = list()
        self.pairs[pair]['first_buy'] = rate
        self.pairs[pair]['last_buy'] = rate
        self.pairs[pair]['max_touch'] = last_candle['close']
        self.pairs[pair]['last_candle'] = last_candle
        self.pairs[pair]['count_of_buys'] = 1
        self.pairs[pair]['current_profit'] = 0
        self.pairs[pair]['last_max'] = max(last_candle['close'], self.pairs[pair]['last_max'])
        self.pairs[pair]['last_min'] = min(last_candle['close'], self.pairs[pair]['last_min'])
        # Remaining stake currency available in the wallet (for the log line).
        dispo = round(self.wallets.get_available_stake_amount())
        self.printLineLog()
        stake_amount = self.adjust_stake_amount(pair, last_candle)
        self.pairs[pair]['total_amount'] = stake_amount
        self.pairs[pair]['first_amount'] = stake_amount
        # Pre-compute the DCA trigger ladder for this position.
        self.calculateStepsDcaThresholds(last_candle, pair)
        self.log_trade(
            last_candle=last_candle,
            date=current_time,
            action=("🟩Buy" if allow_to_buy else "Canceled") + " " + str(minutes),
            pair=pair,
            rate=rate,
            dispo=dispo,
            profit=0,
            trade_type=entry_tag,
            buys=1,
            stake=round(stake_amount, 2)
        )
    return allow_to_buy
def progressive_parts(self, total, n, first):
    """Split *total* into *n* geometrically increasing parts starting at *first*.

    Solves ``first * (r**n - 1) / (r - 1) == total`` for the common ratio
    ``r`` with mpmath, then returns ``[first * r**k for k in 0..n-1]``
    rounded to 4 decimals. Degenerate inputs return ``[0] * n``; a solver
    failure falls back to ``n`` equal parts of ``first``.
    """
    # Impossible configurations → skip the root solver entirely.
    if total <= 0 or first <= 0 or n <= 1:
        return [0] * n

    def geometric_gap(r):
        # Difference between the geometric-series sum and the target total.
        return first * (r ** n - 1) / (r - 1) - total

    try:
        # Starting guess 1.2 is more stable than 1.05 for this search.
        ratio = mp.findroot(geometric_gap, 1.2)
    except Exception:
        # Solver failed: fall back to equal parts.
        return [first] * n
    return [round(first * (ratio ** k), 4) for k in range(n)]
def calculateStepsDcaThresholds(self, last_candle, pair):
    """Build the ladder of progressive DCA thresholds for *pair*.

    The total room between the reference price and the allowed floor
    (``ATH * (1 - allow_decrease_rate)``) is split into ``steps``
    geometrically growing fractions via :meth:`progressive_parts` and
    stored in ``pairs[pair]['dca_thresholds']``.
    """
    # (kept) earlier linear-split experiment:
    # def split_ratio_one_third(n, p):
    #     a = n / (2 * p)  # first value
    #     d = n / (p * (p - 1))  # increment
    #     return [round(a + i * d, 3) for i in range(p)]
    # r, parts = progressive_parts(0.4, 40, 0.004)
    # print("r =", r)
    # print(parts)
    # Reference price: first fill if a position exists, else the candle mid.
    val = self.pairs[pair]['first_price'] if self.pairs[pair]['first_price'] > 0 else last_candle['mid']
    # Cache the last ATH observed before this candle (never below `val`).
    if self.pairs[pair]['last_ath'] == 0:
        ath = max(val, self.get_last_ath_before_candle(last_candle['date']))
        self.pairs[pair]['last_ath'] = ath
    ath = self.pairs[pair]['last_ath']
    # Number of ladder steps, bounded by the hyperopt `max_steps` parameter.
    steps = self.calculateNumberOfSteps(val, ath, max_steps=self.max_steps.value)
    self.pairs[pair]['dca_thresholds'] = self.progressive_parts(
        (val - (ath * (1 - self.allow_decrease_rate.value))) / val,
        steps, self.first_adjust_param.value)
    print(f"val={val} lim={self.pairs[pair]['last_ath'] * (1 - self.allow_decrease_rate.value)}"
          f" steps={steps}"
          f" pct={round((val - (self.pairs[pair]['last_ath'] * (1 - self.allow_decrease_rate.value))) / val, 4)}")
    print(self.pairs[pair]['dca_thresholds'])
def confirm_trade_exit(self, pair: str, trade: Trade, order_type: str, amount: float, rate: float,
                       time_in_force: str,
                       exit_reason: str, current_time, **kwargs, ) -> bool:
    """Freqtrade hook: confirm an exit order and reset per-pair state.

    The sell is currently always allowed (real conditions are commented
    out); ``force_exit`` and ``stop_loss`` pass through unconditionally.
    On exit the per-pair DCA tracking state is cleared and the trade logged.
    """
    dataframe, _ = self.dp.get_analyzed_dataframe(pair, self.timeframe)
    last_candle = dataframe.iloc[-1].squeeze()
    # BUG FIX: `.seconds` on a timedelta ignores whole days, so trades held
    # longer than 24h logged a wrapped-around duration; use total_seconds().
    minutes = int(round((current_time - trade.open_date_utc).total_seconds() / 60, 0))
    profit = trade.calc_profit(rate)
    allow_to_sell = True  # (last_candle['hapercent'] < 0 ) or self.pairs[pair]['force_sell'] or (exit_reason == 'force_exit') or (exit_reason == 'stop_loss')
    if allow_to_sell:
        self.trades = list()
        self.pairs[pair]['last_count_of_buys'] = trade.nr_of_successful_entries
        self.pairs[pair]['last_sell'] = rate
        self.pairs[pair]['last_candle'] = last_candle
        self.pairs[pair]['previous_profit'] = 0
        dispo = round(self.wallets.get_available_stake_amount())
        self.log_trade(
            last_candle=last_candle,
            date=current_time,
            action="🟥Sell " + str(minutes),
            pair=pair,
            trade_type=exit_reason,
            rate=last_candle['close'],
            dispo=dispo,
            profit=round(profit, 2)
        )
        # Clear the per-pair DCA tracking state for the next position.
        self.pairs[pair]['first_amount'] = 0
        self.pairs[pair]['max_profit'] = 0
        self.pairs[pair]['force_sell'] = False
        self.pairs[pair]['has_gain'] = 0
        self.pairs[pair]['current_profit'] = 0
        self.pairs[pair]['total_amount'] = 0
        self.pairs[pair]['count_of_buys'] = 0
        self.pairs[pair]['max_touch'] = 0
        self.pairs[pair]['last_buy'] = 0
        self.pairs[pair]['last_date'] = current_time
        self.pairs[pair]['current_trade'] = None
    else:
        self.printLog(
            f"{current_time} SELL triggered for {pair} ({exit_reason} profit={profit} minutes={minutes} percent={last_candle['hapercent']}) but condition blocked")
    return (allow_to_sell) | (exit_reason == 'force_exit') | (exit_reason == 'stop_loss')
# def custom_exit(self, pair: str, trade: Trade, current_time, current_rate, current_profit, **kwargs):
#
# dataframe, _ = self.dp.get_analyzed_dataframe(pair, self.timeframe)
# last_candle = dataframe.iloc[-1].squeeze()
# last_candle_1h = dataframe.iloc[-13].squeeze()
# before_last_candle = dataframe.iloc[-2].squeeze()
# before_last_candle_2 = dataframe.iloc[-3].squeeze()
# before_last_candle_12 = dataframe.iloc[-13].squeeze()
#
# expected_profit = self.expectedProfit(pair, last_candle)
# # print(f"current_time={current_time} current_profit={current_profit} expected_profit={expected_profit}")
#
# max_touch_before = self.pairs[pair]['max_touch']
# self.pairs[pair]['last_max'] = max(last_candle['close'], self.pairs[pair]['last_max'])
# self.pairs[pair]['last_min'] = min(last_candle['close'], self.pairs[pair]['last_min'])
# self.pairs[pair]['current_trade'] = trade
#
# count_of_buys = trade.nr_of_successful_entries
#
# profit = trade.calc_profit(current_rate) #round(current_profit * trade.stake_amount, 1)
# self.pairs[pair]['max_profit'] = max(self.pairs[pair]['max_profit'], profit)
# max_profit = last_candle['max5'] #self.pairs[pair]['max_profit']
# baisse = 0
# if profit > 0:
# baisse = 1 - (profit / max_profit)
# mx = max_profit / 5
# self.pairs[pair]['count_of_buys'] = count_of_buys
# self.pairs[pair]['current_profit'] = profit
#
# dispo = round(self.wallets.get_available_stake_amount())
# hours_since_first_buy = (current_time - trade.open_date_utc).seconds / 3600.0
# days_since_first_buy = (current_time - trade.open_date_utc).days
# hours = (current_time - trade.date_last_filled_utc).total_seconds() / 3600.0
# minutes = (current_time - trade.date_last_filled_utc).total_seconds() / 60.0
#
# if minutes % 4 == 0:
# self.log_trade(
# last_candle=last_candle,
# date=current_time,
# action="🟢 CURRENT", #🔴 CURRENT" if self.pairs[pair]['stop'] or last_candle['stop_buying'] else "
# dispo=dispo,
# pair=pair,
# rate=last_candle['close'],
# trade_type='',
# profit=round(profit, 2),
# buys=count_of_buys,
# stake=0
# )
#
# if (last_candle['close'] > last_candle['mid']) or (last_candle['sma5_deriv1'] > 0):
# return None
#
# pair_name = self.getShortName(pair)
#
# if profit > 0.003 * count_of_buys and baisse > 0.30:
# self.pairs[pair]['force_sell'] = False
# self.pairs[pair]['force_buy'] = (self.pairs[pair]['count_of_buys'] - self.pairs[pair]['has_gain'] > 3)
# return str(count_of_buys) + '_' + 'B30_' + pair_name + '_' + str(self.pairs[pair]['has_gain'])
#
# self.pairs[pair]['max_touch'] = max(last_candle['close'], self.pairs[pair]['max_touch'])
def getShortName(self, pair):
    """Strip the quote-currency suffix (USDT/USDC, slash or underscore form) from *pair*."""
    name = pair
    for quote in ("/USDT", "/USDC", "_USDC", "_USDT"):
        name = name.replace(quote, '')
    return name
def getLastLost(self, last_candle, pair):
    """Return the relative drop of the current close below the highest price
    touched since entry (``pairs[pair]['max_touch']``), rounded to 3 decimals.

    BUG FIX: ``max_touch`` is initialised to 0.0 and only set on entry, so
    the original raised ZeroDivisionError when called before a buy; return
    0.0 in that case instead.
    """
    max_touch = self.pairs[pair]['max_touch']
    if not max_touch:
        # No reference high recorded yet for this pair.
        return 0.0
    return round((last_candle['close'] - max_touch) / max_touch, 3)
def getPctFirstBuy(self, pair, last_candle):
    """Relative distance of the current close from the first buy price (3 decimals)."""
    first_buy = self.pairs[pair]['first_buy']
    return round((last_candle['close'] - first_buy) / first_buy, 3)
def getPctLastBuy(self, pair, last_candle):
    """Relative distance of the current close from the most recent buy price (4 decimals)."""
    last_buy = self.pairs[pair]['last_buy']
    return round((last_candle['close'] - last_buy) / last_buy, 4)
def expectedProfit(self, pair: str, last_candle: DataFrame):
    """Return the profit (in stake currency) expected before a sell is allowed.

    BTC uses a tighter relative target (0.5%) than other pairs (1%); the
    absolute target is proportional to the total amount currently invested
    in the pair. The result is also cached in
    ``pairs[pair]['expected_profit']``.

    NOTE(review): `last_candle` is currently unused (kept for interface
    compatibility); the unused `pct`/`pct_to_max` scratch variables that
    fed only commented-out alternatives were removed.
    """
    lim = 0.01
    if self.getShortName(pair) == 'BTC':
        lim = 0.005
    expected_profit = lim * self.pairs[pair]['total_amount']
    self.pairs[pair]['expected_profit'] = expected_profit
    return expected_profit
def log_trade(self, action, pair, date, trade_type=None, rate=None, dispo=None, profit=None, buys=None, stake=None,
              last_candle=None):
    """Pretty-print one trade event (buy/sell/current) as a table row.

    Every 10 calls the column header and a per-pair state summary are
    re-printed. Silent during hyperopt runs.
    (Indentation of this block was reconstructed from a whitespace-mangled
    source — the header block is assumed to nest under the modulo test.)
    """
    # NOTE(review): `in ('hyperopt')` is a *substring* test on the string
    # 'hyperopt' (missing tuple comma) — probably meant ('hyperopt',).
    if self.config.get('runmode') == 'hyperopt' or self.dp.runmode.value in ('hyperopt'):
        return
    # Print the column header only once every 10 logged lines.
    if self.columns_logged % 10 == 0:
        self.printLog(
            f"| {'Date':<16} | {'Action':<10} |{'Pair':<5}| {'Trade Type':<18} |{'Rate':>8} | {'Dispo':>6} | {'Profit':>8} "
            f"| {'Pct':>6} | {'max_touch':>11} | {'last_lost':>12} | {'last_max':>7}| {'last_min':>7}|{'Buys':>5}| {'Stake':>5} |"
            f"{'rsi':>6}|{'rsi_1h':>6}|{'rsi_1d':>6}|{'cf_1h':>6}|{'cf_1d':>6}"
            # |Distmax|s201d|s5_1d|s5_2d|s51h|s52h|smt1h|smt2h|tdc1d|tdc1h"
        )
        self.printLineLog()
        # Dump the live per-pair state table for pairs with an open position.
        df = pd.DataFrame.from_dict(self.pairs, orient='index')
        colonnes_a_exclure = ['last_candle',
                              'trade_info', 'last_date', 'last_count_of_buys', 'base_stake_amount', 'stop_buy']
        df_filtered = df[df['count_of_buys'] > 0].drop(columns=colonnes_a_exclure)
        # df_filtered = df_filtered["first_buy", "last_max", "max_touch", "last_sell","last_buy", 'count_of_buys', 'current_profit']
        self.printLog(df_filtered)
    self.columns_logged += 1
    date = str(date)[:16] if date else "-"
    # Placeholder columns kept for layout compatibility.
    limit = None
    rsi = ''
    rsi_pct = ''
    sma5_1d = ''
    sma5_1h = ''
    sma5 = str(sma5_1d) + ' ' + str(sma5_1h)
    last_lost = self.getLastLost(last_candle, pair)
    if buys is None:
        buys = ''
    max_touch = ''
    pct_max = self.getPctFirstBuy(pair, last_candle)
    # "this pair's buys / total buys across all pairs".
    total_counts = str(buys) + '/' + str(sum(pair_data['count_of_buys'] for pair_data in self.pairs.values()))
    dist_max = ''
    # Show integers for prices > 1, three decimals for sub-unit prices.
    last_max = int(self.pairs[pair]['last_max']) if self.pairs[pair]['last_max'] > 1 else round(
        self.pairs[pair]['last_max'], 3)
    last_min = int(self.pairs[pair]['last_min']) if self.pairs[pair]['last_min'] > 1 else round(
        self.pairs[pair]['last_min'], 3)
    color = GREEN if profit > 0 else RED
    profit = str(profit) + '/' + str(round(self.pairs[pair]['max_profit'], 2))
    # Derivative colour legend (translated from French):
    # 🟢 deriv1 > 0 and deriv2 > 0: accelerating uptrend.
    # 🟡 deriv1 > 0 and deriv2 < 0: decelerating uptrend → possible exhaustion.
    # 🔴 deriv1 < 0 and deriv2 < 0: accelerating downtrend.
    # 🟠 deriv1 < 0 and deriv2 > 0: decelerating downtrend → possible bottom.
    self.printLog(
        f"| {date:<16} |{action:<10} | {pair[0:3]:<3} | {trade_type or '-':<18} |{rate or '-':>9}| {dispo or '-':>6} "
        f"|{color}{profit or '-':>10}{RESET}| {pct_max or '-':>6} | {round(self.pairs[pair]['max_touch'], 2) or '-':>11} | {last_lost or '-':>12} "
        f"| {last_max or '-':>7} | {last_min or '-':>7} |{total_counts or '-':>5}|{stake or '-':>7}"
        f"{round(last_candle['max_rsi_24'], 1) or '-':>6}|{round(last_candle['rsi_1h'], 1) or '-':>6}|{round(last_candle['rsi_1d'], 1) or '-':>6}|"
        # f"{round(last_candle['rtp_1h'] * 100, 0) or '-' :>6}|{round(last_candle['rtp_1d'] * 100, 0) or '-' :>6}|"
        # f"{round(last_candle['confidence_index_1d'], 3) or '-':>6}|{round(last_candle['confidence_index_1h'], 3) or '-':>6}|"
    )
def printLineLog(self):
    """Print the horizontal separator row matching the log_trade header."""
    # f"sum1h|sum1d|Tdc|Tdh|Tdd| drv1 |drv|drv_1d|"
    self.printLog(
        f"+{'-' * 18}+{'-' * 12}+{'-' * 5}+{'-' * 20}+{'-' * 9}+{'-' * 8}+{'-' * 12}+{'-' * 8}+{'-' * 13}+{'-' * 14}+{'-' * 9}{'-' * 9}+{'-' * 5}+{'-' * 7}+"
        f"+{'-' * 6}+{'-' * 7}+{'-' * 5}+{'-' * 5}+{'-' * 5}+{'-' * 5}+{'-' * 5}+{'-' * 5}+"
    )
def printLog(self, str):
    """Emit *str* on the appropriate channel for the current run mode.

    Silent in hyperopt; logged via the module logger in live/dry-run;
    printed to stdout in backtest/lookahead-analysis.

    NOTE(review): the parameter shadows the builtin ``str``; the name is
    kept so keyword callers are not broken.
    """
    # BUG FIX: `value in ('hyperopt')` tested substring membership in the
    # *string* 'hyperopt' (missing tuple comma); use equality instead.
    if self.config.get('runmode') == 'hyperopt' or self.dp.runmode.value == 'hyperopt':
        return
    if self.dp.runmode.value not in ('backtest', 'hyperopt', 'lookahead-analysis'):
        logger.info(str)
    else:
        # The redundant inner hyperopt re-check was removed: the guard
        # above already returned for that mode.
        print(str)
def populate_indicators(self, dataframe: DataFrame, metadata: dict) -> DataFrame:
# Add all ta features
pair = metadata['pair']
short_pair = self.getShortName(pair)
self.path = f"user_data/strategies/plots/{short_pair}/" # + ("valide/" if not self.dp.runmode.value in ('backtest') else '')
heikinashi = qtpylib.heikinashi(dataframe)
dataframe['haopen'] = heikinashi['open']
dataframe['haclose'] = heikinashi['close']
dataframe['hapercent'] = (dataframe['haclose'] - dataframe['haopen']) / dataframe['haclose']
dataframe['mid'] = dataframe['open'] + (dataframe['close'] - dataframe['open']) / 2
dataframe['sma5'] = dataframe['mid'].ewm(span=5,
adjust=False).mean() # dataframe["mid"].rolling(window=5).mean()
dataframe['sma5_deriv1'] = 1000 * (dataframe['sma5'] - dataframe['sma5'].shift(1)) / dataframe['sma5'].shift(1)
dataframe['sma12'] = dataframe['mid'].ewm(span=12, adjust=False).mean()
dataframe['sma12_deriv1'] = 1000 * (dataframe['sma12'] - dataframe['sma12'].shift(1)) / dataframe[
'sma12'].shift(1)
dataframe['sma24'] = dataframe['mid'].ewm(span=24, adjust=False).mean()
dataframe['sma24_deriv1'] = 1000 * (dataframe['sma24'] - dataframe['sma24'].shift(1)) / dataframe[
'sma24'].shift(1)
dataframe['sma60'] = dataframe['mid'].ewm(span=60, adjust=False).mean()
dataframe['sma60_deriv1'] = 1000 * (dataframe['sma60'] - dataframe['sma60'].shift(1)) / dataframe[
'sma60'].shift(1)
# dataframe[f"sma5_inv"] = (dataframe[f"sma5"].shift(2) >= dataframe[f"sma5"].shift(1)) \
# & (dataframe[f"sma5"].shift(1) <= dataframe[f"sma5"])
dataframe["sma5_sqrt"] = (
np.sqrt(np.abs(dataframe["sma5"] - dataframe["sma5"].shift(1)))
+ np.sqrt(np.abs(dataframe["sma5"].shift(3) - dataframe["sma5"].shift(1)))
)
dataframe["sma5_inv"] = (
(dataframe["sma5"].shift(2) >= dataframe["sma5"].shift(1))
& (dataframe["sma5"].shift(1) <= dataframe["sma5"])
& (dataframe["sma5_sqrt"] > 5)
)
dataframe["sma12_sqrt"] = (
np.sqrt(np.abs(dataframe["sma12"] - dataframe["sma12"].shift(1)))
+ np.sqrt(np.abs(dataframe["sma12"].shift(3) - dataframe["sma12"].shift(1)))
)
dataframe["sma12_inv"] = (
(dataframe["sma12"].shift(2) >= dataframe["sma12"].shift(1))
& (dataframe["sma12"].shift(1) <= dataframe["sma12"])
& (dataframe["sma12_sqrt"] > 5)
)
dataframe["percent"] = dataframe['mid'].pct_change()
dataframe["percent3"] = dataframe['mid'].pct_change(3).rolling(3).mean()
dataframe["percent12"] = dataframe['mid'].pct_change(12).rolling(12).mean()
dataframe["percent24"] = dataframe['mid'].pct_change(24).rolling(24).mean()
dataframe['rsi'] = talib.RSI(dataframe['mid'], timeperiod=14)
self.calculeDerivees(dataframe, 'rsi', ema_period=12)
dataframe['max_rsi_12'] = talib.MAX(dataframe['rsi'], timeperiod=12)
dataframe['max_rsi_24'] = talib.MAX(dataframe['rsi'], timeperiod=24)
dataframe['max5'] = talib.MAX(dataframe['mid'], timeperiod=5)
dataframe['min180'] = talib.MIN(dataframe['mid'], timeperiod=180)
dataframe['max180'] = talib.MAX(dataframe['mid'], timeperiod=180)
dataframe['pct180'] = ((dataframe["mid"] - dataframe['min180']) / (dataframe['max180'] - dataframe['min180']))
dataframe = self.rsi_trend_probability(dataframe, short=60, long=360)
# ################### INFORMATIVE 1h
informative = self.dp.get_pair_dataframe(pair=metadata['pair'], timeframe='1h')
informative['mid'] = informative['open'] + (informative['close'] - informative['open']) / 2
# Calcul MACD
macd, macdsignal, macdhist = talib.MACD(
informative['close'],
fastperiod=12,
slowperiod=26,
signalperiod=9
)
informative['macd'] = macd
informative['macdsignal'] = macdsignal
informative['macdhist'] = macdhist
informative['rsi'] = talib.RSI(informative['mid'], timeperiod=14)
informative['sma5'] = informative['mid'].ewm(span=5, adjust=False).mean()
informative['sma5_deriv1'] = 1000 * (informative['sma5'] - informative['sma5'].shift(1)) / informative[
'sma5'].shift(1)
informative['sma12'] = informative['mid'].ewm(span=12, adjust=False).mean()
informative['sma12_deriv1'] = 1000 * (informative['sma12'] - informative['sma12'].shift(1)) / informative[
'sma12'].shift(1)
informative['sma24'] = informative['mid'].ewm(span=24, adjust=False).mean()
informative['sma24_deriv1'] = 1000 * (informative['sma24'] - informative['sma24'].shift(1)) / informative[
'sma24'].shift(1)
informative['sma60'] = informative['mid'].ewm(span=60, adjust=False).mean()
informative['sma60_deriv1'] = 1000 * (informative['sma60'] - informative['sma60'].shift(1)) / informative[
'sma60'].shift(1)
informative['rsi'] = talib.RSI(informative['mid'], timeperiod=14)
self.calculeDerivees(informative, 'rsi', ema_period=12)
# informative = self.rsi_trend_probability(informative)
# probas = self.calculModelInformative(informative)
# self.calculateConfiance(informative)
# informative = self.populate1hIndicators(df=informative, metadata=metadata)
# informative = self.calculateRegression(informative, 'mid', lookback=15)
dataframe = merge_informative_pair(dataframe, informative, '1m', '1h', ffill=True)
# ################### INFORMATIVE 1d
informative = self.dp.get_pair_dataframe(pair=metadata['pair'], timeframe='1d')
informative['mid'] = informative['open'] + (informative['close'] - informative['open']) / 2
informative['rsi'] = talib.RSI(informative['mid'], timeperiod=5)
# informative = self.rsi_trend_probability(informative)
# informative = self.calculateRegression(informative, 'mid', lookback=15)
# self.calculateConfiance(informative)
dataframe = merge_informative_pair(dataframe, informative, '1m', '1d', ffill=True)
dataframe['last_price'] = dataframe['close']
dataframe['first_price'] = dataframe['close']
if self.dp:
if self.dp.runmode.value in ('live', 'dry_run'):
self.getOpenTrades()
for trade in self.trades:
if trade.pair != pair:
continue
filled_buys = trade.select_filled_orders('buy')
count = 0
amount = 0
min_price = 111111111111110
max_price = 0
for buy in filled_buys:
if count == 0:
min_price = min(min_price, buy.price)
max_price = max(max_price, buy.price)
dataframe['first_price'] = buy.price
self.pairs[pair]['first_buy'] = buy.price
self.pairs[pair]['first_amount'] = buy.price * buy.filled
# dataframe['close01'] = buy.price * 1.01
# Order(id=2396, trade=1019, order_id=29870026652, side=buy, filled=0.00078, price=63921.01,
# status=closed, date=2024-08-26 02:20:11)
dataframe['last_price'] = buy.price
self.pairs[pair]['last_buy'] = buy.price
count = count + 1
amount += buy.price * buy.filled
self.pairs[pair]['count_of_buys'] = count
self.pairs[pair]['total_amount'] = amount
dataframe['absolute_min'] = dataframe['mid'].rolling(1440, min_periods=1).min()
dataframe['absolute_max'] = dataframe['mid'].rolling(1440, min_periods=1).max()
# steps = (dataframe['absolute_max'] - dataframe['absolute_min']) / (dataframe['absolute_min'] * 0.01)
# levels = [dataframe['absolute_min'] * (1 + i / 100) for i in range(1, steps + 1)]
#
# print(levels)
###########################################################
# Bollinger Bands
bollinger = qtpylib.bollinger_bands(qtpylib.typical_price(dataframe), window=20, stds=2)
dataframe['bb_lowerband'] = bollinger['lower']
dataframe['bb_middleband'] = bollinger['mid']
dataframe['bb_upperband'] = bollinger['upper']
dataframe["bb_percent"] = (
(dataframe["close"] - dataframe["bb_lowerband"]) /
(dataframe["bb_upperband"] - dataframe["bb_lowerband"])
)
dataframe["bb_width"] = (dataframe["bb_upperband"] - dataframe["bb_lowerband"]) / dataframe["sma24"]
# Calcul MACD
macd, macdsignal, macdhist = talib.MACD(
dataframe['close'],
fastperiod=12,
slowperiod=26,
signalperiod=9
)
# | Nom | Formule / définition | Signification |
# | ---------------------------- | ------------------------------------------- | ----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- |
# | **MACD** (`macd`) | `EMA_fast - EMA_slow` (ex : 12-26 périodes) | Montre lécart entre la moyenne courte et la moyenne longue. <br> - Positive → tendance haussière <br> - Négative → tendance baissière |
# | **Signal** (`macdsignal`) | `EMA_9(MACD)` | Sert de ligne de **signal de déclenchement**. <br> - Croisement du MACD au-dessus → signal dachat <br> - Croisement du MACD en dessous → signal de vente |
# | **Histogramme** (`macdhist`) | `MACD - Signal` | Montre la **force et laccélération** de la tendance. <br> - Positif et croissant → tendance haussière qui saccélère <br> - Positif mais décroissant → ralentissement de la hausse <br> - Négatif et décroissant → baisse qui saccélère <br> - Négatif mais croissant → ralentissement de la baisse |
# Ajouter dans le dataframe
dataframe['macd'] = macd
dataframe['macdsignal'] = macdsignal
dataframe['macdhist'] = macdhist
# Regarde dans le futur
# # --- Rendre relatif sur chaque série (-1 → 1) ---
# for col in ['macd', 'macdsignal', 'macdhist']:
# series = dataframe[col]
# valid = series[~np.isnan(series)] # ignorer NaN
# min_val = valid.min()
# max_val = valid.max()
# span = max_val - min_val if max_val != min_val else 1
# dataframe[f'{col}_rel'] = 2 * ((series - min_val) / span) - 1
#
# dataframe['tdc_macd'] = self.macd_tendance_int(
# dataframe,
# macd_col='macd_rel',
# signal_col='macdsignal_rel',
# hist_col='macdhist_rel'
# )
# ------------------------------------------------------------------------------------
# rolling SMA indicators (used for trend detection too)
s_short = self.DEFAULT_PARAMS['sma_short']
s_long = self.DEFAULT_PARAMS['sma_long']
dataframe[f'sma_{s_short}'] = dataframe['close'].rolling(window=s_short).mean()
dataframe[f'sma_{s_long}'] = dataframe['close'].rolling(window=s_long).mean()
# --- pente brute ---
dataframe['slope'] = dataframe['sma24'].diff()
# --- lissage EMA ---
dataframe['slope_smooth'] = dataframe['slope'].ewm(span=10, adjust=False).mean()
# # RSI
# window = 14
# delta = dataframe['close'].diff()
# up = delta.clip(lower=0)
# down = -1 * delta.clip(upper=0)
# ma_up = up.rolling(window=window).mean()
# ma_down = down.rolling(window=window).mean()
# rs = ma_up / ma_down.replace(0, 1e-9)
# dataframe['rsi'] = 100 - (100 / (1 + rs))
#
# # EMA example
# dataframe['ema'] = dataframe['close'].ewm(span=self.DEFAULT_PARAMS['ema_period'], adjust=False).mean()
#
# # ATR (simple implementation)
# high_low = dataframe['high'] - dataframe['low']
# high_close = (dataframe['high'] - dataframe['close'].shift()).abs()
# low_close = (dataframe['low'] - dataframe['close'].shift()).abs()
# tr = DataFrame({'hl': high_low, 'hc': high_close, 'lc': low_close}).max(axis=1)
# dataframe['atr'] = tr.rolling(window=self.DEFAULT_PARAMS['atr_period']).mean()
###########################
# df = ton DataFrame OHLCV avec colonnes: open, high, low, close, volume
# Assure-toi qu'il est trié par date croissante
timeframe = self.timeframe
# --- Volatilité normalisée ---
dataframe['atr'] = ta.volatility.AverageTrueRange(
high=dataframe['high'], low=dataframe['low'], close=dataframe['close'], window=14
).average_true_range()
dataframe['atr_norm'] = dataframe['atr'] / dataframe['close']
# --- Force de tendance ---
dataframe['adx'] = ta.trend.ADXIndicator(
high=dataframe['high'], low=dataframe['low'], close=dataframe['close'], window=14
).adx()
# --- Volume directionnel (On Balance Volume) ---
dataframe['obv'] = ta.volume.OnBalanceVolumeIndicator(
close=dataframe['close'], volume=dataframe['volume']
).on_balance_volume()
self.calculeDerivees(dataframe, 'obv', ema_period=1)
dataframe['obv12'] = ta.volume.OnBalanceVolumeIndicator(
close=dataframe['sma12'], volume=dataframe['volume'].rolling(12).sum()
).on_balance_volume()
dataframe['obv24'] = ta.volume.OnBalanceVolumeIndicator(
close=dataframe['sma24'], volume=dataframe['volume'].rolling(24).sum()
).on_balance_volume()
# self.calculeDerivees(dataframe, 'obv5', ema_period=5)
# --- Volatilité récente (écart-type des rendements) ---
dataframe['vol_24'] = dataframe['percent'].rolling(24).std()
# Compter les baisses / hausses consécutives
# self.calculateDownAndUp(dataframe, limit=0.0001)
# df : ton dataframe OHLCV + indicateurs existants
# Assurez-vous que les colonnes suivantes existent :
# 'max_rsi_12', 'roc_24', 'bb_percent_1h'
# --- Filtrage des NaN initiaux ---
# dataframe = dataframe.dropna()
dataframe['rsi_slope'] = dataframe['rsi'].diff(3) / 3 # vitesse moyenne du RSI
dataframe['adx_change'] = dataframe['adx'] - dataframe['adx'].shift(12) # évolution de la tendance
dataframe['volatility_ratio'] = dataframe['atr_norm'] / dataframe['bb_width']
dataframe["rsi_diff"] = dataframe["rsi"] - dataframe["rsi"].shift(3)
dataframe["slope_ratio"] = dataframe["sma5_deriv1"] / (dataframe["sma60_deriv1"] + 1e-9)
dataframe["divergence"] = (dataframe["rsi_deriv1"] * dataframe["sma5_deriv1"]) < 0
###########################################################
# print(f"min={dataframe['absolute_min'].min()} max={dataframe['absolute_max'].max()}")
for i in [0, 1, 2, 3]:
dataframe[f"lvl_{i}_pct"] = dataframe['absolute_min'] * (1 + 0.01 * i)
self.model_indicators = self.listUsableColumns(dataframe)
if False and self.dp.runmode.value in ('backtest'):
self.trainModel(dataframe, metadata)
short_pair = self.getShortName(pair)
# path=f"user_data/strategies/plots/{short_pair}/"
self.model = joblib.load(f"{self.path}/{short_pair}_rf_model.pkl")
# Préparer les features pour la prédiction
features = dataframe[self.model_indicators].fillna(0)
# Prédiction : probabilité que le prix monte
# Affichage des colonnes intérressantes dans le model
features_pruned, kept_features = self.prune_features(
model=self.model,
dataframe=dataframe,
feature_columns=self.model_indicators,
importance_threshold=0.005 # enlever features < % importance
)
probs = self.model.predict_proba(features)[:, 1]
# Sauvegarder la probabilité pour lanalyse
dataframe['ml_prob'] = probs
if False and self.dp.runmode.value in ('backtest'):
self.inspect_model(self.model)
#
# absolute_min = dataframe['absolute_min'].min()
# absolute_max = dataframe['absolute_max'].max()
#
# # Écart total
# diff = absolute_max - absolute_min
#
# # Nombre de lignes intermédiaires (1% steps)
# steps = int((absolute_max - absolute_min) / (absolute_min * 0.01))
#
# # Niveaux de prix à 1%, 2%, ..., steps%
# levels = [absolute_min * (1 + i / 100) for i in range(1, steps + 1)]
# levels = [lvl for lvl in levels if lvl < absolute_max] # évite le dernier niveau exact
#
# # ajout dans le DataFrame
# for i, lvl in enumerate(levels, start=1):
# dataframe[f"lvl_{i}_pct"] = lvl
# # Indices correspondants
# indices = [(dataframe['mid'] - lvl).abs().idxmin() for lvl in levels]
# Non utilisé dans le modèle
dataframe['min60'] = talib.MIN(dataframe['mid'], timeperiod=60)
# val = 90000
# steps = 12
# [0.018, 0.022, 0.025, 0.028, 0.032, 0.035, 0.038, 0.042, 0.045, 0.048, 0.052, 0.055]
# val = 100000
# steps = 20
# [0.012, 0.014, 0.015, 0.016, 0.018, 0.019, 0.02, 0.022, 0.023, 0.024, 0.025, 0.027, 0.028, 0.029, 0.031, 0.032,
# 0.033, 0.035, 0.036, 0.037]
# val = 110000
# steps = 28
# [0.01, 0.01, 0.011, 0.012, 0.013, 0.013, 0.014, 0.015, 0.015, 0.016, 0.017, 0.018, 0.018, 0.019, 0.02, 0.02,
# 0.021, 0.022, 0.023, 0.023, 0.024, 0.025, 0.025, 0.026, 0.027, 0.028, 0.028, 0.029]
# val = 120000
# steps = 35
# [0.008, 0.009, 0.009, 0.01, 0.01, 0.011, 0.011, 0.012, 0.012, 0.013, 0.013, 0.014, 0.014, 0.015, 0.015, 0.016,
# 0.016, 0.017, 0.017, 0.018, 0.018, 0.019, 0.019, 0.019, 0.02, 0.02, 0.021, 0.021, 0.022, 0.022, 0.023, 0.023,
# 0.024, 0.024, 0.025]
# def split_ratio_one_third(n, p):
# a = n / (2 * p) # première valeur
# d = n / (p * (p - 1)) # incrément
# return [round(a + i * d, 3) for i in range(p)]
#
allow_decrease_rate = 0.3
# for val in range(70000, 140000, 10000):
# ath = 126000
#
# steps = self.calculateNumberOfSteps(val, ath, max_steps=40)
# self.printLog(f"allow_decrease_rate={self.allow_decrease_rate.value} val={val} steps={steps} pct={round((val - (ath * (1 - allow_decrease_rate))) / val, 4)}")
# # dca = split_ratio_one_third((val - (ath * (1 - self.allow_decrease_rate.value))) / ath, steps)
# # self.printLog(dca)
# dca_thresholds = self.progressive_parts(
# (val - (ath * (1 - self.allow_decrease_rate.value))) / val,
# steps, self.first_adjust_param.value)
# print(f"val={val} lim={ath * (1 - self.allow_decrease_rate.value)}"
# f"steps={steps} "
# f"pct={(round(val - (ath * (1 - self.allow_decrease_rate.value))) / val, 4)}")
# print(dca_thresholds)
ath = 126000
last_candle = dataframe.iloc[-1].squeeze()
val = last_candle['first_price']
# steps = self.calculateNumberOfSteps(val, ath, max_steps=40)
# self.printLog(
# f"allow_decrease_rate={self.allow_decrease_rate.value} val={val} steps={steps} pct={round((val - (ath * (1 - allow_decrease_rate))) / val, 4)}")
# dca_thresholds = self.progressive_parts((val - (ath * (1 - self.allow_decrease_rate.value))) / val, steps, self.first_adjust_param.value)
# print(f"val={val} lim={ath * (1 - self.allow_decrease_rate.value)}"
# f"steps={steps} "
# f"pct={(round(val - (ath * (1 - self.allow_decrease_rate.value))) / val, 4)}")
# print(dca_thresholds)
if self.dp:
if self.dp.runmode.value in ('live', 'dry_run'):
full, mises, steps = self.calculateMises(last_candle, pair)
# stake = min(self.wallets.get_available_stake_amount(), self.adjust_stake_amount(pair, last_candle))
if val and len(self.pairs[pair]['dca_thresholds']) > 0:
print(self.pairs[pair]['dca_thresholds'])
count = 0
pct = 0
dataframe = dataframe.copy()
for dca in self.pairs[pair]['dca_thresholds']:
stake = mises[count]
pct += dca
offset = self.dynamic_trailing_offset(price=val, ath=ath, nb_entries=count)
if (count == self.pairs[pair]['count_of_buys']):
print(f"next_buy={val * (1 - pct)} count={count} pct={round(pct, 4)}")
dataframe[f"next_buy"] = val * (1 - pct)
count += 1
print(
f"stake={stake} count={count} pct={round(pct, 4)} offset={offset} next_buy={val * (1 - pct)}")
return dataframe
def getOpenTrades(self):
    """Refresh the cached list of open trades from persistence and return it."""
    open_trades = Trade.get_open_trades()
    self.trades = open_trades
    return open_trades
# def populate_buy_trend(self, dataframe: DataFrame, metadata: dict) -> DataFrame:
# dataframe.loc[
# (
# # (dataframe['sma5_inv'] == 1)
# (
# (dataframe['pct180'] < 0.5) |
# (
# (dataframe['close'] < dataframe['sma60'] )
# & (dataframe['sma24_deriv1'] > 0)
# )
# )
# # & (dataframe['hapercent'] > 0)
# # & (dataframe['sma24_deriv1'] > - 0.03)
# & (dataframe['ml_prob'] > 0.1)
# # & (
# # (dataframe['percent3'] <= -0.003)
# # | (dataframe['percent12'] <= -0.003)
# # | (dataframe['percent24'] <= -0.003)
# # )
# ), ['enter_long', 'enter_tag']] = (1, f"future")
#
# dataframe['test'] = np.where(dataframe['enter_long'] == 1, dataframe['close'] * 1.003, np.nan)
#
# return dataframe
def populate_buy_trend(self, dataframe: DataFrame, metadata: dict) -> DataFrame:
    """
    Flag long entries when the ML probability is rising and both the
    12- and 24-period SMA slopes are positive.

    :param dataframe: analyzed candles; must contain 'ml_prob',
        'sma24_deriv1', 'sma12_deriv1' and 'close'.
    :param metadata: strategy metadata (unused here, kept for the freqtrade API).
    :return: the same dataframe with 'enter_long'/'enter_tag', the legacy
        'buy' column and a 'test' reference-price column added.
    """
    # NOTE: the previous revision also computed an 80th-percentile threshold
    # of ml_prob here but never used it — that dead computation was removed.
    # Legacy column kept for compatibility with older freqtrade tooling.
    dataframe["buy"] = 0
    # Enter when ml_prob increased versus the previous candle and both SMA
    # derivatives confirm an upward slope.
    rising_prob = dataframe["ml_prob"].shift(1) < dataframe["ml_prob"]
    upward_sma = (dataframe["sma24_deriv1"] > 0) & (dataframe["sma12_deriv1"] > 0)
    dataframe.loc[rising_prob & upward_sma, ["enter_long", "enter_tag"]] = (1, "future")
    # Reference price (+0.3 %) on entry candles, NaN elsewhere
    # (presumably for plotting/debugging — confirm with the analysis notebooks).
    dataframe["test"] = np.where(dataframe["enter_long"] == 1, dataframe["close"] * 1.003, np.nan)
    return dataframe
# def populate_buy_trend(self, dataframe: DataFrame, metadata: dict) -> DataFrame:
# """
# Populate buy signals based on SHAP/PDP insights:
# - strong momentum: macdhist high and macd > macdsignal
# - rsi elevated (but not extreme)
# - positive sma24 derivative above threshold
# - price above sma60 (trend context)
# - price in upper region of Bollinger (bb_percent high)
# - volume/obv filter and volatility guard (obv_dist, atr)
# Returns dataframe with column 'buy' (1 = buy signal).
# """
#
# # Ensure column existence (fallback to zeros if missing)
# cols = [
# "macdhist", "macd", "macdsignal", "rsi", "rsi_short",
# "sma24_deriv1", "sma60", "bb_percent",
# "obv_dist", "atr", "percent", "open_1h", "absolute_min"
# ]
# for c in cols:
# if c not in dataframe.columns:
# dataframe[c] = 0.0
#
# # Thresholds (tune these)
# TH_MACDHIST = 8.0 # macdhist considered "strong" (example)
# TH_MACD_POS = 0.0 # macd must be > 0 (positive momentum)
# TH_SMA24_DERIV = 0.05 # sma24 derivative threshold where effect appears
# TH_RSI_LOW = 52.0 # lower bound to consider bullish RSI
# TH_RSI_HIGH = 85.0 # upper bound to avoid extreme overbought (optional)
# TH_BB_PERCENT = 0.7 # in upper band (0..1)
# TH_OBV_DIST = -40.0 # accept small negative OBV distance, reject very negative
# MAX_ATR = None # optional: maximum ATR to avoid extreme volatility (None = off)
# MIN_PRICE_ABOVE_SMA60 = 0.0 # require price > sma60 (price - sma60 > 0)
#
# price = dataframe["close"]
#
# # Momentum conditions
# cond_macdhist = dataframe["macdhist"] >= TH_MACDHIST
# cond_macd_pos = dataframe["macd"] > TH_MACD_POS
# cond_macd_vs_signal = dataframe["macd"] > dataframe["macdsignal"]
#
# # RSI condition (accept moderate-high RSI)
# cond_rsi = (dataframe["rsi"] >= TH_RSI_LOW) & (dataframe["rsi"] <= TH_RSI_HIGH)
#
# # SMA24 derivative: require momentum above threshold
# cond_sma24 = dataframe["sma24_deriv1"] >= TH_SMA24_DERIV
#
# # Price above SMA60 (trend filter)
# cond_above_sma60 = (price - dataframe["sma60"]) > MIN_PRICE_ABOVE_SMA60
#
# # Bollinger band percent (price in upper region)
# cond_bb = dataframe["bb_percent"] >= TH_BB_PERCENT
#
# # Volume/OBV prudence filter
# cond_obv = dataframe["obv_dist"] >= TH_OBV_DIST
#
# # Optional ATR guard
# if MAX_ATR is not None:
# cond_atr = dataframe["atr"] <= MAX_ATR
# else:
# cond_atr = np.ones_like(dataframe["atr"], dtype=bool)
#
# # Optional additional guards (avoid tiny percent moves or weird opens)
# cond_percent = np.abs(dataframe["percent"]) > 0.0005 # ignore almost-no-move bars
# cond_open = True # keep as placeholder; you can add open_1h relative checks
#
# # Combine into a buy signal
# buy_condition = (
# cond_macdhist &
# cond_macd_pos &
# cond_macd_vs_signal &
# cond_rsi &
# cond_sma24 &
# cond_above_sma60 &
# cond_bb &
# cond_obv &
# cond_atr &
# cond_percent
# )
#
# # Finalize: set buy column (0/1)
# dataframe.loc[buy_condition, ['enter_long', 'enter_tag']] = (1, f"future")
# # dataframe.loc[~buy_condition, "buy"] = 0
#
# dataframe['test'] = np.where(dataframe['enter_long'] == 1, dataframe['close'] * 1.003, np.nan)
#
# return dataframe
def populate_sell_trend(self, dataframe: DataFrame, metadata: dict) -> DataFrame:
    """No indicator-based sell signal: exits are handled elsewhere (custom_exit)."""
    return dataframe
# def adjust_stake_amount(self, pair: str, last_candle: DataFrame):
# # Calculer le minimum des 14 derniers jours
# nb_pairs = len(self.dp.current_whitelist())
#
# base_stake_amount = self.config.get('stake_amount')
#
# if True : #self.pairs[pair]['count_of_buys'] == 0:
# factor = 1 #65 / min(65, last_candle['rsi_1d'])
# # if last_candle['min_max_60'] > 0.04:
# # factor = 2
#
# adjusted_stake_amount = base_stake_amount #max(base_stake_amount / 5, base_stake_amount * factor)
# else:
# adjusted_stake_amount = self.pairs[pair]['first_amount']
#
# if self.pairs[pair]['count_of_buys'] == 0:
# self.pairs[pair]['first_amount'] = adjusted_stake_amount
#
# return adjusted_stake_amount
def calculateNumberOfSteps(self, current, ath, max_steps=0):
    """
    Linearly map `current` price onto a DCA step count.

    The allowed floor price (ath * (1 - allow_decrease_rate)) maps to 1 step
    and the ATH itself maps to `max_steps`; the result is rounded and
    clamped to at least 1.
    """
    if max_steps == 0:
        max_steps = self.max_steps.value
    floor_price = ath * (1 - self.allow_decrease_rate.value)
    # Line through (floor_price, 1) and (ath, max_steps).
    slope = (max_steps - 1) / (ath - floor_price)
    intercept = 1 - slope * floor_price
    steps = slope * current + intercept
    # Clamp: never return fewer than one step.
    return max(round(steps), 1)
def adjust_stake_amount(self, pair: str, last_candle: DataFrame):
    """
    Return the stake to use for the next entry on `pair`.

    When a first entry exists, pick the next amount from the precomputed
    'mises' schedule (index = buys minus averaging-up entries), falling
    back to the first amount once the schedule is exhausted.  Otherwise
    build a fresh schedule via calculateMises and index it by buy count,
    falling back to full / (steps * 2).
    """
    info = self.pairs[pair]
    if info['first_amount'] > 0:
        planned = info['mises']
        idx = info['count_of_buys'] - info['has_gain']
        if idx < len(planned):
            return planned[idx]
        return info['first_amount']
    full, planned, steps = self.calculateMises(last_candle, pair)
    idx = info['count_of_buys']
    if idx < len(planned):
        return planned[idx]
    return full / (steps * 2)
def calculateMises(self, last_candle, pair):
    """
    Build the DCA stake schedule ("mises") for `pair`.

    Uses the higher of the pair's tracked max and the recorded BTC ATH as
    reference, derives a step count from the current mid price, and splits
    the total wallet amount into progressively sized stakes.  Side effects:
    stores 'last_ath' and 'mises' in self.pairs[pair].
    """
    historical_ath = self.get_last_ath_before_candle(last_candle['date'])
    ath = max(self.pairs[pair]['last_max'], historical_ath)
    self.pairs[pair]['last_ath'] = ath
    full = self.wallets.get_total_stake_amount()
    steps = self.calculateNumberOfSteps(last_candle['mid'], ath, max_steps=self.max_steps.value)
    # First part is full/(steps*2); the remainder grows progressively.
    mises = self.progressive_parts(full, steps, full / (steps * 2))
    print(f"ath={ath} full={full} steps={steps} mises={mises} ")
    self.pairs[pair]['mises'] = mises
    return full, mises, steps
def adjust_trade_position(self, trade: Trade, current_time: datetime,
                          current_rate: float, current_profit: float, min_stake: float,
                          max_stake: float, **kwargs):
    """
    Freqtrade position-adjustment (DCA) hook.

    Returns a positive stake amount to buy more into the open trade,
    0 when the wallet is nearly empty (blocks further adjustment),
    or None to do nothing on this call.

    Two buying branches:
      * "Loss" branch: price declined at least by the next DCA threshold
        since the last fill, while the last candle shows a short-term
        rebound (positive 'percent' and 'sma24_deriv1') and price is still
        below the first entry price.
      * "Gain" branch: trade is in profit and price rose at least 0.3 %
        since the last fill, with RSI < 75.
    """
    # Do nothing while an order is already pending for this trade.
    if trade.has_open_orders:
        # self.printLog("skip open orders")
        return None
    dataframe, _ = self.dp.get_analyzed_dataframe(trade.pair, self.timeframe)
    if (len(dataframe) < 1):
        # self.printLog("skip dataframe")
        return None
    last_candle = dataframe.iloc[-1].squeeze()
    # before_last_candle = dataframe.iloc[-2].squeeze()
    # Prepare timing / accounting data.
    current_time = current_time.astimezone(timezone.utc)
    # open_date = trade.open_date.astimezone(timezone.utc)
    # Available stake amount, only used for logging below.
    dispo = round(self.wallets.get_available_stake_amount())
    # hours_since_first_buy = (current_time - trade.open_date_utc).seconds / 3600.0
    # days_since_first_buy = (current_time - trade.open_date_utc).days
    # NOTE(review): 'hours' is only referenced by the commented-out "force"
    # condition further down.
    hours = (current_time - trade.date_last_filled_utc).total_seconds() / 3600.0
    # minutes = (current_time - trade.date_last_filled_utc).total_seconds() / 60.0
    count_of_buys = trade.nr_of_successful_entries
    # current_time_utc = current_time.astimezone(timezone.utc)
    # open_date = trade.open_date.astimezone(timezone.utc)
    # days_since_open = (current_time_utc - open_date).days
    pair = trade.pair
    # Absolute profit (stake currency) at the current rate.
    profit = trade.calc_profit(current_rate)  # round(current_profit * trade.stake_amount, 1)
    # last_lost = self.getLastLost(last_candle, pair)
    # NOTE(review): 'pct_first' is assigned but never used in the active code.
    pct_first = 0
    # total_counts = sum(
    #     pair_data['count_of_buys'] for pair_data in self.pairs.values() if not self.getShortName(pair) == 'BTC')
    #
    # if self.pairs[pair]['first_buy']:
    #     pct_first = self.getPctFirstBuy(pair, last_candle)
    # if profit > - self.pairs[pair]['first_amount'] \
    #         and self.wallets.get_available_stake_amount() < self.pairs[pair]['first_amount'] \
    #         and last_candle['sma24_deriv1_1h'] < 0:
    #     stake_amount = trade.stake_amount
    #     self.pairs[pair]['previous_profit'] = profit
    #     trade_type = "Sell " + (last_candle['enter_tag'] if last_candle['enter_long'] == 1 else '')
    #     self.pairs[trade.pair]['count_of_buys'] += 1
    #     self.pairs[pair]['total_amount'] = stake_amount
    #     self.log_trade(
    #         last_candle=last_candle,
    #         date=current_time,
    #         action="🟥 Stoploss",
    #         dispo=dispo,
    #         pair=trade.pair,
    #         rate=current_rate,
    #         trade_type=trade_type,
    #         profit=round(profit, 1),
    #         buys=trade.nr_of_successful_entries + 1,
    #         stake=round(stake_amount, 2)
    #     )
    #
    #     self.pairs[trade.pair]['last_buy'] = current_rate
    #     self.pairs[trade.pair]['max_touch'] = last_candle['close']
    #     self.pairs[trade.pair]['last_candle'] = last_candle
    #
    #     return -stake_amount
    # Stop adjusting entirely when almost no stake is left.
    if (self.wallets.get_available_stake_amount() < 10):  # or trade.stake_amount >= max_stake:
        return 0
    # NOTE(review): 'lim' is unused in the active code.
    lim = 0.3
    if (len(dataframe) < 1):
        # self.printLog("skip dataframe")
        return None
    # Last real purchase price (not the average entry price).
    last_fill_price = self.pairs[trade.pair]['last_buy']  # trade.open_rate # replaced just below
    # if len(trade.orders) > 0:
    #     # Look up the last executed BUY order
    #     buy_orders = [o for o in trade.orders if o.is_buy and o.status == "closed"]
    #     if buy_orders:
    #         last_fill_price = buy_orders[-1].price
    # Relative decline since the last fill.
    # if minutes % 60 == 0:
    #     ath = max(self.pairs[pair]['last_max'], self.get_last_ath_before_candle(last_candle['date']))
    #     self.pairs[pair]['last_ath'] = ath
    # else:
    #     ath = self.pairs[pair]['last_ath']
    # steps = self.approx_value(last_candle['mid'], ath)
    # dca_thresholds = split_ratio_one_third((last_candle['mid'] - (ath * self.allow_decrease_rate.value)) / last_candle['mid'], steps) #((last_candle['mid'] - (ath * self.allow_decrease_rate.value)) / steps) / last_candle['mid'] # 0.0025 + 0.0005 * count_of_buys
    # Lazily compute the ladder of DCA thresholds for this pair.
    if len(self.pairs[pair]['dca_thresholds']) == 0:
        self.calculateStepsDcaThresholds(last_candle, pair)
    # Threshold for the next buy; index clamped to the last ladder entry.
    dca_threshold = self.pairs[pair]['dca_thresholds'][
        min(count_of_buys - 1, len(self.pairs[pair]['dca_thresholds']) - 1)]
    # self.printLog(f"{count_of_buys} {ath * (1 - self.allow_decrease_rate.value)} {round(last_candle['mid'], 2)} {round((last_candle['mid'] - (ath * self.allow_decrease_rate.value)) / last_candle['mid'], 2)} {steps} {round(dca_threshold, 4)}")
    decline = (last_fill_price - current_rate) / last_fill_price
    increase = - decline
    # if decline >= self.dca_threshold:
    #     # Example: buy 50% of the last trade amount
    #     last_amount = buy_orders[-1].amount if buy_orders else 0
    #     stake_amount = last_amount * current_rate * 0.5
    #     return stake_amount
    ########################### ALGO ATH
    # # --- 1. Local ATH of the pair ---
    # ath = max(self.pairs[pair]['last_max'], self.get_last_ath_before_candle(last_candle['date']))
    #
    # # --- 2. Distance ATH - current ---
    # dd = (current_rate - ath) / ath * 100  # dd <= 0
    #
    # if dd > -1:  # no reinforcement when the drawdown is too small
    #     return None
    #
    # # --- 3. Dynamic DCA (exponential model) ---
    # a = 0.015
    # b = 0.12
    #
    # pct = a * (math.exp(b * (-dd)) - 1)  # proportion of the free wallet
    #
    # # Safety clamp
    # pct = min(max(pct, 0), 0.35)  # max 35% at once
    #
    # if pct <= 0:
    #     return None
    #
    # # --- 4. Stake based on the free wallet ---
    # stake_amount = self.wallets.get_available_stake_amount() * pct
    #
    # if stake_amount < self.min_stake_amount:
    #     return None
    # END ########################## ALGO ATH
    force = False  # hours > self.hours_force.value and last_candle[self.indic_1h_force_buy.value] > 0
    # Rebound filter: rising candle, rising SMA24 slope, still below first entry price.
    condition = last_candle['percent'] > 0 and last_candle['sma24_deriv1'] > 0 \
                and last_candle['close'] < self.pairs[pair]['first_buy']
    if ((force or decline >= dca_threshold) and condition):
        try:
            # A trade that already averaged up and is now in profit is
            # flagged for selling instead of buying more.
            if self.pairs[pair]['has_gain'] and profit > 0:
                self.pairs[pair]['force_sell'] = True
                self.pairs[pair]['previous_profit'] = profit
                return None
            stake_amount = min(self.wallets.get_available_stake_amount(),
                               self.adjust_stake_amount(pair, last_candle))
            # if force:
            #     stake_amount = stake_amount / 2
            # self.printLog(f"profit={profit} previous={self.pairs[pair]['previous_profit']} count_of_buys={trade.nr_of_successful_entries}")
            if stake_amount > 0:
                # Per-pair book-keeping before returning the buy amount.
                self.pairs[pair]['previous_profit'] = profit
                trade_type = "Loss " + (last_candle['enter_tag'] if last_candle['enter_long'] == 1 else '')
                self.pairs[trade.pair]['count_of_buys'] += 1
                self.pairs[pair]['total_amount'] += stake_amount
                self.log_trade(
                    last_candle=last_candle,
                    date=current_time,
                    action="🟧 " + ("Force" if force else 'Loss -'),
                    dispo=dispo,
                    pair=trade.pair,
                    rate=current_rate,
                    trade_type=trade_type,
                    profit=round(profit, 1),
                    buys=trade.nr_of_successful_entries + 1,
                    stake=round(stake_amount, 2)
                )
                self.pairs[trade.pair]['last_buy'] = current_rate
                self.pairs[trade.pair]['max_touch'] = last_candle['close']
                self.pairs[trade.pair]['last_candle'] = last_candle
                # df = pd.DataFrame.from_dict(self.pairs, orient='index')
                # colonnes_a_exclure = ['last_candle', 'stop',
                #                       'trade_info', 'last_date', 'expected_profit', 'last_count_of_buys', 'base_stake_amount', 'stop_buy']
                # df_filtered = df[df['count_of_buys'] > 0].drop(columns=colonnes_a_exclure)
                # # df_filtered = df_filtered["first_buy", "last_max", "max_touch", "last_sell","last_buy", 'count_of_buys', 'current_profit']
                #
                # self.printLog(df_filtered)
                return stake_amount
            return None
        except Exception as exception:
            # Best-effort: log and skip this adjustment rather than crash the bot loop.
            self.printLog(exception)
            return None
    # Averaging-up ("Gain") branch: buy more while price keeps rising.
    increase_dca_threshold = 0.003
    if current_profit > increase_dca_threshold \
            and (increase >= increase_dca_threshold and self.wallets.get_available_stake_amount() > 0) \
            and last_candle['rsi'] < 75:
        try:
            self.pairs[pair]['previous_profit'] = profit
            # Stake floored at 10 (stake currency).
            stake_amount = max(10, min(self.wallets.get_available_stake_amount(),
                                       self.adjust_stake_amount(pair, last_candle)))
            if stake_amount > 0:
                self.pairs[pair]['has_gain'] += 1
                trade_type = 'Gain +' + (last_candle['enter_tag'] if last_candle['enter_long'] == 1 else '')
                self.pairs[trade.pair]['count_of_buys'] += 1
                self.pairs[pair]['total_amount'] += stake_amount
                self.log_trade(
                    last_candle=last_candle,
                    date=current_time,
                    action="🟡 Gain +",
                    dispo=dispo,
                    pair=trade.pair,
                    rate=current_rate,
                    trade_type='Gain ' + str(round(increase, 4)),
                    profit=round(profit, 1),
                    buys=trade.nr_of_successful_entries + 1,
                    stake=round(stake_amount, 2)
                )
                self.pairs[trade.pair]['last_buy'] = current_rate
                self.pairs[trade.pair]['max_touch'] = last_candle['close']
                self.pairs[trade.pair]['last_candle'] = last_candle
                return stake_amount
            return None
        except Exception as exception:
            self.printLog(exception)
            return None
    return None
def custom_exit(self, pair, trade, current_time, current_rate, current_profit, **kwargs):
    """
    Freqtrade custom-exit hook implementing a dynamic trailing take-profit.

    Returns an exit-reason string to close the trade, or None to keep it:
      * "rsi_<buys>_<gains>" when the last candle's RSI exceeds 90,
      * "stop_<buys>_<gains>" when the absolute profit falls back to the
        trailing level derived from the maximum profit seen so far.
    Also refreshes per-pair book-keeping (last_max/last_min, max_profit,
    count_of_buys, current_profit, max_touch) on every call.
    """
    dataframe, _ = self.dp.get_analyzed_dataframe(pair, self.timeframe)
    last_candle = dataframe.iloc[-1].squeeze()
    # last_candle_1h = dataframe.iloc[-13].squeeze()
    # before_last_candle = dataframe.iloc[-2].squeeze()
    # before_last_candle_2 = dataframe.iloc[-3].squeeze()
    # before_last_candle_12 = dataframe.iloc[-13].squeeze()
    #
    # expected_profit = self.expectedProfit(pair, last_candle)
    # # self.printLog(f"current_time={current_time} current_profit={current_profit} expected_profit={expected_profit}")
    #
    # # ----- 1) Load the trailing variables for this trade -----
    # max_price = self.pairs[pair]['max_touch']
    # Track the extreme closes seen for this pair.
    self.pairs[pair]['last_max'] = max(last_candle['close'], self.pairs[pair]['last_max'])
    self.pairs[pair]['last_min'] = min(last_candle['close'], self.pairs[pair]['last_min'])
    self.pairs[pair]['current_trade'] = trade
    count_of_buys = trade.nr_of_successful_entries
    # Absolute profit (stake currency) at the current rate.
    profit = trade.calc_profit(current_rate)  # round(current_profit * trade.stake_amount, 1)
    if current_profit > 0:
        self.pairs[pair]['max_profit'] = max(self.pairs[pair]['max_profit'], profit)
    # else:
    #     self.pairs[pair]['max_profit'] = 0
    max_profit = self.pairs[pair]['max_profit']
    # if current_profit > 0:
    #     self.printLog(f"profit={profit} max_profit={max_profit} current_profit={current_profit}")
    # baisse = 0
    # if profit > 0:
    #     baisse = 1 - (profit / max_profit)
    # mx = max_profit / 5
    self.pairs[pair]['count_of_buys'] = count_of_buys
    self.pairs[pair]['current_profit'] = profit
    # NOTE(review): 'dispo' and 'minutes' are only used by commented-out logging below.
    dispo = round(self.wallets.get_available_stake_amount())
    # hours_since_first_buy = (current_time - trade.open_date_utc).seconds / 3600.0
    # days_since_first_buy = (current_time - trade.open_date_utc).days
    # hours = (current_time - trade.date_last_filled_utc).total_seconds() / 3600.0
    minutes = (current_time - trade.date_last_filled_utc).total_seconds() / 60.0
    # ----- 2) Update the highest price touched -----
    self.pairs[pair]['max_touch'] = max(last_candle['close'], self.pairs[pair]['max_touch'])
    # ----- 3) Trailing parameters -----
    # profit_max = (max_price - trade.open_rate) / trade.open_rate
    current_trailing_stop_positive = self.trailing_stop_positive
    current_trailing_only_offset_is_reached = self.trailing_only_offset_is_reached
    current_trailing_stop_positive_offset = self.trailing_stop_positive_offset
    # The static offset above is immediately replaced by a dynamic one based
    # on the distance to the last ATH and the number of entries.
    current_trailing_stop_positive_offset = self.dynamic_trailing_offset(price=current_rate,
                                                                         ath=self.pairs[pair]['last_ath'],
                                                                         nb_entries=count_of_buys)
    # max_ = last_candle['max180']
    # min_ = last_candle['min180']
    # mid = last_candle['mid']
    # avoid division by zero
    # position = (mid - min_) / (max_ - min_)
    # zone = int(position * 3)  # 0 to 2
    # if zone == 0:
    #     current_trailing_stop_positive = self.trailing_stop_positive
    #     current_trailing_stop_positive_offset = self.trailing_stop_positive_offset * 2
    #     if minutes > 1440:
    #         current_trailing_only_offset_is_reached = False
    #         current_trailing_stop_positive_offset = self.trailing_stop_positive_offset
    # if zone == 1:
    # ----- 5) Dynamic trailing-stop level (absolute profit) -----
    # Example: offset=0.321 => stop at +24.8%
    trailing_stop = max_profit * (1.0 - current_trailing_stop_positive)
    # baisse = 0
    # if max_profit:
    #     baisse = (max_profit - profit) / max_profit
    # if minutes % 1 == 0:
    #     self.log_trade(
    #         last_candle=last_candle,
    #         date=current_time,
    #         action="🟢 CURRENT",  # 🔴 CURRENT" if self.pairs[pair]['stop'] or last_candle['stop_buying'] else "
    #         dispo=dispo,
    #         pair=pair,
    #         rate=last_candle['close'],
    #         trade_type=f"{round(profit, 2)} {round(max_profit, 2)} {round(trailing_stop,2)} {minutes}",
    #         profit=round(profit, 2),
    #         buys=count_of_buys,
    #         stake=0
    #     )
    # Never exit at a loss from here; the stoploss handles that side.
    if profit < 0:
        return None
    # Overbought blow-off: take profit outright.
    if last_candle['rsi'] > 90:
        return f"rsi_{count_of_buys}_{self.pairs[pair]['has_gain']}"
    # if last_candle['sma12'] > last_candle['sma24']:  # and last_candle['rsi'] < 85:
    #     return None
    # if last_candle['sma24_deriv1'] > 0:  # and minutes < 180 and baisse < 30: # and last_candle['sma5_deriv1'] > -0.15:
    #     if (minutes < 180):
    #         return None
    # if (minutes > 1440 and last_candle['sma60_deriv1'] > 0):
    #     return None
    # ----- 4) OFFSET: should we wait until trailing_stop_positive_offset is exceeded? -----
    if current_trailing_only_offset_is_reached and max_profit > current_trailing_stop_positive_offset:
        # Offset reached, but profit has not fallen back below the trailing band yet.
        if profit > max_profit * (1 - current_trailing_stop_positive):  # 2 * current_trailing_stop_positive:
            print(
                f"{current_time} trailing non atteint trailing_stop={round(trailing_stop, 4)} profit={round(profit, 4)} "
                f"max={round(max_profit, 4)} offset={current_trailing_stop_positive_offset}")
            return None  # do not activate the trailing yet
        else:
            print(
                f"{current_time} trailing atteint trailing_stop={round(trailing_stop, 4)} profit={round(profit, 4)} "
                f"max={round(max_profit, 4)} offset={current_trailing_stop_positive_offset}")
    else:
        return None
    # Otherwise: trailing active from the start.
    # ----- 6) Exit condition: profit fell back to the trailing level -----
    if 0 < profit <= trailing_stop:  # and last_candle['mid'] < last_candle['sma5']: # and profit > current_trailing_stop_positive_offset:
        self.pairs[pair]['force_buy'] = True
        print(
            f"{current_time} Condition de vente trailing_stop={round(trailing_stop, 4)} profit={round(profit, 4)} max={round(max_profit, 4)} "
            f"{round(min(2, max_profit * (1 - current_trailing_stop_positive)), 4)} offset={current_trailing_stop_positive_offset}")
        return f"stop_{count_of_buys}_{self.pairs[pair]['has_gain']}"
    return None
def informative_pairs(self):
    """Request 1h and 1d informative candles for every pair in the current whitelist."""
    whitelist = self.dp.current_whitelist()
    # All 1h pairs first, then all 1d pairs (same order as before).
    return [(pair, timeframe) for timeframe in ('1h', '1d') for pair in whitelist]
def populate1hIndicators(self, df: pd.DataFrame, metadata: dict) -> pd.DataFrame:
    """
    Add previous-week and previous-month support/resistance levels plus
    simple demand zones derived from them.

    Bug fix: the previous implementation computed the per-period level with
    groupby(...).transform(...) and then applied a row-wise .shift(1), which
    only shifted the column by one candle — so every row except the first of
    a period still saw the *current* period's level.  The shift is now done
    per period, so each row sees the previous ISO week / previous month.

    :param df: 1h OHLC dataframe with a DatetimeIndex and 'low'/'high' columns.
    :param metadata: strategy metadata (unused, kept for the call convention).
    :return: df with week/year/month keys, weekly_/monthly_ low/high levels
        and weekly_/monthly_ demand-zone columns added.
    """
    # Period keys.  NOTE(review): df.index.year is the calendar year while
    # isocalendar().week is ISO — the first days of January may fall in ISO
    # week 52/53 and then form their own tiny (year, week) group; confirm
    # whether that edge case matters for these levels.
    df["week"] = df.index.isocalendar().week
    df["year"] = df.index.year
    df["month"] = df.index.month

    def _previous_period_level(keys, col, how):
        # Aggregate per period, step back one period, then map back to rows.
        per_period = df.groupby(keys)[col].agg(how).shift(1)
        row_keys = pd.MultiIndex.from_frame(df[keys])
        return per_period.reindex(row_keys).to_numpy()

    # --- WEEKLY LEVELS (previous ISO week) ---
    df["weekly_low"] = _previous_period_level(["year", "week"], "low", "min")
    df["weekly_high"] = _previous_period_level(["year", "week"], "high", "max")
    # Simple weekly demand zone: previous week's low + 2.5 % (tunable).
    df["weekly_demand_zone_low"] = df["weekly_low"]
    df["weekly_demand_zone_high"] = df["weekly_low"] * 1.025

    # --- MONTHLY LEVELS (previous month) ---
    df["monthly_low"] = _previous_period_level(["year", "month"], "low", "min")
    df["monthly_high"] = _previous_period_level(["year", "month"], "high", "max")
    df["monthly_demand_zone_low"] = df["monthly_low"]
    df["monthly_demand_zone_high"] = df["monthly_low"] * 1.03
    return df
# ----- SIGNALS SIMPLES POUR EXEMPLE -----
# def populate_buy_trend(self, df: pd.DataFrame, metadata: dict) -> pd.DataFrame:
# df["buy"] = 0
#
# # Exemple : acheter si le prix tape la zone de demande hebdomadaire
# df.loc[
# (df["close"] <= df["weekly_demand_zone_high"]) &
# (df["close"] >= df["weekly_demand_zone_low"]),
# "buy"
# ] = 1
#
# return df
#
# def populate_sell_trend(self, df: pd.DataFrame, metadata: dict) -> pd.DataFrame:
# df["sell"] = 0
#
# # Exemple : vendre sur retour au weekly_high précédent
# df.loc[df["close"] >= df["weekly_high"], "sell"] = 1
#
# return df
def rsi_trend_probability(self, dataframe, short=6, long=12):
    """
    Blend short/long RSI relationships into a single trend score 'rtp'
    clipped to [-1, 1].  Also stores the intermediate columns
    'rsi_short', 'rsi_long', 'cross_soft', 'gap' and 'trend'.
    """
    dataframe = dataframe.copy()
    rsi_fast = talib.RSI(dataframe['mid'], short)
    rsi_slow = talib.RSI(dataframe['mid'], long)
    dataframe['rsi_short'] = rsi_fast
    dataframe['rsi_long'] = rsi_slow
    # Soft crossover signal: tanh squashes the fast/slow spread.
    dataframe['cross_soft'] = np.tanh((rsi_fast - rsi_slow) / 7)
    dataframe['gap'] = (rsi_fast - rsi_slow) / 100
    dataframe['trend'] = (rsi_slow - 50) / 50
    combined = (0.6 * dataframe['cross_soft']
                + 0.25 * dataframe['gap']
                + 0.15 * dataframe['trend'])
    dataframe['rtp'] = combined.clip(-1, 1)
    return dataframe
def to_utc_ts(self, x):
    """Normalize any date-like input to a timezone-aware (UTC) pandas timestamp."""
    utc_timestamp = pd.to_datetime(x, utc=True)
    return utc_timestamp
# suppose self.btc_ath_history exists (liste de dict)
def get_last_ath_before_candle(self, date):
    """
    Return the USD price of the most recent recorded BTC all-time-high
    dated at or before `date`, or None when no entry qualifies.
    """
    cutoff = self.to_utc_ts(date)  # or to_utc_ts(last_candle.name)
    eligible = [
        (self.to_utc_ts(entry["date"]), entry["price_usd"])
        for entry in self.btc_ath_history
        if self.to_utc_ts(entry["date"]) <= cutoff
    ]
    if not eligible:
        return None
    # max with key keeps the first maximal date, like the original strict '>'.
    return max(eligible, key=lambda item: item[0])[1]
def trainModel(self, dataframe: DataFrame, metadata: dict):
pair = self.getShortName(metadata['pair'])
pd.set_option('display.max_rows', None)
pd.set_option('display.max_columns', None)
pd.set_option("display.width", 200)
path = self.path # f"user_data/plots/{pair}/"
os.makedirs(path, exist_ok=True)
# # Étape 1 : sélectionner numériques
# numeric_cols = dataframe.select_dtypes(include=['int64', 'float64']).columns
#
# # Étape 2 : enlever constantes
# usable_cols = [c for c in numeric_cols if dataframe[c].nunique() > 1
# and (not c.endswith("_state") and not c.endswith("_1h") and not c.endswith("_1d")
# and not c.endswith("_class") and not c.endswith("_price")
# and not c.startswith('stop_buying'))]
#
# # Étape 3 : remplacer inf et NaN par 0
# dataframe[usable_cols] = dataframe[usable_cols].replace([np.inf, -np.inf], 0).fillna(0)
#
# print("Colonnes utilisables pour le modèle :")
# print(usable_cols)
#
# self.model_indicators = usable_cols
#
df = dataframe[self.model_indicators].copy()
# Corrélations des colonnes
corr = df.corr(numeric_only=True)
print("Corrélation des colonnes")
print(corr)
# 3⃣ Créer la cible : 1 si le prix monte dans les prochaines bougies
# df['target'] = (df['sma24'].shift(-24) > df['sma24']).astype(int)
df['target'] = ((df["sma24"].shift(-13) - df["sma24"]) > 100).astype(int)
df['target'] = df['target'].fillna(0).astype(int)
# Corrélations triées par importance avec une colonne cible
target_corr = df.corr(numeric_only=True)["target"].sort_values(ascending=False)
print("Corrélations triées par importance avec une colonne cible")
print(target_corr)
# Corrélations triées par importance avec une colonne cible
corr = df.corr(numeric_only=True)
corr_unstacked = (
corr.unstack()
.reset_index()
.rename(columns={"level_0": "col1", "level_1": "col2", 0: "corr"})
)
# Supprimer les doublons col1/col2 inversés et soi-même
corr_unstacked = corr_unstacked[corr_unstacked["col1"] < corr_unstacked["col2"]]
# Trier par valeur absolue de corrélation
corr_sorted = corr_unstacked.reindex(corr_unstacked["corr"].abs().sort_values(ascending=False).index)
print("Trier par valeur absolue de corrélation")
print(corr_sorted.head(20))
# --- Calcul de la corrélation ---
corr = df.corr(numeric_only=True) # évite les colonnes non numériques
corr = corr * 100 # passage en pourcentage
# --- Masque pour nafficher que le triangle supérieur (optionnel) ---
mask = np.triu(np.ones_like(corr, dtype=bool))
# --- Création de la figure ---
fig, ax = plt.subplots(figsize=(96, 36))
# --- Heatmap avec un effet “température” ---
sns.heatmap(
corr,
mask=mask,
cmap="coolwarm", # palette bleu → rouge
center=0, # 0 au centre
annot=True, # affiche les valeurs dans chaque case
fmt=".0f", # format entier (pas de décimale)
cbar_kws={"label": "Corrélation (%)"}, # légende à droite
linewidths=0.5, # petites lignes entre les cases
ax=ax
)
# --- Personnalisation ---
ax.set_title("Matrice de corrélation (en %)", fontsize=20, pad=20)
plt.xticks(rotation=45, ha="right")
plt.yticks(rotation=0)
# --- Sauvegarde ---
output_path = f"{self.path}/Matrice_de_correlation_temperature.png"
plt.savefig(output_path, bbox_inches="tight", dpi=150)
plt.close(fig)
print(f"✅ Matrice enregistrée : {output_path}")
# Exemple d'utilisation :
selected_corr = self.select_uncorrelated_features(df, target="target", top_n=30, corr_threshold=0.7)
print("===== 🎯 FEATURES SÉLECTIONNÉES =====")
print(selected_corr)
# Nettoyage
df = df.dropna()
X = df[self.model_indicators]
y = df['target'] # ta colonne cible binaire ou numérique
print("===== 🎯 FEATURES SCORES =====")
print(self.feature_auc_scores(X, y))
# 4⃣ Split train/test
X = df[self.model_indicators]
y = df['target']
# Séparation temporelle (train = 80 %, valid = 20 %)
X_train, X_valid, y_train, y_valid = train_test_split(X, y, test_size=0.2, shuffle=False)
# Nettoyage des valeurs invalides
selector = VarianceThreshold(threshold=0.0001)
selector.fit(X_train)
selected = X_train.columns[selector.get_support()]
print("Colonnes conservées :", list(selected))
# 5⃣ Entraînement du modèle
# self.train_model = RandomForestClassifier(n_estimators=200, random_state=42)
# def objective(trial):
# self.train_model = XGBClassifier(
# n_estimators=trial.suggest_int("n_estimators", 200, 300),
# max_depth=trial.suggest_int("max_depth", 3, 6),
# learning_rate=trial.suggest_float("learning_rate", 0.01, 0.3),
# subsample=trial.suggest_float("subsample", 0.7, 1.0),
# colsample_bytree=trial.suggest_float("colsample_bytree", 0.7, 1.0),
# scale_pos_weight=1, # tu mettras balance_ratio ici si tu veux
# objective="binary:logistic",
# eval_metric="logloss",
# n_jobs=-1
# )
#
# self.train_model.fit(X_train, y_train)
#
# y_pred = self.train_model.predict(X_valid) # <-- validation = test split
# return f1_score(y_valid, y_pred)
#
# study = optuna.create_study(direction="maximize")
# study.optimize(objective, n_trials=50)
def objective(trial):
# local_model = XGBClassifier(
# n_estimators=300, # nombre d'arbres plus raisonnable
# learning_rate=0.01, # un peu plus rapide que 0.006, mais stable
# max_depth=4, # capture plus de patterns que 3, sans overfitting excessif
# subsample=0.7, # utilise 70% des lignes pour chaque arbre → réduit overfitting
# colsample_bytree=0.8, # 80% des features par arbre
# gamma=0.01, # gain minimal pour un split → régularisation
# reg_alpha=0.01, # L1 régularisation des feuilles
# reg_lambda=1, # L2 régularisation des feuilles
# n_jobs=-1, # utilise tous les cœurs CPU pour accélérer
# random_state=42, # reproductibilité
# missing=float('nan'), # valeur manquante reconnue
# eval_metric='logloss' # métrique pour classification binaire
# )
local_model = XGBClassifier(
n_estimators=trial.suggest_int("n_estimators", 300, 500),
max_depth=trial.suggest_int("max_depth", 1, 6),
learning_rate=trial.suggest_float("learning_rate", 0.005, 0.3, log=True),
subsample=trial.suggest_float("subsample", 0.6, 1.0),
colsample_bytree=trial.suggest_float("colsample_bytree", 0.6, 1.0),
scale_pos_weight=1,
objective="binary:logistic",
eval_metric="logloss",
n_jobs=-1
)
local_model.fit(
X_train,
y_train,
eval_set=[(X_valid, y_valid)],
# early_stopping_rounds=50,
verbose=False
)
proba = local_model.predict_proba(X_valid)[:, 1]
thresholds = np.linspace(0.1, 0.9, 50)
best_f1 = max(f1_score(y_valid, (proba > t)) for t in thresholds)
return best_f1
study = optuna.create_study(direction="maximize")
study.optimize(objective, n_trials=20)
# SHAP
# Reconstruction du modèle final avec les meilleurs hyperparamètres
# Récupération des meilleurs paramètres trouvés
best_params = study.best_params
best_model = XGBClassifier(**best_params)
best_model.fit(X_train, y_train)
self.train_model = best_model
# === SHAP plots ===
# Calcul SHAP
explainer = shap.TreeExplainer(self.train_model)
shap_values = explainer(X_train)
# On choisit une observation pour le graphique waterfall
# Explication du modèle de prédiction pour la première ligne de X_valid.”
i = 0
# Extraction des valeurs
shap_val = shap_values[i].values
feature_names = X_train.columns
feature_values = X_train.iloc[i]
# Tri par importance absolue
# order = np.argsort(np.abs(shap_val))[::-1]
k = 10
order = np.argsort(np.abs(shap_val))[::-1][:k]
# ---- Création figure sans l'afficher ----
plt.ioff() # Désactive l'affichage interactif
shap.plots.waterfall(
shap.Explanation(
values=shap_val[order],
base_values=shap_values.base_values[i],
data=feature_values.values[order],
feature_names=feature_names[order]
),
show=False # IMPORTANT : n'affiche pas dans Jupyter / console
)
# Sauvegarde du graphique sur disque
output_path = f"{self.path}/shap_waterfall.png"
plt.savefig(output_path, dpi=200, bbox_inches='tight')
plt.close() # ferme la figure proprement
print(f"Graphique SHAP enregistré : {output_path}")
# FIN SHAP
# ---- après avoir exécuté la study ------
print("Best value (F1):", study.best_value)
print("Best params:", study.best_params)
best_trial = study.best_trial
print("\n=== BEST TRIAL ===")
print("Number:", best_trial.number)
print("Value:", best_trial.value)
print("Params:")
for k, v in best_trial.params.items():
print(f" - {k}: {v}")
# All trials summary
print("\n=== ALL TRIALS ===")
for t in study.trials:
print(f"Trial {t.number}: f1 = {t.value}, params = {t.params}")
# DataFrame of trials
df = study.trials_dataframe()
print(df.head())
# Graphs
fig = plot_optimization_history(study)
fig.write_html(f"{self.path}/optimization_history.html")
fig = plot_param_importances(study)
fig.write_html(f"{self.path}/param_importances.html")
fig = plot_slice(study)
fig.write_html(f"{self.path}/slice.html")
fig = plot_parallel_coordinate(study)
fig.write_html(f"{self.path}/parallel_coordinates.html")
# 2⃣ Sélection des features AVANT calibration
sfm = SelectFromModel(self.train_model, threshold="median", prefit=True)
selected_features = X_train.columns[sfm.get_support()]
print(selected_features)
# 3⃣ Calibration ensuite (facultative)
calibrated = CalibratedClassifierCV(self.train_model, method='sigmoid', cv=5)
calibrated.fit(X_train[selected_features], y_train)
print(calibrated)
# # # calibration
# self.train_model = CalibratedClassifierCV(self.train_model, method='sigmoid', cv=5)
# # Sélection
# sfm = SelectFromModel(self.train_model, threshold="median")
# sfm.fit(X_train, y_train)
# selected_features = X_train.columns[sfm.get_support()]
# print(selected_features)
# self.train_model.fit(X_train, y_train)
y_pred = self.train_model.predict(X_valid)
y_proba = self.train_model.predict_proba(X_valid)[:, 1]
# print(classification_report(y_valid, y_pred))
# print(confusion_matrix(y_valid, y_pred))
print("\nRapport de classification :\n", classification_report(y_valid, y_pred))
print("\nMatrice de confusion :\n", confusion_matrix(y_valid, y_pred))
# # Importances
# importances = pd.DataFrame({
# "feature": self.train_model.feature_name_,
# "importance": self.train_model.feature_importances_
# }).sort_values("importance", ascending=False)
# print("\n===== 🔍 IMPORTANCE DES FEATURES =====")
# print(importances)
# Feature importance
importances = self.train_model.feature_importances_
feat_imp = pd.Series(importances, index=X_train.columns).sort_values(ascending=False)
# Affichage
feat_imp.plot(kind='bar', figsize=(12, 6))
plt.title("Feature importances")
# plt.show()
plt.savefig(f"{self.path}/Feature importances.png", bbox_inches='tight')
result = permutation_importance(self.train_model, X_valid, y_valid, scoring='f1', n_repeats=10, random_state=42)
perm_imp = pd.Series(result.importances_mean, index=X_valid.columns).sort_values(ascending=False)
perm_imp.plot(kind='bar', figsize=(12, 6))
plt.title("Permutation feature importance")
# plt.show()
plt.savefig(f"{self.path}/Permutation feature importance.png", bbox_inches='tight')
# Shap
explainer = shap.TreeExplainer(self.train_model)
shap_values = explainer.shap_values(X_valid)
# Résumé global
shap.summary_plot(shap_values, X_valid)
# Force plot pour une observation
force_plot = shap.force_plot(explainer.expected_value, shap_values[0, :], X_valid.iloc[0, :])
shap.save_html(f"{self.path}/shap_force_plot.html", force_plot)
fig, ax = plt.subplots(figsize=(24, 48))
PartialDependenceDisplay.from_estimator(
self.train_model,
X_valid,
selected_features,
kind="average",
ax=ax
)
fig.savefig(f"{self.path}/PartialDependenceDisplay.png", bbox_inches="tight")
plt.close(fig)
best_f1 = 0
best_t = 0.5
for t in [0.3, 0.4, 0.5, 0.6, 0.7]:
y_pred_thresh = (y_proba > t).astype(int)
score = f1_score(y_valid, y_pred_thresh)
print(f"Seuil {t:.1f} → F1: {score:.3f}")
if score > best_f1:
best_f1 = score
best_t = t
print(f"✅ Meilleur seuil trouvé: {best_t} avec F1={best_f1:.3f}")
# 6⃣ Évaluer la précision (facultatif)
preds = self.train_model.predict(X_valid)
acc = accuracy_score(y_valid, preds)
print(f"Accuracy: {acc:.3f}")
# 7⃣ Sauvegarde du modèle
joblib.dump(self.train_model, f"{self.path}/{pair}_rf_model.pkl")
print(f"✅ Modèle sauvegardé sous {pair}_rf_model.pkl")
# X = dataframe des features (après shift/rolling/indicators)
# y = target binaire ou décimale
# model = ton modèle entraîné (RandomForestClassifier ou Regressor)
# # --- 1⃣ Mutual Information (MI) ---
# mi_scores = mutual_info_classif(X.fillna(0), y)
# mi_series = pd.Series(mi_scores, index=X.columns, name='MI')
#
# # --- 2⃣ Permutation Importance (PI) ---
# pi_result = permutation_importance(self.train_model, X, y, n_repeats=10, random_state=42, n_jobs=-1)
# pi_series = pd.Series(pi_result.importances_mean, index=X.columns, name='PI')
#
# # --- 3⃣ Combinaison dans un seul dataframe ---
# importance_df = pd.concat([mi_series, pi_series], axis=1)
# importance_df = importance_df.sort_values(by='PI', ascending=False) # tri par importance modèle
# print(importance_df)
#
# importance_df.plot(kind='bar', figsize=(10, 5))
# plt.title("Mutual Info vs Permutation Importance")
# plt.ylabel("Score")
# plt.show()
self.analyze_model(pair, self.train_model, X_train, X_valid, y_train, y_valid)
def inspect_model(self, model):
    """
    Print a structured summary of an already-trained ML model.

    Works with scikit-learn, xgboost, lightgbm, catboost, ... — every section
    is guarded by hasattr, so only the information the model actually exposes
    is printed. Returns nothing; output goes to stdout.
    """
    print("===== 🔍 INFORMATIONS DU MODÈLE =====")
    # Model identity.
    print(f"Type : {type(model).__name__}")
    print(f"Module : {model.__class__.__module__}")
    # Hyper-parameters (scikit-learn style API).
    if hasattr(model, "get_params"):
        hyperparams = model.get_params()
        print(f"\n===== ⚙️ HYPERPARAMÈTRES ({len(hyperparams)}) =====")
        for param_name, param_value in hyperparams.items():
            print(f"{param_name}: {param_value}")
    # Ensemble size, when the model exposes it.
    if hasattr(model, "n_estimators"):
        print(f"\nNombre destimateurs : {model.n_estimators}")
    # Feature importances (tree-based models).
    if hasattr(model, "feature_importances_"):
        print("\n===== 📊 IMPORTANCE DES FEATURES =====")
        names = getattr(model, "feature_names_in_", None)
        if isinstance(names, np.ndarray):
            names = names.tolist()
        elif names is None:
            # No recorded feature names: fall back to positional labels.
            names = [f"feature_{i}" for i in range(len(model.feature_importances_))]
        importance_table = pd.DataFrame({
            "feature": names,
            "importance": model.feature_importances_
        }).sort_values(by="importance", ascending=False)
        print(importance_table)
    # Coefficients (linear models).
    if hasattr(model, "coef_"):
        print("\n===== ➗ COEFFICIENTS =====")
        coefficients = np.array(model.coef_)
        if coefficients.ndim != 1:
            print(coefficients)
        else:
            for idx, value in enumerate(coefficients):
                print(f"Feature {idx}: {value:.6f}")
    # Intercept (linear models).
    if hasattr(model, "intercept_"):
        print("\nIntercept :", model.intercept_)
    # Known classes (classifiers).
    if hasattr(model, "classes_"):
        print("\n===== 🎯 CLASSES =====")
        print(model.classes_)
    # Internal training scores, when exposed.
    for attr in ("best_score_", "best_iteration_", "best_ntree_limit", "score_"):
        if hasattr(model, attr):
            print(f"\n{attr} = {getattr(model, attr)}")
    # Bound methods, truncated to the first 15.
    print("\n===== 🧩 MÉTHODES DISPONIBLES =====")
    method_names = [m for m, _ in inspect.getmembers(model, predicate=inspect.ismethod)]
    print(", ".join(method_names[:15]) + ("..." if len(method_names) > 15 else ""))
    print("\n===== ✅ FIN DE LINSPECTION =====")
def analyze_model(self, pair, model, X_train, X_valid, y_train, y_valid):
    """
    Full evaluation of a trained binary classifier on a validation split.

    Prints accuracy, ROC AUC, classification report and confusion matrix,
    saves plots (confusion matrix, feature importances, ROC curve,
    threshold-vs-score curves) under `self.path`, and reports the
    probability threshold maximizing the F1-score.

    Args:
        pair: trading pair name (currently unused here, kept for callers).
        model: fitted classifier (ideally exposing predict_proba).
        X_train: training features (used for feature names / importances).
        X_valid: validation features.
        y_train: training labels (unused here, kept for callers).
        y_valid: validation labels (0/1).
    """
    os.makedirs(self.path, exist_ok=True)
    # ---- Predictions ----
    preds = model.predict(X_valid)
    # Fall back to hard predictions when the model has no predict_proba.
    probs = model.predict_proba(X_valid)[:, 1] if hasattr(model, "predict_proba") else preds
    # ---- Global performance ----
    print("===== 📊 ÉVALUATION DU MODÈLE =====")
    print("Colonnes du modèle :", model.feature_names_in_)
    print("Colonnes X_valid :", list(X_valid.columns))
    print(f"Accuracy: {accuracy_score(y_valid, preds):.3f}")
    print(f"ROC AUC : {roc_auc_score(y_valid, probs):.3f}")
    print("TN (True Negative) / FP (False Positive)")
    print("FN (False Negative) / TP (True Positive)")
    print("\nRapport de classification :\n", classification_report(y_valid, preds))
    # ---- Confusion matrix (annotated heatmap) ----
    cm = confusion_matrix(y_valid, preds)
    print("Matrice de confusion :\n", cm)
    plt.figure(figsize=(4, 4))
    plt.imshow(cm, cmap="Blues")
    plt.title("Matrice de confusion")
    plt.xlabel("Prédit")
    plt.ylabel("Réel")
    for i in range(2):
        for j in range(2):
            plt.text(j, i, cm[i, j], ha="center", va="center", color="black")
    plt.savefig(os.path.join(self.path, "Matrice de confusion.png"), bbox_inches="tight")
    plt.close()
    # ---- Feature importances (tree-based models only) ----
    if hasattr(model, "feature_importances_"):
        print("\n===== 🔍 IMPORTANCE DES FEATURES =====")
        importance = pd.DataFrame({
            "feature": X_train.columns,
            "importance": model.feature_importances_
        }).sort_values(by="importance", ascending=False)
        print(importance)
        # Wide figure so that many feature labels remain readable.
        fig, ax = plt.subplots(figsize=(24, 8))
        importance.plot.bar(x="feature", y="importance", legend=False, ax=ax)
        ax.set_xticklabels(ax.get_xticklabels(), rotation=45, ha='right')
        plt.title("Importance des features")
        plt.savefig(os.path.join(self.path, "Importance des features.png"), bbox_inches="tight")
        plt.close()
    # ---- Excerpt of one decision tree (ensembles only) ----
    if hasattr(model, "estimators_"):
        print("\n===== 🌳 EXTRAIT DUN ARBRE =====")
        print(export_text(model.estimators_[0], feature_names=list(X_train.columns))[:800])
    # ---- Accuracy as a function of the decision threshold ----
    thresholds = np.linspace(0.1, 0.9, 9)
    print("\n===== ⚙️ PERFORMANCE SELON SEUIL =====")
    for t in thresholds:
        preds_t = (probs > t).astype(int)
        acc = accuracy_score(y_valid, preds_t)
        print(f"Seuil {t:.1f} → précision {acc:.3f}")
    # ---- ROC curve ----
    fpr, tpr, _ = roc_curve(y_valid, probs)
    plt.figure(figsize=(5, 4))
    plt.plot(fpr, tpr, label="ROC curve")
    plt.plot([0, 1], [0, 1], linestyle="--", color="gray")
    plt.xlabel("Taux de faux positifs")
    plt.ylabel("Taux de vrais positifs")
    plt.title("Courbe ROC")
    plt.legend()
    plt.savefig(os.path.join(self.path, "Courbe ROC.png"), bbox_inches="tight")
    plt.close()
    # FIX: reuse the probabilities computed above instead of calling
    # predict_proba a second time — the unguarded second call crashed for
    # models without predict_proba despite the hasattr guard at the top.
    y_proba = probs
    # Dedicated precision/recall/F1-vs-threshold figure.
    self.plot_threshold_analysis(y_valid, y_proba, step=0.05,
                                 save_path=f"{self.path}/threshold_analysis.png")
    # Same sweep, inline, to locate and annotate the best F1 threshold.
    seuils = np.arange(0.0, 1.01, 0.05)
    precisions, recalls, f1s = [], [], []
    for seuil in seuils:
        y_pred = (y_proba >= seuil).astype(int)
        precisions.append(precision_score(y_valid, y_pred))
        recalls.append(recall_score(y_valid, y_pred))
        f1s.append(f1_score(y_valid, y_pred))
    plt.figure(figsize=(10, 6))
    plt.plot(seuils, precisions, label='Précision', marker='o')
    plt.plot(seuils, recalls, label='Rappel', marker='o')
    plt.plot(seuils, f1s, label='F1-score', marker='o')
    # Highlight the threshold with the best F1.
    best_idx = np.argmax(f1s)
    plt.scatter(seuils[best_idx], f1s[best_idx], color='red', s=80, label=f'Max F1 ({seuils[best_idx]:.2f})')
    plt.title("Performance du modèle selon le seuil de probabilité")
    plt.xlabel("Seuil de probabilité (classe 1)")
    plt.ylabel("Score")
    plt.grid(True, alpha=0.3)
    plt.legend()
    plt.savefig(f"{self.path}/seuil_de_probabilite.png", bbox_inches='tight')
    # FIX: close the figure — it was leaked on every call.
    plt.close()
    print(f"✅ Meilleur F1 : {f1s[best_idx]:.3f} au seuil {seuils[best_idx]:.2f}")
    print("\n===== ✅ FIN DE LANALYSE =====")
def plot_threshold_analysis(self, y_true, y_proba, step=0.05, save_path=None):
    """
    Plot precision, recall and F1-score as a function of the decision threshold.

    The resulting figure shows three curves:
      - Precision: reliability of the positive signals,
      - Recall: share of actual positives that are detected,
      - F1-score: the trade-off between the two.

    Args:
        y_true: true binary labels (0 or 1).
        y_proba: predicted probabilities of the positive class.
        step: spacing between the tested thresholds.
        save_path: when given, the figure is written there instead of shown.
    """
    thresholds = np.arange(0, 1.01, step)
    precisions, recalls, f1s = [], [], []
    for thr in thresholds:
        preds = (y_proba >= thr).astype(int)
        precisions.append(precision_score(y_true, preds))
        recalls.append(recall_score(y_true, preds))
        f1s.append(f1_score(y_true, preds))
    plt.figure(figsize=(10, 6))
    plt.plot(thresholds, precisions, label="Precision", linewidth=2)
    plt.plot(thresholds, recalls, label="Recall", linewidth=2)
    plt.plot(thresholds, f1s, label="F1-score", linewidth=2, linestyle="--")
    # Reference line at the default 0.5 decision threshold.
    plt.axvline(0.5, color='gray', linestyle=':', label="Seuil 0.5")
    plt.title("📊 Performance selon le seuil de probabilité", fontsize=14)
    plt.xlabel("Seuil de décision (threshold)")
    plt.ylabel("Score")
    plt.legend()
    plt.grid(True, alpha=0.3)
    if save_path:
        plt.savefig(save_path, bbox_inches='tight')
        print(f"✅ Graphique enregistré : {save_path}")
        # FIX: release the figure — it was leaked on every saved call.
        plt.close()
    else:
        plt.show()
def feature_auc_scores(self, X, y):
    """
    Score every feature by the ROC-AUC obtained when the single
    (forward-filled, NaN→0) column is used to rank the target `y`.

    Features that cannot be scored (constant columns, incompatible
    dtypes, ...) receive NaN. Returns a Series sorted best-first.
    """
    scores = {}
    for column in X.columns:
        try:
            auc = roc_auc_score(y, X[column].ffill().fillna(0))
        except Exception:
            auc = np.nan
        scores[column] = auc
    return pd.Series(scores).sort_values(ascending=False)
def listUsableColumns(self, dataframe):
    """
    Return the numeric, non-constant columns usable as model features,
    excluding raw prices, targets and other leak-prone columns.

    Side effect: in the selected columns, +/-inf and NaN are replaced by 0
    *in place* in `dataframe` (intentional — downstream models need finite
    values).

    Args:
        dataframe: candle dataframe enriched with indicator columns.

    Returns:
        list[str]: names of the usable feature columns, in dataframe order.
    """
    # Column name patterns that mark non-feature columns (raw prices,
    # labels/targets, bookkeeping columns, known-leaky indicators).
    excluded_suffixes = ("_state", "_1d", "_count", "_class", "_price")
    excluded_prefixes = (
        "open", "close", "low", "high", "haopen", "haclose",
        "stop_buying", "target", "lvl",
        "sma5_deriv1_1h", "sma5_1h", "sma12_deriv1_1h", "sma12_1h",
        "confidence_index",
    )
    # Step 1: keep numeric columns only.
    numeric_cols = dataframe.select_dtypes(include=['int64', 'float64']).columns
    # Step 2: drop constant columns and excluded name patterns
    # (startswith/endswith accept tuples — one call per check).
    usable_cols = [
        c for c in numeric_cols
        if dataframe[c].nunique() > 1
        and not c.endswith(excluded_suffixes)
        and not c.startswith(excluded_prefixes)
    ]
    # Step 3: sanitize the selected columns in place (inf/NaN -> 0).
    dataframe[usable_cols] = dataframe[usable_cols].replace([np.inf, -np.inf], 0).fillna(0)
    return usable_cols
def select_uncorrelated_features(self, df, target, top_n=20, corr_threshold=0.7):
    """
    Pick the features most correlated with `target` while discarding any
    candidate that is too correlated with an already-selected feature.

    Args:
        df: dataframe containing the features and the target column.
        target: name of the target column.
        top_n: how many top candidates (by |corr| with target) to consider.
        corr_threshold: max allowed |corr| between two selected features.

    Returns:
        DataFrame with columns 'feature' and 'corr_with_target',
        sorted by absolute correlation, descending.
    """
    # Correlation matrix restricted to numeric columns.
    corr_matrix = df.corr(numeric_only=True)
    # Candidates: top_n columns by absolute correlation with the target.
    ranked = corr_matrix[target].abs().sort_values(ascending=False)
    candidates = ranked.drop(target).head(top_n).index.tolist()
    # Greedy filter: keep a candidate only if it is not strongly
    # correlated with anything already kept.
    kept = []
    for candidate in candidates:
        redundant = any(
            abs(corr_matrix.loc[candidate, prev]) > corr_threshold
            for prev in kept
        )
        if not redundant:
            kept.append(candidate)
    # Tidy result with the signed correlation values.
    return pd.DataFrame({
        "feature": kept,
        "corr_with_target": [corr_matrix.loc[f, target] for f in kept]
    }).sort_values(by="corr_with_target", key=np.abs, ascending=False)
def calculateDerivation(self, dataframe, window=12, suffixe='', timeframe='5m'):
    """
    Smooth the 'mid' price with a rolling mean, then compute its
    derivatives/trend state via calculeDerivees.

    Args:
        dataframe: candle dataframe containing a 'mid' column.
        window: rolling-mean window, also used as the EMA period downstream.
        suffixe: suffix appended to the generated column names.
        timeframe: kept for interface compatibility (unused here).

    Returns:
        The dataframe enriched with the smoothed column and its derivatives.
    """
    smooth_col = f"mid_smooth{suffixe}"
    dataframe[smooth_col] = dataframe['mid'].rolling(window).mean()
    return self.calculeDerivees(dataframe, smooth_col, ema_period=window)
def calculeDerivees(
        self,
        dataframe: pd.DataFrame,
        name: str,
        suffixe: str = '',
        window: int = 100,
        coef: float = 0.15,
        ema_period: int = 10,
        verbose: bool = True,
) -> pd.DataFrame:
    """
    Compute EMA-smoothed first/second relative derivatives of column `name`
    and derive a discrete trend state using an adaptive epsilon based on
    rolling percentiles.

    Columns added to `dataframe` (mutated in place and returned):
      - {name}{suffixe}_inv    : True on a local V-shape (dip at t-1, rising now)
      - {name}{suffixe}_dist   : relative distance of 'close' to the column
      - {name}{suffixe}_deriv1 : scaled, EMA-smoothed 1st relative derivative
      - {name}{suffixe}_deriv2 : scaled, EMA-smoothed 2nd derivative
      - {name}{suffixe}_state  : trend code in {-4, -2, -1, 0, 1, 3, 4}

    Args:
        dataframe: candle dataframe holding `name` and 'close'.
        name: column to derive.
        suffixe: suffix appended to generated column names.
        window: rolling window for the adaptive-epsilon percentiles.
        coef: scale applied to the percentile spread to obtain epsilon.
        verbose: kept for the commented-out debug output below.
    """
    d1_col = f"{name}{suffixe}_deriv1"
    d2_col = f"{name}{suffixe}_deriv2"
    # d1s_col = f"{name}{suffixe}_deriv1_smooth"
    # d2s_col = f"{name}{suffixe}_deriv2_smooth"
    tendency_col = f"{name}{suffixe}_state"
    # Scaling factors proportional to the EMA span (reference span = 5).
    factor1 = 100 * (ema_period / 5)
    factor2 = 10 * (ema_period / 5)
    # Local "V" reversal: value dipped at t-1 and is rising again now.
    dataframe[f"{name}{suffixe}_inv"] = (dataframe[f"{name}{suffixe}"].shift(2) >= dataframe[
        f"{name}{suffixe}"].shift(1)) \
        & (dataframe[f"{name}{suffixe}"].shift(1) <= dataframe[f"{name}{suffixe}"])
    # --- Relative distance of close to the moving-average column ---
    dataframe[f"{name}{suffixe}_dist"] = (dataframe['close'] - dataframe[f"{name}{suffixe}"]) / dataframe[
        f"{name}{suffixe}"]
    # Simple relative first derivative.
    # NOTE(review): computed from `name`, while the columns above use
    # f"{name}{suffixe}" — identical while suffixe stays '' (as in the visible
    # caller); confirm intent before passing a non-empty suffixe.
    dataframe[d1_col] = (dataframe[name] - dataframe[name].shift(1)) / dataframe[name].shift(1)
    # EMA smoothing, then scaling.
    dataframe[d1_col] = factor1 * dataframe[d1_col].ewm(span=ema_period, adjust=False).mean()
    # dataframe[d1_col] = dataframe[d1_col].rolling(window=ema_period, center=True).median()
    dataframe[d2_col] = dataframe[d1_col] - dataframe[d1_col].shift(1)
    dataframe[d2_col] = factor2 * dataframe[d2_col].ewm(span=ema_period, adjust=False).mean()
    # Adaptive epsilon from rolling 5%/95% percentiles of each derivative.
    p_low_d1 = dataframe[d1_col].rolling(window=window, min_periods=1).quantile(0.05)
    p_high_d1 = dataframe[d1_col].rolling(window=window, min_periods=1).quantile(0.95)
    p_low_d2 = dataframe[d2_col].rolling(window=window, min_periods=1).quantile(0.05)
    p_high_d2 = dataframe[d2_col].rolling(window=window, min_periods=1).quantile(0.95)
    eps_d1_series = ((p_low_d1.abs() + p_high_d1.abs()) / 2) * coef
    eps_d2_series = ((p_low_d2.abs() + p_high_d2.abs()) / 2) * coef
    # Global (whole-series) epsilon used as a fallback where the rolling one
    # is NaN or zero (e.g. at the start of the series).
    global_eps_d1 = (abs(dataframe[d1_col].quantile(0.05)) + abs(dataframe[d1_col].quantile(0.95))) / 2 * coef
    global_eps_d2 = (abs(dataframe[d2_col].quantile(0.05)) + abs(dataframe[d2_col].quantile(0.95))) / 2 * coef
    eps_d1_series = eps_d1_series.fillna(global_eps_d1).replace(0, global_eps_d1)
    eps_d2_series = eps_d2_series.fillna(global_eps_d2).replace(0, global_eps_d2)
    # if verbose and self.dp.runmode.value in ('backtest'):
    #     stats = dataframe[[d1_col, d2_col]].agg(['min', 'max']).T
    #     stats['abs_max'] = dataframe[[d1_col, d2_col]].abs().max(axis=0)
    #     print(f"---- Derivatives stats {timeframe}----")
    #     print(stats)
    #     print(f"rolling window = {window}, coef = {coef}, ema_period = {ema_period}")
    #     print("---------------------------")

    # Map (d1, d2) to a discrete trend state, per row.
    def tag_by_derivatives(row):
        # NOTE(review): int(row.name) is used as a *positional* index into the
        # epsilon series — this assumes a clean 0..n-1 RangeIndex; verify the
        # callers never pass a dataframe with a non-default index.
        idx = int(row.name)
        d1v = float(row[d1_col])
        d2v = float(row[d2_col])
        eps1 = float(eps_d1_series.iloc[idx])
        eps2 = float(eps_d2_series.iloc[idx])
        # State encoding (3-letter codes once considered for reference):
        # |  4 | HAU | Accelerating rise      |
        # |  3 | HSR | Decelerating rise      |
        # |  1 | DHB | Rise starting          |
        # |  0 | PAL | Plateau / neutral      |
        # | -1 | DBD | Fall starting          |
        # | -2 | BSR | Decelerating fall      |
        # | -4 | BAS | Accelerating fall      |
        # Strict plateau: both derivatives within their epsilon band.
        if abs(d1v) <= eps1 and abs(d2v) <= eps2:
            return 0
        # Start of a move: d1 ~ 0 but d2 already signals a direction.
        if abs(d1v) <= eps1:
            return 1 if d2v > eps2 else -1 if d2v < -eps2 else 0
        # Rising.
        if d1v > eps1:
            return 4 if d2v > eps2 else 3
        # Falling.
        if d1v < -eps1:
            return -4 if d2v < -eps2 else -2
        return 0
    dataframe[tendency_col] = dataframe.apply(tag_by_derivatives, axis=1)
    # if timeframe == '1h' and verbose and self.dp.runmode.value in ('backtest'):
    #     print("##################")
    #     print(f"# STAT {timeframe} {name}{suffixe}")
    #     print("##################")
    #     self.calculateProbabilite2Index(dataframe, futur_cols=['futur_percent'], indic_1=f"{name}{suffixe}_deriv1", indic_2=f"{name}{suffixe}_deriv2")
    return dataframe
def calculateConfiance(self, informative):
    """
    Compute a composite market-confidence index in [0, 1] and write it to
    `informative['confidence_index']`.

    The index blends five normalised sub-scores (higher = calmer/healthier
    market): low volatility, positive long-term trend, small drawdown,
    absence of panic volume, and short-RSI strength vs long-RSI.

    Args:
        informative: OHLCV dataframe (needs high/low/close/volume).

    Returns:
        The same dataframe with 'confidence_index' added.
    """
    df = informative.copy()
    # Normalised ATR (volatility relative to price).
    # FIX: TA-Lib's ATR parameter is `timeperiod`, not `length` (that is the
    # pandas-ta name); 14 is also TA-Lib's default, so values are unchanged.
    df['atr_norm'] = talib.ATR(df['high'], df['low'], df['close'], timeperiod=14) / df['close']
    # SMA200 and its slope (long-term trend direction).
    df['sma200'] = talib.SMA(df['close'], 200)
    df['sma200_slope'] = df['sma200'].diff()
    # Drawdown from the running all-time high.
    df['rolling_ath'] = df['close'].cummax()
    df['drawdown'] = (df['close'] - df['rolling_ath']) / df['rolling_ath']
    # Volume spike relative to the 20-candle average.
    df['vol_spike'] = df['volume'] / df['volume'].rolling(20).mean()
    # Short and long RSI.
    df['rsi14'] = talib.RSI(df['close'], 14)
    df['rsi60'] = talib.RSI(df['close'], 60)
    # Normalised sub-scores, each mapped into [0, 1].
    df['vol_score'] = 1 - np.clip(df['atr_norm'] / 0.05, 0, 1)
    df['trend_score'] = 1 / (1 + np.exp(-df['sma200_slope'] * 150))
    df['dd_score'] = 1 - np.clip(abs(df['drawdown']) / 0.3, 0, 1)
    df['volpanic_score'] = 1 - np.clip(df['vol_spike'] / 3, 0, 1)
    df['rsi_score'] = 1 / (1 + np.exp(-(df['rsi14'] - df['rsi60']) / 10))
    # Weighted blend, written back onto the caller's dataframe.
    informative['confidence_index'] = (
        0.25 * df['vol_score'] +
        0.25 * df['trend_score'] +
        0.20 * df['dd_score'] +
        0.15 * df['volpanic_score'] +
        0.15 * df['rsi_score']
    )
    return informative
def calculModelInformative(self, informative):
    """
    Train a calibrated logistic-regression classifier on the informative
    timeframe and return calibrated positive-class probabilities for the
    chronological test split.

    Args:
        informative: candle dataframe with indicator columns (must contain
            an 'sma24' column — see note below).

    Returns:
        np.ndarray: calibrated P(target=1) for the last 20% of rows.
    """
    # Work on a copy so the caller's dataframe is not mutated.
    df = informative.copy()
    X = df[self.listUsableColumns(df)]
    # Target: 1 when sma24 is higher 13 candles ahead than now.
    # NOTE(review): assumes 'sma24' already exists in `informative` — confirm
    # it is computed upstream before this method is called.
    df['target'] = ((df["sma24"].shift(-13) - df["sma24"]) > 0).astype(int)
    df['target'] = df['target'].fillna(0).astype(int)
    y = df['target']
    # Chronological train/test split (no shuffling for time series).
    X_train, X_test, y_train, y_test = train_test_split(X, y, shuffle=False, test_size=0.2)
    # Scaled logistic-regression pipeline.
    clf = Pipeline([
        ("scaler", StandardScaler()),
        ("logreg", LogisticRegression(max_iter=5000))
    ])
    # Isotonic probability calibration with 3-fold CV.
    cal = CalibratedClassifierCV(clf, cv=3, method="isotonic")
    # Training.
    cal.fit(X_train, y_train)
    # Calibrated probabilities of the positive class on the test split.
    probas = cal.predict_proba(X_test)[:, 1]
    # NOTE(review): 'ml_prob' is written to the local copy `df`, which is then
    # discarded — neither `informative` nor the return value carries this
    # column. Confirm whether the caller was expected to receive it.
    df.loc[X_test.index, 'ml_prob'] = probas
    print("Brier score:", brier_score_loss(y_test, probas))
    print("ROC AUC:", roc_auc_score(y_test, probas))
    return probas
def prune_features(self, model, dataframe, feature_columns, importance_threshold=0.01):
    """
    Keep only the features whose normalised gain importance meets a threshold.

    Args:
        model: trained XGBClassifier (anything exposing get_booster()).
        dataframe: DataFrame containing all candidate feature columns.
        feature_columns: feature names used for prediction.
        importance_threshold: minimal share of total gain importance a
            feature must hold to be kept.

    Returns:
        tuple (dataframe_pruned, kept_features):
            dataframe_pruned: dataframe restricted to the kept features,
                with NaN replaced by 0;
            kept_features: list of the retained feature names, in the
                original `feature_columns` order.
    """
    booster = model.get_booster()
    # Gain-based importance; features never used in a split are absent here
    # (and thus get an importance of 0 below).
    importance = booster.get_score(importance_type='gain')
    total_gain = sum(importance.values())
    # FIX: guard against a degenerate booster with no splits — the original
    # code divided by zero here. With no importance information, no feature
    # can meet a positive threshold.
    if total_gain > 0:
        normalized_importance = {k: v / total_gain for k, v in importance.items()}
    else:
        normalized_importance = {}
    kept_features = [f for f in feature_columns if normalized_importance.get(f, 0) >= importance_threshold]
    dataframe_pruned = dataframe[kept_features].fillna(0)
    return dataframe_pruned, kept_features