Files
Freqtrade/Zeus_TensorFlow.py
Jérôme Delacotte a376e37e61 TensorFlow
2025-11-16 21:05:48 +01:00

3172 lines
137 KiB
Python
Raw Blame History

This file contains invisible Unicode characters
This file contains invisible Unicode characters that are indistinguishable to humans but may be processed differently by a computer. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
# Zeus Strategy: First Generation of GodStra Strategy with maximum
# AVG/MID profit in USDT
# Author: @Mablue (Masoud Azizi)
# github: https://github.com/mablue/
# IMPORTANT: INSTALL TA BEFORE RUNNING (pip install ta)
# freqtrade hyperopt --hyperopt-loss SharpeHyperOptLoss --spaces buy sell roi --strategy Zeus
# --- Do not remove these libs ---
from datetime import timedelta, datetime
from freqtrade.persistence import Trade
from freqtrade.strategy import (BooleanParameter, CategoricalParameter, DecimalParameter, stoploss_from_open,
IntParameter, IStrategy, merge_informative_pair, informative, stoploss_from_absolute)
import pandas as pd
import numpy as np
import os
import json
from pandas import DataFrame
from typing import Optional, Union, Tuple
import math
import logging
from pathlib import Path
# --------------------------------
# Add your lib to import here test git
import ta
import talib.abstract as talib
import freqtrade.vendor.qtpylib.indicators as qtpylib
from datetime import timezone, timedelta
logger = logging.getLogger(__name__)
# Machine Learning
from sklearn.model_selection import train_test_split
import joblib
import matplotlib.pyplot as plt
from sklearn.metrics import (
classification_report,
confusion_matrix,
accuracy_score,
roc_auc_score,
roc_curve,
precision_score, recall_score, precision_recall_curve,
f1_score, mean_squared_error, r2_score
)
from sklearn.tree import export_text
import inspect
from sklearn.feature_selection import SelectFromModel
from tabulate import tabulate
from sklearn.feature_selection import VarianceThreshold
import seaborn as sns
import lightgbm as lgb
from sklearn.model_selection import cross_val_score
import optuna.visualization as vis
import optuna
from lightgbm import LGBMRegressor
from sklearn.metrics import mean_squared_error
from sklearn.model_selection import train_test_split
from sklearn.linear_model import LinearRegression, Ridge, HuberRegressor
from sklearn.preprocessing import StandardScaler, PolynomialFeatures
from sklearn.pipeline import make_pipeline
from sklearn.svm import SVR
from sklearn.ensemble import RandomForestRegressor
from sklearn.ensemble import GradientBoostingRegressor
from sklearn.preprocessing import StandardScaler
from sklearn.ensemble import HistGradientBoostingRegressor
from sklearn.impute import SimpleImputer
from sklearn.pipeline import Pipeline
# Tensorflow
import pandas as pd
import numpy as np
import tensorflow as tf
from tensorflow.keras import layers, models
from tensorflow.keras.models import load_model
from keras.utils import plot_model
from keras.models import Sequential
from keras.layers import LSTM, Dense
from sklearn.preprocessing import MinMaxScaler
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import LSTM, Dense
from tensorflow.keras.optimizers import Adam
# Environment tweaks for TensorFlow: mask the CUDA devices and turn off XLA devices.
# NOTE(review): both variables are assigned after `import tensorflow` has already
# executed earlier in this file — confirm TensorFlow still honours them at this point.
os.environ["CUDA_VISIBLE_DEVICES"] = "-1" # fully disables the GPU
os.environ["TF_XLA_FLAGS"] = "--tf_xla_enable_xla_devices=false"
# Basic ANSI colour escape sequences (used to colourise the console trade log)
RED = "\033[31m"
GREEN = "\033[32m"
YELLOW = "\033[33m"
BLUE = "\033[34m"
MAGENTA = "\033[35m"
CYAN = "\033[36m"
RESET = "\033[0m"
import warnings
# Silence every warning whose message matches "No further splits with positive gain".
# presumably emitted by LightGBM during training — TODO confirm
warnings.filterwarnings(
"ignore",
message=r".*No further splits with positive gain.*"
)
def pprint_df(dframe):
    """Print *dframe* to stdout as a psql-style table.

    Column names are used as headers and the index is not shown.
    """
    rendered = tabulate(dframe, headers='keys', tablefmt='psql', showindex=False)
    print(rendered)
def normalize(df):
    """Min-max scale *df*: subtract its minimum, then divide by (max - min).

    No guard is applied when max equals min; the division result is returned
    as produced by the underlying arithmetic.
    """
    lowest = df.min()
    span = df.max() - lowest
    return (df - lowest) / span
class Zeus_TensorFlow(IStrategy):
# Freqtrade strategy (IStrategy subclass) running on 5m candles; populate_indicators
# additionally merges a 1h informative frame and calls the TensorFlow train/predict helpers.
# Candles requested before signals are evaluated: 24 * 12 = 288 (24 hours of 5m candles).
startup_candle_count = 24 * 12
# Machine Learning
model = None
# Feature column names; reassigned in populate_indicators from listUsableColumns().
model_indicators = []
# Column used as the prediction target by trainModel.
indicator_target = 'mid_smooth_5'
# Tensorflow
lookback = 60
future_steps = 12
y_no_scale = False
# Default output directory; populate_indicators overwrites it with a per-pair sub-directory.
path = f"user_data/plots/"
# ROI table:
minimal_roi = {
"0": 0.564,
"567": 0.273,
"2814": 0.12,
"7675": 0
}
# Stoploss:
# NOTE(review): -1 presumably means a 100% stoploss, i.e. effectively disabled — confirm.
stoploss = -1 # 0.256
# Custom stoploss
use_custom_stoploss = False
trailing_stop = True
trailing_stop_positive = 0.15
trailing_stop_positive_offset = 0.20
trailing_only_offset_is_reached = True
# Buy hypers
timeframe = '5m'
max_open_trades = 5
max_amount = 40
parameters = {}
# DCA config
position_adjustment_enable = True
# Plot layout: indicator columns drawn on the main price chart and on the
# "Rsi", "Rsi_deriv1", "Rsi_deriv2" and "Macd" subplots, each with its line colour.
# NOTE(review): several referenced columns (e.g. the *_1d ones) are not produced by the
# visible part of populate_indicators, whose 1d informative merge is commented out — confirm.
plot_config = {
"main_plot": {
"sma24_1h": {
"color": "pink"
},
"sma5_1d": {
"color": "blue"
},
# "sma24": {
# "color": "yellow"
# },
"sma60": {
"color": "green"
},
"bb_lowerband": {
"color": "#da59a6"},
"bb_upperband": {
"color": "#da59a6",
},
# "sma12": {
# "color": "blue"
# },
"mid_smooth_3_1h": {
"color": "blue"
}
},
"subplots": {
"Rsi": {
"max_rsi_24": {
"color": "blue"
},
"max_rsi_24_1h": {
"color": "pink"
},
# "rsi_1h": {
# "color": "red"
# },
# "rsi_1d": {
# "color": "blue"
# }
},
"Rsi_deriv1": {
"sma24_deriv1_1h": {
"color": "pink"
},
"sma24_deriv1": {
"color": "yellow"
},
"sma5_deriv1_1d": {
"color": "blue"
},
"sma60_deriv1": {
"color": "green"
}
},
"Rsi_deriv2": {
"sma24_deriv2_1h": {
"color": "pink"
},
"sma24_deriv2": {
"color": "yellow"
},
"sma5_deriv2_1d": {
"color": "blue"
},
"sma60_deriv2": {
"color": "green"
}
},
'Macd': {
"macd_rel_1d": {
"color": "cyan"
},
"macdsignal_rel_1d": {
"color": "pink"
},
"macdhist_rel_1d": {
"color": "yellow"
}
}
}
}
# Used by log_trade as a call counter (`% 10` and `+= 1`) even though it starts as a bool.
columns_logged = False
# Per-pair bookkeeping, created once at class level for a fixed list of pairs.
# Any other pair raises KeyError wherever self.pairs[pair] is subscripted.
# The keys 'current_trade' and 'first_amount' are not initialised here; they are
# added later by confirm_trade_exit / custom_exit / populate_indicators.
pairs = {
pair: {
"first_buy": 0,
"last_buy": 0.0,
"last_min": 999999999999999.5,
"last_max": 0,
"trade_info": {},
"max_touch": 0.0,
"last_sell": 0.0,
'count_of_buys': 0,
'current_profit': 0,
'expected_profit': 0,
'previous_profit': 0,
"last_candle": {},
"last_count_of_buys": 0,
'base_stake_amount': 0,
'stop_buy': False,
'last_date': 0,
'stop': False,
'max_profit': 0,
'total_amount': 0,
'has_gain': 0,
'force_sell': False,
'force_buy': False
}
for pair in ["BTC/USDC", "ETH/USDC", "DOGE/USDC", "XRP/USDC", "SOL/USDC",
"BTC/USDT", "ETH/USDT", "DOGE/USDT", "XRP/USDT", "SOL/USDT"]
}
# 20 20 40 60 100 160 260 420
# 50 50 100 300 500
# fibo = [1, 1, 2, 3, 5, 8, 13, 21]
# my fibo
# 50 50 50 100 100 150 200 250 350 450 600 1050
fibo = [1, 1, 1, 2, 2, 3, 4, 5, 7, 9, 12, 16, 21]
baisse = [1, 2, 3, 5, 7, 10, 14, 19, 26, 35, 47, 63, 84]
# My sequence        1 1 1 2 2 3 4 5 7 9 12 16 21
# Stake              50 50 50 100 100 150 200 250 350 450 600 800 1050
# Cumulative stakes  50 100 150 250 350 500 700 950 1300 1750 2350 3150 4200
# Drop               1 2 3 5 7 10 14 19 26 35 47 63 84
# factors = [1, 1.1, 1.25, 1.5, 2.0, 3]
# thresholds = [2, 5, 10, 20, 30, 50]
# Breakpoint tables, shaped like the (thresholds, factors) arguments of multi_step_interpolate.
factors = [0.5, 0.75, 1, 1.25, 1.5, 2]
thresholds = [0, 2, 5, 10, 30, 45]
# Open trades iterated by populate_indicators in live/dry-run; reset to an empty
# list by confirm_trade_entry / confirm_trade_exit.
trades = list()
max_profit_pairs = {}
# Hyperoptable parameters ('buy', 'sell' and 'protection' spaces).
mise_factor_buy = DecimalParameter(0.01, 0.1, default=0.05, decimals=2, space='buy', optimize=True, load=True)
indicators = {'sma5', 'sma12', 'sma24', 'sma60'}
indicators_percent = {'percent', 'percent3', 'percent12', 'percent24', 'percent_1h', 'percent3_1h', 'percent12_1h', 'percent24_1h'}
mises = IntParameter(1, 50, default=5, space='buy', optimize=True, load=True)
ml_prob_buy = DecimalParameter(-0.5, 0.5, default=0.0, decimals=2, space='buy', optimize=True, load=True)
ml_prob_sell = DecimalParameter(-0.5, 0.5, default=0.0, decimals=2, space='sell', optimize=True, load=True)
pct = DecimalParameter(0.005, 0.05, default=0.012, decimals=3, space='buy', optimize=True, load=True)
pct_inc = DecimalParameter(0.0001, 0.003, default=0.0022, decimals=4, space='buy', optimize=True, load=True)
# Thresholds of the 1h "stop buying" latch built in populate_indicators:
# *_deb_* values arm the latch, *_end_* values release it.
rsi_deb_protect = IntParameter(50, 90, default=70, space='protection', optimize=True, load=True)
rsi_end_protect = IntParameter(20, 60, default=55, space='protection', optimize=True, load=True)
sma24_deriv1_deb_protect = DecimalParameter(-4, 4, default=-2, decimals=1, space='protection', optimize=True, load=True)
sma24_deriv1_end_protect = DecimalParameter(-4, 4, default=0, decimals=1, space='protection', optimize=True, load=True)
# =========================================================================
should_enter_trade_count = 0
def confirm_trade_entry(self, pair: str, order_type: str, amount: float, rate: float, time_in_force: str,
                        current_time: datetime, entry_tag: Optional[str], **kwargs) -> bool:
    """Accept an entry order and initialise the per-pair bookkeeping for the new trade.

    The entry is currently always accepted (the former derivative / stop
    conditions are disabled). On acceptance the pair state in ``self.pairs``
    is reset for a first buy at ``rate`` and a "Buy" line is written to the
    trade log.

    :param pair: traded pair; must be a key of ``self.pairs``.
    :param rate: entry rate, stored as both first and last buy price.
    :param current_time: used to compute the minutes since the last exit.
    :param entry_tag: forwarded to the trade log as the trade type.
    :return: True (entry accepted).
    """
    state = self.pairs[pair]

    # Minutes elapsed since the last confirmed exit on this pair (0 when none is recorded).
    minutes = 0
    if state['last_date'] != 0:
        minutes = int((current_time - state['last_date']).total_seconds() / 60)

    dataframe, _ = self.dp.get_analyzed_dataframe(pair, self.timeframe)
    # Only the latest candle is needed; older candles are no longer fetched, so a
    # frame with fewer than three rows does not raise IndexError here.
    last_candle = dataframe.iloc[-1].squeeze()

    allow_to_buy = True
    if allow_to_buy:
        self.trades = list()
        state['first_buy'] = rate
        state['last_buy'] = rate
        state['max_touch'] = last_candle['close']
        state['last_candle'] = last_candle
        state['count_of_buys'] = 1
        state['current_profit'] = 0
        state['last_max'] = max(last_candle['close'], state['last_max'])
        state['last_min'] = min(last_candle['close'], state['last_min'])

    dispo = round(self.wallets.get_available_stake_amount())
    self.printLineLog()

    stake_amount = self.adjust_stake_amount(pair, last_candle)
    state['total_amount'] = stake_amount

    self.log_trade(
        last_candle=last_candle,
        date=current_time,
        action=("🟩Buy" if allow_to_buy else "Canceled") + " " + str(minutes),
        pair=pair,
        rate=rate,
        dispo=dispo,
        profit=0,
        trade_type=entry_tag,
        buys=1,
        stake=round(stake_amount, 2)
    )
    return allow_to_buy
def confirm_trade_exit(self, pair: str, trade: Trade, order_type: str, amount: float, rate: float,
                       time_in_force: str,
                       exit_reason: str, current_time, **kwargs, ) -> bool:
    """Decide whether an exit order may go through and reset the pair state if so.

    The exit is allowed when either
      * the trade has been open for more than 30 minutes and the last candle's
        ``hapercent`` is negative, or
      * the pair is flagged ``force_sell``, or
      * ``exit_reason`` is ``'force_exit'`` or ``'stop_loss'``.

    On acceptance a "Sell" line is logged and the per-pair bookkeeping in
    ``self.pairs`` is cleared for the next trade.

    :return: True when the exit is allowed, False otherwise.
    """
    dataframe, _ = self.dp.get_analyzed_dataframe(pair, self.timeframe)
    last_candle = dataframe.iloc[-1].squeeze()

    # total_seconds() rather than .seconds: the latter only holds the sub-day
    # remainder, which made trades older than 24h look freshly opened.
    minutes = int(round((current_time - trade.open_date_utc).total_seconds() / 60, 0))
    profit = trade.calc_profit(rate)

    state = self.pairs[pair]
    forced = state['force_sell'] or exit_reason in ('force_exit', 'stop_loss')
    # 'hapercent' is only read once the 30-minute threshold has passed.
    allow_to_sell = bool((minutes > 30 and last_candle['hapercent'] < 0) or forced)

    if allow_to_sell:
        self.trades = list()
        state['last_count_of_buys'] = trade.nr_of_successful_entries
        state['last_sell'] = rate
        state['last_candle'] = last_candle
        state['max_profit'] = 0
        state['previous_profit'] = 0

        dispo = round(self.wallets.get_available_stake_amount())
        # Logged before the reset below so the line still shows the trade's max_touch.
        self.log_trade(
            last_candle=last_candle,
            date=current_time,
            action="🟥Sell " + str(minutes),
            pair=pair,
            trade_type=exit_reason,
            rate=last_candle['close'],
            dispo=dispo,
            profit=round(profit, 2)
        )

        state['force_sell'] = False
        state['has_gain'] = 0
        state['current_profit'] = 0
        state['total_amount'] = 0
        state['count_of_buys'] = 0
        state['max_touch'] = 0
        state['last_buy'] = 0
        state['last_date'] = current_time
        state['current_trade'] = None

    return allow_to_sell
def custom_stake_amount(self, pair: str, current_time: datetime, current_rate: float,
proposed_stake: float, min_stake: float, max_stake: float,
**kwargs) -> float:
dataframe, _ = self.dp.get_analyzed_dataframe(pair=pair, timeframe=self.timeframe)
current_candle = dataframe.iloc[-1].squeeze()
adjusted_stake_amount = self.adjust_stake_amount(pair, current_candle)
# print(f"{pair} adjusted_stake_amount{adjusted_stake_amount}")
# Use default stake amount.
return adjusted_stake_amount
def custom_exit(self, pair: str, trade: Trade, current_time, current_rate, current_profit, **kwargs):
    """Evaluate the custom exit rules for an open trade.

    Updates the per-pair bookkeeping (last_max / last_min, max_profit,
    count_of_buys, current_profit, current_trade) and then checks, in order:

      1. ``Rsi85``: max_rsi_24 > 85, profit above max(5, expected profit),
         negative hapercent and sma60_deriv1 < 0.05;
      2. ``Frc``: the pair is flagged ``force_sell``;
      3. ``B30``: profit is positive but has fallen more than 30% from its peak;
      4. no exit while the 1h sma5 rose by more than 0.02% over the last 12 candles;
      5. ``Rsi75``: max_rsi_24 > 75, profit above expected profit, negative
         hapercent and negative sma60_deriv1.

    :return: an exit tag ``"<buys>_<rule>_<pair>_<has_gain>"`` when a rule
        fires, otherwise None.
    """
    dataframe, _ = self.dp.get_analyzed_dataframe(pair, self.timeframe)
    last_candle = dataframe.iloc[-1].squeeze()
    candle_12_back = dataframe.iloc[-13].squeeze()
    state = self.pairs[pair]

    expected_profit = self.expectedProfit(pair, last_candle)

    state['last_max'] = max(last_candle['close'], state['last_max'])
    state['last_min'] = min(last_candle['close'], state['last_min'])
    state['current_trade'] = trade

    count_of_buys = trade.nr_of_successful_entries
    profit = trade.calc_profit(current_rate)
    state['max_profit'] = max(state['max_profit'], profit)
    max_profit = state['max_profit']

    # Relative drop of the profit from its peak; max_profit >= profit > 0 here,
    # so the division is safe.
    baisse = 0
    if profit > 0:
        baisse = 1 - (profit / max_profit)

    state['count_of_buys'] = count_of_buys
    state['current_profit'] = profit

    dispo = round(self.wallets.get_available_stake_amount())

    # Periodic status line.
    # NOTE(review): exact float equality — this only fires when the time since
    # the last fill is an exact multiple of 4 hours; confirm that is intended.
    hours = (current_time - trade.date_last_filled_utc).total_seconds() / 3600.0
    if hours % 4 == 0:
        self.log_trade(
            last_candle=last_candle,
            date=current_time,
            action="🔴 CURRENT" if state['stop'] or last_candle['stop_buying_1h'] else "🟢 CURRENT",
            dispo=dispo,
            pair=pair,
            rate=last_candle['close'],
            trade_type='',
            profit=round(profit, 2),
            buys=count_of_buys,
            stake=0
        )

    pair_name = self.getShortName(pair)

    def exit_tag(rule):
        return str(count_of_buys) + '_' + rule + '_' + pair_name + '_' + str(state['has_gain'])

    falling_candle = last_candle['hapercent'] < 0

    if last_candle['max_rsi_24'] > 85 and profit > max(5, expected_profit) and falling_candle \
            and last_candle['sma60_deriv1'] < 0.05:
        state['force_sell'] = False
        state['force_buy'] = False
        return exit_tag('Rsi85')

    if state['force_sell']:
        state['force_sell'] = False
        state['force_buy'] = (state['count_of_buys'] - state['has_gain'] > 3)
        return exit_tag('Frc')

    if profit > 0 and baisse > 0.30:
        state['force_sell'] = False
        state['force_buy'] = (state['count_of_buys'] - state['has_gain'] > 3)
        return exit_tag('B30')

    # Hold while the 1h sma5 is still rising compared with 12 candles ago.
    # NOTE(review): this early return also skips the max_touch update below — confirm intended.
    if (last_candle['sma5_1h'] - candle_12_back['sma5_1h']) / last_candle['sma5_1h'] > 0.0002:
        return None

    if last_candle['max_rsi_24'] > 75 and profit > expected_profit and falling_candle \
            and last_candle['sma60_deriv1'] < 0:
        state['force_sell'] = False
        return exit_tag('Rsi75')

    # No exit: remember the highest close seen since the entry.
    state['max_touch'] = max(last_candle['close'], state['max_touch'])
    return None
def getShortName(self, pair):
    """Return *pair* with every "/USDT", "/USDC", "_USDC" and "_USDT" occurrence removed."""
    short = pair
    for quote_suffix in ("/USDT", "/USDC", "_USDC", "_USDT"):
        short = short.replace(quote_suffix, '')
    return short
def informative_pairs(self):
    """Return the additional (pair, timeframe) combinations the strategy needs.

    Every pair of the current whitelist is requested on the 1h timeframe.
    The list is now built directly: the previous ``+=`` operated on a local
    that was never assigned (its 1d initialiser is commented out) and raised
    UnboundLocalError on every call.

    :return: list of ``(pair, '1h')`` tuples, empty when the whitelist is empty.
    """
    # get access to all pairs available in whitelist.
    pairs = self.dp.current_whitelist()
    return [(pair, '1h') for pair in pairs]
from typing import List
def multi_step_interpolate(self, pct: float, thresholds: List[float], factors: List[float]) -> float:
    """Piecewise-linear interpolation of *pct* over the (thresholds, factors) breakpoints.

    Values at or below the first threshold map to the first factor, values at
    or above the last threshold map to the last factor; in between the factor
    is interpolated linearly inside the enclosing segment.
    """
    if pct <= thresholds[0]:
        return factors[0]
    if pct >= thresholds[-1]:
        return factors[-1]

    for lower, upper_threshold in enumerate(thresholds[1:]):
        if pct > upper_threshold:
            continue
        # linear interpolation between breakpoint `lower` and the next one
        lower_threshold = thresholds[lower]
        lower_factor = factors[lower]
        return lower_factor + (pct - lower_threshold) * (factors[lower + 1] - lower_factor) / (
                upper_threshold - lower_threshold)

    # Fallback (should never be reached)
    return factors[-1]
# def interpolate_factor(self, pct: float, start_pct: float = 5, end_pct: float = 30,
# start_factor: float = 1.0, end_factor: float = 2.0) -> float:
# if pct <= start_pct:
# return start_factor
# if pct >= end_pct:
# return end_factor
# # interpolation linéaire
# return start_factor + (pct - start_pct) * (end_factor - start_factor) / (end_pct - start_pct)
def log_trade(self, action, pair, date, trade_type=None, rate=None, dispo=None, profit=None, buys=None, stake=None,
              last_candle=None):
    """Write one formatted line of the console trade log.

    Nothing is logged in hyperopt mode. On every 10th call the column header,
    a separator line and a dump of the pairs that currently hold at least one
    buy are printed first.

    :param action: label of the event (e.g. buy / sell / current).
    :param pair: traded pair; must be a key of ``self.pairs``.
    :param date: event time; only the first 16 characters of ``str(date)`` are shown.
    :param profit: absolute profit; None is shown as "-".
    :param last_candle: latest analysed candle. Required in practice: its
        indicator columns are read unconditionally.
    """
    # Equality, not `in ('hyperopt')`: the latter is a substring test on a plain string.
    if self.config.get('runmode') == 'hyperopt' or self.dp.runmode.value == 'hyperopt':
        return

    # Header and state dump every 10 lines
    if self.columns_logged % 10 == 0:
        self.printLog(
            f"| {'Date':<16} | {'Action':<10} |{'Pair':<5}| {'Trade Type':<18} |{'Rate':>8} | {'Dispo':>6} | {'Profit':>8} "
            f"| {'Pct':>6} | {'max_touch':>11} | {'last_lost':>12} | {'last_max':>7}| {'last_min':>7}|{'Buys':>5}| {'Stake':>5} |"
            f"{'rsi':>6}|Distmax|s201d|s5_1d|s5_2d|s51h|s52h|smt1h|smt2h|tdc1d|tdc1h"
        )
        self.printLineLog()
        df = pd.DataFrame.from_dict(self.pairs, orient='index')
        colonnes_a_exclure = ['last_candle',
                              'trade_info', 'last_date', 'last_count_of_buys', 'base_stake_amount', 'stop_buy']
        df_filtered = df[df['count_of_buys'] > 0].drop(columns=colonnes_a_exclure)
        print(df_filtered)
    self.columns_logged += 1

    date = str(date)[:16] if date else "-"
    last_lost = self.getLastLost(last_candle, pair)
    if buys is None:
        buys = ''
    pct_max = self.getPctFirstBuy(pair, last_candle)
    total_counts = str(buys) + '/' + str(sum(pair_data['count_of_buys'] for pair_data in self.pairs.values()))
    dist_max = ''

    # Green for a positive value, red otherwise (a missing profit counts as not positive).
    color = GREEN if profit is not None and profit > 0 else RED
    color_sma24 = GREEN if last_candle['sma24_deriv1_1h'] > 0 else RED
    color_sma5 = GREEN if last_candle['mid_smooth_5_deriv1_1h'] > 0 else RED
    color_sma5_2 = GREEN if last_candle['mid_smooth_5_deriv2_1h'] > 0 else RED
    color_sma5_1h = GREEN if last_candle['sma60_deriv1'] > 0 else RED
    color_sma5_2h = GREEN if last_candle['sma60_deriv2'] > 0 else RED
    color_smooth_1h = GREEN if last_candle['mid_smooth_1h_deriv1'] > 0 else RED
    color_smooth2_1h = GREEN if last_candle['mid_smooth_1h_deriv2'] > 0 else RED

    # Prices above 1 are shown as integers, smaller ones with 3 decimals.
    last_max = int(self.pairs[pair]['last_max']) if self.pairs[pair]['last_max'] > 1 else round(
        self.pairs[pair]['last_max'], 3)
    last_min = int(self.pairs[pair]['last_min']) if self.pairs[pair]['last_min'] > 1 else round(
        self.pairs[pair]['last_min'], 3)

    # "<profit>/<max profit reached by the trade>"
    profit_text = ('-' if profit is None else str(profit)) + '/' + str(round(self.pairs[pair]['max_profit'], 2))

    # Reading the derivative columns:
    #   d1 > 0 and d2 > 0: accelerating uptrend.
    #   d1 > 0 and d2 < 0: slowing uptrend (possible exhaustion).
    #   d1 < 0 and d2 < 0: accelerating downtrend.
    #   d1 < 0 and d2 > 0: slowing downtrend (possible bottom).
    self.printLog(
        f"| {date:<16} |{action:<10} | {pair[0:3]:<3} | {trade_type or '-':<18} |{rate or '-':>9}| {dispo or '-':>6} "
        f"|{color}{profit_text:>10}{RESET}| {pct_max or '-':>6} | {round(self.pairs[pair]['max_touch'], 2) or '-':>11} | {last_lost or '-':>12} "
        f"| {last_max or '-':>7} | {last_min or '-':>7} |{total_counts or '-':>5}|{stake or '-':>7}"
        f"{round(last_candle['max_rsi_24'], 1) or '-' :>6}|"
        f"{dist_max:>7}|{color_sma24}{round(last_candle['sma24_deriv1_1h'], 2):>5}{RESET}"
        f"|{color_sma5}{round(last_candle['mid_smooth_5_deriv1_1h'], 2):>5}{RESET}|{color_sma5_2}{round(last_candle['mid_smooth_5_deriv2_1h'], 2):>5}{RESET}"
        f"|{color_sma5_1h}{round(last_candle['sma60_deriv1'], 2):>5}{RESET}|{color_sma5_2h}{round(last_candle['sma60_deriv2'], 2):>5}{RESET}"
        f"|{color_smooth_1h}{round(last_candle['mid_smooth_1h_deriv1'], 2):>5}{RESET}|{color_smooth2_1h}{round(last_candle['mid_smooth_1h_deriv2'], 2):>5}{RESET}"
    )
def getLastLost(self, last_candle, pair):
    """Return the relative distance of the last close from the pair's ``max_touch``.

    The result is ``(close - max_touch) / max_touch`` rounded to 3 decimals
    (negative when the price is below the recorded peak). ``max_touch`` is 0
    before the first buy and after each sell; 0.0 is returned in that case
    instead of dividing by zero.
    """
    max_touch = self.pairs[pair]['max_touch']
    if not max_touch:
        return 0.0
    return round((last_candle['close'] - max_touch) / max_touch, 3)
def printLineLog(self):
    """Emit the horizontal separator line of the trade-log table through ``printLog``."""
    # Dash-run widths of the two halves of the ruler; each half is wrapped in '+'.
    main_widths = (18, 12, 5, 20, 9, 8, 12, 8, 13, 14, 18, 5, 7)
    indicator_widths = (6, 7, 5, 5, 5, 5, 5, 5)

    def ruler(widths):
        return "+" + "+".join('-' * width for width in widths) + "+"

    self.printLog(ruler(main_widths) + ruler(indicator_widths))
def printLog(self, str):
    """Output one log line, routed according to the run mode.

    * hyperopt: nothing is emitted;
    * backtest / lookahead-analysis: the line is printed to stdout;
    * any other mode: the line goes to the module logger at INFO level.

    :param str: the text to output (name kept for backward compatibility
        although it shadows the builtin).
    """
    runmode = self.dp.runmode.value
    # Equality, not `in ('hyperopt')`: the latter is a substring test on a plain string.
    if self.config.get('runmode') == 'hyperopt' or runmode == 'hyperopt':
        return
    if runmode in ('backtest', 'lookahead-analysis'):
        print(str)
    else:
        logger.info(str)
def add_tendency_column(self, dataframe: pd.DataFrame, name: str, suffixe: str = '', eps: float = 1e-3,
                        d1_lim_inf: float = -0.01, d1_lim_sup: float = 0.01) -> pd.DataFrame:
    """
    Add the trend-tag column ``{name}_tdc{suffixe}`` to *dataframe*.

    The tag is derived row by row from the first and second derivative
    columns ``{name}{suffixe}_deriv1`` and ``{name}{suffixe}_deriv2``:

    * ``0``    : |d1| < eps (plateau / neutral)
    * ``2``    : d1 > d1_lim_sup and d2 > eps (accelerating rise)
    * ``1``    : d1 > d1_lim_sup otherwise (slowing rise)
    * ``-2``   : d1 < d1_lim_inf and d2 < -eps (accelerating fall)
    * ``-1``   : d1 < d1_lim_inf otherwise (slowing fall)
    * ``'Mid'``: anything else, including NaN derivatives

    The former 'DH' / 'DB' branch repeated the ``|d1| < eps`` test that had
    already returned 0 and could never run; it has been removed.

    :param eps: threshold under which a derivative is treated as zero.
    :param suffixe: optional suffix, to handle several indicators / timeframes.
    :return: the same dataframe, modified in place.
    """
    d1_col = f"{name}{suffixe}_deriv1"
    d2_col = f"{name}{suffixe}_deriv2"

    def tag_by_derivatives(row):
        d1 = row[d1_col]
        d2 = row[d2_col]
        # Small values are treated as zero
        if abs(d1) < eps:
            return 0
        if d1 > d1_lim_sup:
            return 2 if d2 > eps else 1
        if d1 < d1_lim_inf:
            return -2 if d2 < -eps else -1
        return 'Mid'

    print(f"{name}_tdc{suffixe}")
    dataframe[f"{name}_tdc{suffixe}"] = dataframe.apply(tag_by_derivatives, axis=1)
    return dataframe
# def add_tendency_column(self, dataframe: pd.DataFrame, name, suffixe='') -> pd.DataFrame:
# def tag_by_derivatives(row):
# d1 = row[f"{name}{suffixe}_deriv1"]
# d2 = row[f"{name}{suffixe}_deriv2"]
# d1_lim_inf = -0.01
# d1_lim_sup = 0.01
# if d1 >= d1_lim_inf and d1 <= d1_lim_sup: # and d2 >= d2_lim_inf and d2 <= d2_lim_sup:
# return 0 # Palier
# if d1 == 0.0:
# return 'DH' if d2 > 0 else 'DB' # Depart Hausse / Départ Baisse
# if d1 > d1_lim_sup:
# return 2 if d2 > 0 else 1 # Acceleration Hausse / Ralentissement Hausse
# if d1 < d1_lim_inf:
# return -2 if d2 < 0 else -1 # Accéleration Baisse / Ralentissement Baisse
# return 'Mid'
#
# dataframe[f"tendency{suffixe}"] = dataframe.apply(tag_by_derivatives, axis=1)
# return dataframe
def populate_indicators(self, dataframe: DataFrame, metadata: dict) -> DataFrame:
"""
Build every indicator column for one pair.

Steps, in order: 5m indicators via populateDataframe; 1h informative frame
(indicators, regression on 'mid', "stop buying" latch) merged into the 5m
frame; first/last buy prices (taken from the open trade's filled buys in
live / dry-run); smoothed 'mid' series with their derivatives; regressions;
then the TensorFlow training (backtest only) and prediction helpers.
Also sets self.path, self.model_indicators and parts of self.pairs[pair].
"""
# Add all ta features
pair = metadata['pair']
short_pair = self.getShortName(pair)
# Per-pair output directory (instance attribute, overwritten on every call).
self.path = f"user_data/plots/{short_pair}/"
dataframe = self.populateDataframe(dataframe, timeframe='5m')
################### INFORMATIVE 1h
informative = self.dp.get_pair_dataframe(pair=metadata['pair'], timeframe="1h")
informative = self.populateDataframe(informative, timeframe='1h')
informative = self.calculateRegression(informative, 'mid', lookback=5)
# # TENSOR FLOW
# self.model_indicators = self.listUsableColumns(informative)
# if self.dp.runmode.value in ('backtest'):
# self.trainTensorFlow(informative, future_steps = self.future_steps)
#
# self.predictTensorFlow(informative)
#
# if self.dp.runmode.value in ('backtest'):
# self.kerasGenerateGraphs(informative)
# Latch start: max_rsi_24 above the "deb" RSI threshold while sma24_deriv1 is below its "deb" threshold.
informative['stop_buying_deb'] = ((informative['max_rsi_24'] > self.rsi_deb_protect.value)
& (informative['sma24_deriv1'] < self.sma24_deriv1_deb_protect.value)
)
# Latch release: max_rsi_24 below the "end" RSI threshold while sma24_deriv1 is above its "end" threshold.
informative['stop_buying_end'] = ((informative['max_rsi_24'] < self.rsi_end_protect.value)
& (informative['sma24_deriv1'] > self.sma24_deriv1_end_protect.value)
)
# Stateful latch: row 0 stays False; afterwards a start flag sets it (start wins
# over release), a release flag clears it, otherwise the previous state is carried.
latched = np.zeros(len(informative), dtype=bool)
for i in range(1, len(informative)):
if informative['stop_buying_deb'].iloc[i]:
latched[i] = True
elif informative['stop_buying_end'].iloc[i]:
latched[i] = False
else:
latched[i] = latched[i - 1]
informative['stop_buying'] = latched
# After the merge the 1h columns are read elsewhere with a "_1h" suffix (e.g. 'stop_buying_1h').
dataframe = merge_informative_pair(dataframe, informative, self.timeframe, "1h", ffill=True)
# ################### INFORMATIVE 1d
# informative = self.dp.get_pair_dataframe(pair=metadata['pair'], timeframe="1d")
# informative = self.populateDataframe(informative, timeframe='1d')
# # informative = self.calculateRegression(informative, 'mid', lookback=15)
# dataframe = merge_informative_pair(dataframe, informative, self.timeframe, "1d", ffill=True)
# Defaults: both reference prices follow the close until a filled buy overrides them.
dataframe['last_price'] = dataframe['close']
dataframe['first_price'] = dataframe['close']
# NOTE(review): self.dp has already been dereferenced above, so this guard cannot
# protect against a missing data provider.
if self.dp:
if self.dp.runmode.value in ('live', 'dry_run'):
self.getOpenTrades()
for trade in self.trades:
if trade.pair != pair:
continue
filled_buys = trade.select_filled_orders('buy')
count = 0
amount = 0
for buy in filled_buys:
# First filled buy: reference price and amount of the initial entry.
if count == 0:
dataframe['first_price'] = buy.price
self.pairs[pair]['first_buy'] = buy.price
self.pairs[pair]['first_amount'] = buy.price * buy.filled
# dataframe['close01'] = buy.price * 1.01
# Order(id=2396, trade=1019, order_id=29870026652, side=buy, filled=0.00078, price=63921.01,
# status=closed, date=2024-08-26 02:20:11)
dataframe['last_price'] = buy.price
self.pairs[pair]['last_buy'] = buy.price
count = count + 1
amount += buy.price * buy.filled
# dataframe['mid_price'] = (dataframe['last_price'] + dataframe['first_price']) / 2
# NOTE(review): count_buys is assigned but not used in the visible part of this method.
count_buys = count
# dataframe['limit'] = dataframe['last_price'] * (1 - self.baisse[count] / 100)
self.pairs[pair]['total_amount'] = amount
# dataframe['mid_smooth_tag'] = qtpylib.crossed_below(dataframe['mid_smooth_24_deriv1'], dataframe['mid_smooth_deriv2_24'])
# ===============================
# smoothing of the hourly values
# 6-candle rolling mean of 'mid', then its relative slope (in %) and the slope of that slope.
dataframe['mid_smooth_1h'] = dataframe['mid'].rolling(window=6).mean()
dataframe["mid_smooth_1h_deriv1"] = 100 * dataframe["mid_smooth_1h"].diff().rolling(window=6).mean() / \
dataframe['mid_smooth_1h']
dataframe["mid_smooth_1h_deriv2"] = 100 * dataframe["mid_smooth_1h_deriv1"].diff().rolling(window=6).mean()
# NOTE(review): EMA is called on the whole dataframe without a price argument —
# presumably it uses 'close', whereas the trailing comment suggests 'mid'; confirm.
dataframe['mid_smooth_5h'] = talib.EMA(dataframe, timeperiod=60) # dataframe['mid'].rolling(window=60).mean()
dataframe["mid_smooth_5h_deriv1"] = 100 * dataframe["mid_smooth_5h"].diff().rolling(window=60).mean() / \
dataframe['mid_smooth_5h']
dataframe["mid_smooth_5h_deriv2"] = 100 * dataframe["mid_smooth_5h_deriv1"].diff().rolling(window=60).mean()
dataframe = self.calculateRegression(dataframe, 'mid', lookback=10, future_steps=10, model_type="poly")
dataframe = self.calculateRegression(dataframe, 'sma24', lookback=12, future_steps=12)
# dataframe["ms-10"] = dataframe[self.indicator_target].shift(10)
# dataframe["ms-5"] = dataframe[self.indicator_target].shift(5)
# dataframe["ms-4"] = dataframe[self.indicator_target].shift(4)
# dataframe["ms-3"] = dataframe[self.indicator_target].shift(3)
# dataframe["ms-2"] = dataframe[self.indicator_target].shift(2)
# dataframe["ms-1"] = dataframe[self.indicator_target].shift(1)
# dataframe["ms-0"] = dataframe[self.indicator_target]
# dataframe["ms+10"] = dataframe["mid_smooth_24"].shift(-11)
# Feature columns handed to the model helpers, recomputed for the current pair.
self.model_indicators = self.listUsableColumns(dataframe)
# # Quantile
# self.add_future_quantiles(
# dataframe,
# indic="mid",
# lookback=40,
# future_steps=5
# )
# TENSOR FLOW
# NOTE(review): ('backtest') is a plain string, so `in` performs a substring test here;
# it matches 'backtest' but equality was probably intended.
if self.dp.runmode.value in ('backtest'):
self.trainTensorFlow(dataframe, future_steps = self.future_steps)
# Prediction runs in every mode; training and graph generation only in backtest.
self.predictTensorFlow(dataframe)
if self.dp.runmode.value in ('backtest'):
self.kerasGenerateGraphs(dataframe)
# SKLEARN
# if self.dp.runmode.value in ('backtest'):
# self.trainModel(dataframe, metadata)
# short_pair = self.getShortName(pair)
# self.model = joblib.load(f"{short_pair}_rf_model.pkl")
#
# # Prepare the features for the prediction
# features = dataframe[self.model_indicators].fillna(0)
#
# # Prediction: probability that the price rises
# # probs = self.model.predict_proba(features)[:, 1]
# probs = self.model.predict(features)
#
# # Store the probability for analysis
# dataframe['ml_prob'] = probs
#
# self.inspect_model(self.model)
return dataframe
def trainModel(self, dataframe: DataFrame, metadata: dict):
    """
    Train the regression model for one pair and write its diagnostics.

    The target is ``self.indicator_target`` shifted 24 candles into the
    future. Rows whose future value is unknown (the last 24) are dropped
    rather than labelled 0, so they can no longer pollute the chronological
    test split.

    Side effects: changes global pandas display options, prints correlation
    tables, saves a correlation heat-map under ``self.path``, dumps the model
    to ``{pair}_rf_model.pkl`` in the working directory and calls
    ``self.analyze_model``.

    :param dataframe: indicator dataframe containing every column listed in
        ``self.model_indicators`` plus ``self.indicator_target``.
    :param metadata: pair metadata; only ``metadata['pair']`` is read.
    """
    pair = self.getShortName(metadata['pair'])
    pd.set_option('display.max_rows', None)
    pd.set_option('display.max_columns', None)
    pd.set_option("display.width", 200)
    os.makedirs(self.path, exist_ok=True)

    df = dataframe[self.model_indicators].copy()

    # Feature-to-feature correlations (target not added yet)
    corr = df.corr(numeric_only=True)
    print("Corrélation des colonnes")
    print(corr)

    # Target: value of the target indicator 24 candles ahead.
    # The trailing rows stay NaN on purpose; dropna() below removes them.
    df['target'] = dataframe[self.indicator_target].shift(-24)

    # One correlation matrix (features + target) serves every report below.
    corr = df.corr(numeric_only=True)

    # Correlation of every column with the target, strongest first
    target_corr = corr["target"].sort_values(ascending=False)
    print("Corrélations triées par importance avec une colonne cible")
    print(target_corr)

    # Pairwise correlations as a flat table
    corr_unstacked = (
        corr.unstack()
        .reset_index()
        .rename(columns={"level_0": "col1", "level_1": "col2", 0: "corr"})
    )
    # Drop mirrored (col1/col2 swapped) pairs and self-correlations
    corr_unstacked = corr_unstacked[corr_unstacked["col1"] < corr_unstacked["col2"]]
    # Sort by absolute correlation
    corr_sorted = corr_unstacked.reindex(corr_unstacked["corr"].abs().sort_values(ascending=False).index)
    print("Trier par valeur absolue de corrélation")
    print(corr_sorted.head(20))

    # --- Heat-map of the correlation matrix, in percent ---
    corr_pct = corr * 100
    # Hide the upper triangle (and the diagonal)
    mask = np.triu(np.ones_like(corr_pct, dtype=bool))
    fig, ax = plt.subplots(figsize=(96, 36))
    sns.heatmap(
        corr_pct,
        mask=mask,
        cmap="coolwarm",  # blue -> red palette
        center=0,  # 0 in the middle of the palette
        annot=True,  # write the value in each cell
        fmt=".0f",  # integer format
        cbar_kws={"label": "Corrélation (%)"},
        linewidths=0.5,
        ax=ax
    )
    ax.set_title("Matrice de corrélation (en %)", fontsize=20, pad=20)
    plt.xticks(rotation=45, ha="right")
    plt.yticks(rotation=0)
    output_path = f"{self.path}/Matrice_de_correlation_temperature.png"
    plt.savefig(output_path, bbox_inches="tight", dpi=150)
    plt.close(fig)
    print(f"✅ Matrice enregistrée : {output_path}")

    # Remove incomplete rows (including the rows without a known target)
    df = df.dropna()
    X = df[self.model_indicators]
    y = df['target']
    print(self.feature_auc_scores(X, y))

    # Chronological split (train = 80 %, test = 20 %), no shuffling
    X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, shuffle=False)
    print("NaN per column:")
    print(X_train.isna().sum().sort_values(ascending=False).head(20))

    # Report (only) which columns pass a minimal variance threshold
    selector = VarianceThreshold(threshold=0.0001)
    selector.fit(X_train)
    selected = X_train.columns[selector.get_support()]
    print("Colonnes conservées :", list(selected))

    # Hyper-parameter search; returns the model and the features it retained
    train_model, selected_features = self.optuna(self.path, X_train, X_test, y_train, y_test)
    print("Features retenues :", list(selected_features))

    # Final fit on the full feature set
    train_model.fit(X_train, y_train)

    # NOTE(review): feature_name_ looks like the LightGBM sklearn attribute -- confirm
    importances = pd.DataFrame({
        "feature": train_model.feature_name_,
        "importance": train_model.feature_importances_
    }).sort_values("importance", ascending=False)
    print("\n===== 🔍 IMPORTANCE DES FEATURES =====")
    print(importances)

    # Hold-out evaluation
    preds = train_model.predict(X_test)
    mse = mean_squared_error(y_test, preds)
    rmse = np.sqrt(mse)
    r2 = r2_score(y_test, preds)
    print(f"RMSE: {rmse:.5f} | R²: {r2:.3f}")

    # Persist the model
    joblib.dump(train_model, f"{pair}_rf_model.pkl")
    print(f"✅ Modèle sauvegardé sous {pair}_rf_model.pkl")

    self.analyze_model(pair, train_model, X_train, X_test, y_train, y_test)
def listUsableColumns(self, dataframe):
    """
    Select the numeric, non-constant columns usable as model features.

    Columns ending in ``_state``, ``_1d``, ``_count``, ``_class`` or
    ``_price`` and columns starting with ``stop_buying`` are excluded.
    The retained columns are cleaned *in place* in ``dataframe``
    (+/-inf and NaN become 0). The list is stored in
    ``self.model_indicators`` and returned.
    """
    excluded_suffixes = ("_state", "_1d", "_count", "_class", "_price")

    # Step 1: numeric candidates only
    candidates = dataframe.select_dtypes(include=['int64', 'float64']).columns

    # Step 2: drop constant columns and excluded name patterns
    usable_cols = []
    for column in candidates:
        if dataframe[column].nunique() <= 1:
            continue
        if column.endswith(excluded_suffixes) or column.startswith('stop_buying'):
            continue
        usable_cols.append(column)

    # Step 3: neutralise inf and NaN in the retained columns
    without_inf = dataframe[usable_cols].replace([np.inf, -np.inf], 0)
    dataframe[usable_cols] = without_inf.fillna(0)

    print("Colonnes utilisables pour le modèle :")
    print(usable_cols)
    self.model_indicators = usable_cols
    return self.model_indicators
def inspect_model(self, model):
    """
    Print a summary of an already trained model.

    Each section is printed only when the model exposes the matching
    attribute: hyper-parameters (``get_params``), ``n_estimators``,
    feature importances, ``coef_``, ``intercept_``, ``classes_`` and a few
    score attributes. The first 15 bound method names are listed last.
    """
    print("===== 🔍 INFORMATIONS DU MODÈLE =====")
    # Model type
    print(f"Type : {type(model).__name__}")
    print(f"Module : {model.__class__.__module__}")

    # Hyper-parameters
    if hasattr(model, "get_params"):
        params = model.get_params()
        print(f"\n===== ⚙️ HYPERPARAMÈTRES ({len(params)}) =====")
        for key, value in params.items():
            print(f"{key}: {value}")

    # Number of estimators
    if hasattr(model, "n_estimators"):
        print(f"\nNombre destimateurs : {model.n_estimators}")

    # Feature importances
    if hasattr(model, "feature_importances_"):
        print("\n===== 📊 IMPORTANCE DES FEATURES =====")
        names = getattr(model, "feature_names_in_", None)
        if names is None:
            # No recorded names: fall back to positional placeholders
            count = len(model.feature_importances_)
            names = [f"feature_{position}" for position in range(count)]
        elif isinstance(names, np.ndarray):
            names = names.tolist()
        ranking = pd.DataFrame({
            "feature": names,
            "importance": model.feature_importances_
        })
        print(ranking.sort_values(by="importance", ascending=False))

    # Coefficients (linear models)
    if hasattr(model, "coef_"):
        print("\n===== ➗ COEFFICIENTS =====")
        coefficients = np.array(model.coef_)
        if coefficients.ndim != 1:
            print(coefficients)
        else:
            for position, value in enumerate(coefficients):
                print(f"Feature {position}: {value:.6f}")

    # Intercept
    if hasattr(model, "intercept_"):
        print("\nIntercept :", model.intercept_)

    # Known classes
    if hasattr(model, "classes_"):
        print("\n===== 🎯 CLASSES =====")
        print(model.classes_)

    # Internal scores
    for score_name in ("best_score_", "best_iteration_", "best_ntree_limit", "score_"):
        if hasattr(model, score_name):
            print(f"\n{score_name} = {getattr(model, score_name)}")

    # Available methods (first 15 only)
    print("\n===== 🧩 MÉTHODES DISPONIBLES =====")
    method_names = [name for name, _member in inspect.getmembers(model, predicate=inspect.ismethod)]
    ellipsis = "..." if len(method_names) > 15 else ""
    print(", ".join(method_names[:15]) + ellipsis)
    print("\n===== ✅ FIN DE LINSPECTION =====")
def analyze_model(self, pair, model, X_train, X_test, y_train, y_test):
    """
    Evaluate a fitted regression model and save diagnostic plots.

    Prints feature importances and RMSE / R² on the test set, then writes
    the plots (importances, prediction vs. actual, LightGBM trees,
    correlation matrix) into ``user_data/plots/{pair}/``.

    Predictions on ``X_test`` are computed once and reused by every plot.

    :param pair: short pair name, used for the output directory.
    :param model: fitted model; the tree plots go through ``lgb`` so it is
        presumably a LightGBM estimator -- confirm against the caller.
    :param X_train: training features (column names, correlation matrix).
    :param X_test: test features.
    :param y_train: training target (not used here).
    :param y_test: test target.
    """
    output_dir = f"user_data/plots/{pair}/"
    os.makedirs(output_dir, exist_ok=True)

    # ---- Feature importances ----
    if hasattr(model, "feature_importances_"):
        print("\n===== 🔍 IMPORTANCE DES FEATURES =====")
        importance = pd.DataFrame({
            "feature": X_train.columns,
            "importance": model.feature_importances_
        }).sort_values(by="importance", ascending=False)
        print(importance)
        top_n = 20
        importance = importance.head(top_n)
        fig, ax = plt.subplots(figsize=(24, 8))  # width=24in, height=8in
        importance.plot.bar(x="feature", y="importance", legend=False, ax=ax)
        # Rotate the labels for readability
        ax.set_xticklabels(ax.get_xticklabels(), rotation=45, ha='right')
        plt.title("Importance des features")
        plt.savefig(os.path.join(output_dir, "Importance des features.png"), bbox_inches="tight")
        plt.close()

    # ---- Excerpt of the first decision tree (ensembles exposing estimators_) ----
    if hasattr(model, "estimators_"):
        print("\n===== 🌳 EXTRAIT DUN ARBRE =====")
        print(export_text(model.estimators_[0], feature_names=list(X_train.columns))[:800])

    # ---- Test-set evaluation (single prediction pass, reused below) ----
    preds = model.predict(X_test)
    mse = mean_squared_error(y_test, preds)
    rmse = np.sqrt(mse)
    r2 = r2_score(y_test, preds)
    print(f"RMSE: {rmse:.5f} | R²: {r2:.3f}")

    # ---- Prediction vs. actual ----
    plt.figure(figsize=(8, 8))
    plt.scatter(y_test, preds, alpha=0.4, s=15)
    plt.xlabel("Valeurs réelles", fontsize=12)
    plt.ylabel("Valeurs prédites", fontsize=12)
    plt.title(f"LightGBM Régression — Prédiction vs Réel\nRMSE={rmse:.5f} | R²={r2:.3f}", fontsize=14)
    # Diagonal = perfect prediction
    plt.plot(
        [y_test.min(), y_test.max()],
        [y_test.min(), y_test.max()],
        'r--',
        linewidth=1,
        label="Ligne idéale"
    )
    plt.legend()
    plot_path = os.path.join(output_dir, "LightGBM_regression_pred_vs_real.png")
    plt.savefig(plot_path, bbox_inches="tight", dpi=200)
    plt.close()
    self.plot_pred_vs_real_filtered(model, X_test, y_test, preds, output_dir)
    print(f"✅ Graphique sauvegardé : {plot_path}")

    # ---- First five trees, then the default tree view ----
    for i in range(5):
        lgb.plot_tree(model, tree_index=i, figsize=(20, 12))
        plt.title(f"Arbre {i}")
        plt.savefig(os.path.join(output_dir, f"lgbm_tree_{i}.png"), bbox_inches="tight")
        plt.close()
    lgb.plot_tree(model, figsize=(40, 20))
    plt.title("Vue globale du modèle LGBM")
    plt.savefig(os.path.join(output_dir, "lgbm_all_trees.png"), bbox_inches="tight")
    plt.close()

    # ---- Learned-function plots (helpers defined elsewhere in the class) ----
    self.graphFonctionApprise(output_dir, X_test, y_test, preds)
    self.graphFonctionAppriseFeature(output_dir, X_test, y_test, preds)

    # ---- LightGBM importance plot ----
    lgb.plot_importance(model, max_num_features=30, figsize=(12, 6))
    plt.title("Importance des features - LGBM")
    plt.savefig(os.path.join(output_dir, "lgbm_feature_importance.png"), bbox_inches="tight")
    plt.close()

    # ---- Correlation matrix of the training features, in percent ----
    corr = X_train.corr() * 100
    plt.figure(figsize=(20, 16))
    sns.heatmap(corr, cmap="coolwarm", center=0, annot=False, fmt=".1f", cbar_kws={'label': 'Corrélation (%)'})
    plt.title("Matrice de corrélation (%)")
    plt.savefig(os.path.join(output_dir, "correlation_matrix.png"), bbox_inches="tight")
    plt.close()

    # ---- Plain y_test vs. prediction scatter ----
    plt.figure(figsize=(10, 6))
    plt.scatter(y_test, preds, alpha=0.5)
    plt.xlabel("Valeurs réelles")
    plt.ylabel("Prédictions du modèle")
    plt.title("Comparaison y_test vs y_pred")
    plt.savefig(os.path.join(output_dir, "ytest_vs_ypred.png"), bbox_inches="tight")
    plt.close()
    print("\n===== ✅ FIN DE LANALYSE =====")
def plot_pred_vs_real_filtered(self, model, X_test, y_test, preds, output_dir, top_n=5):
    """
    Save a prediction-vs-actual scatter restricted to non-outlier rows.

    A row is kept when, for each of the ``top_n`` most important features,
    its value lies within 3 standard deviations of that feature's mean on
    ``X_test``. Features that are constant on ``X_test`` (std of 0) are
    ignored by the filter; previously they produced NaN z-scores that
    rejected every row and left the plot empty.

    The figure is written to ``{output_dir}/lgbm_pred_vs_real_top{top_n}.png``.
    """
    # --- 1. Most important features ---
    importance_df = pd.DataFrame({
        "feature": X_test.columns,
        "importance": model.feature_importances_
    }).sort_values(by="importance", ascending=False)
    top_features = importance_df.head(top_n)["feature"].tolist()
    print(f"Top {top_n} features: {top_features}")

    # --- 2. Outlier mask: |z-score| < 3 on every varying top feature ---
    X_top = X_test[top_features]
    std = X_top.std()
    varying = std[std > 0].index
    zscores = (X_top[varying] - X_top[varying].mean()) / std[varying]
    # Positional boolean array so it indexes Series and ndarrays alike
    mask = (zscores.abs() < 3).all(axis=1).to_numpy()
    y_filtered = y_test[mask]
    preds_filtered = preds[mask]

    # --- 3. Plot ---
    plt.figure(figsize=(8, 8))
    plt.scatter(y_filtered, preds_filtered, alpha=0.4, s=15, c='blue', label=f"Top {top_n} features")
    plt.xlabel("Valeurs réelles", fontsize=12)
    plt.ylabel("Valeurs prédites", fontsize=12)
    plt.title(f"LightGBM Régression — Prédiction vs Réel (filtré sur top {top_n} features)", fontsize=14)
    # Diagonal = perfect prediction
    plt.plot(
        [y_filtered.min(), y_filtered.max()],
        [y_filtered.min(), y_filtered.max()],
        'r--',
        linewidth=1,
        label="Ligne idéale"
    )
    plt.legend()
    plt.grid(True)
    out_path = f"{output_dir}/lgbm_pred_vs_real_top{top_n}.png"
    plt.savefig(out_path, bbox_inches="tight")
    plt.close()
def plot_threshold_analysis(self, y_true, y_proba, step=0.05, save_path=None):
    """
    Plot precision, recall and F1-score against the decision threshold.

    :param y_true: true labels (0 or 1).
    :param y_proba: predicted probabilities of the positive class; any
        array-like is accepted.
    :param step: spacing between the tested thresholds in [0, 1].
    :param save_path: when set, the figure is saved there and closed;
        otherwise it is shown with ``plt.show()``.
    """
    # Accept lists / Series as well as ndarrays
    y_proba = np.asarray(y_proba)
    thresholds = np.arange(0, 1.01, step)
    precisions, recalls, f1s = [], [], []
    for thr in thresholds:
        preds = (y_proba >= thr).astype(int)
        precisions.append(precision_score(y_true, preds))
        recalls.append(recall_score(y_true, preds))
        f1s.append(f1_score(y_true, preds))

    fig = plt.figure(figsize=(10, 6))
    plt.plot(thresholds, precisions, label="Precision", linewidth=2)
    plt.plot(thresholds, recalls, label="Recall", linewidth=2)
    plt.plot(thresholds, f1s, label="F1-score", linewidth=2, linestyle="--")
    plt.axvline(0.5, color='gray', linestyle=':', label="Seuil 0.5")
    plt.title("📊 Performance selon le seuil de probabilité", fontsize=14)
    plt.xlabel("Seuil de décision (threshold)")
    plt.ylabel("Score")
    plt.legend()
    plt.grid(True, alpha=0.3)
    if save_path:
        plt.savefig(save_path, bbox_inches='tight')
        # Release the figure, like the other plot helpers in this file do
        plt.close(fig)
        print(f"✅ Graphique enregistré : {save_path}")
    else:
        plt.show()
# # =============================
# # Usage example:
# # =============================
# if __name__ == "__main__":
# # Example: load a model and test data
# import joblib
#
# model = joblib.load("/media/Home/home/souti/freqtrade/user_data/strategies/tools/sklearn/model.pkl")
# data = np.load("/media/Home/home/souti/freqtrade/user_data/strategies/tools/sklearn/test_data.npz")
# X_test, y_test = data["X"], data["y"]
#
# y_proba = model.predict_proba(X_test)[:, 1]
#
# # Show or save the plot
# plot_threshold_analysis(y_test, y_proba, step=0.05,
# save_path="/media/Home/home/souti/freqtrade/user_data/strategies/tools/sklearn/threshold_analysis.png")
def populateDataframe(self, dataframe, timeframe='5m'):
    """
    Return a copy of ``dataframe`` enriched with the strategy's indicators.

    Columns are added in a fixed order: Heikin-Ashi values, returns,
    exponential means of ``mid`` (named ``sma*``), rolling means of ``mid``
    (``mid_smooth_*``), RSI, rolling extrema, Bollinger bands, MACD, slope,
    ATR, ADX, OBV, volatility, up/down streaks, a few derived ratios and
    finally whatever ``self.setTrends`` adds.

    :param dataframe: OHLCV dataframe (``close``, ``high``, ``low`` and
        ``volume`` are read here).
    :param timeframe: forwarded to the derivative helpers.
    """
    dataframe = dataframe.copy()

    # --- Heikin-Ashi candle and its mid-point ---
    ha = qtpylib.heikinashi(dataframe)
    dataframe['haopen'] = ha['open']
    dataframe['haclose'] = ha['close']
    dataframe['hapercent'] = (dataframe['haclose'] - dataframe['haopen']) / dataframe['haclose']
    dataframe['mid'] = dataframe['haopen'] + (dataframe['haclose'] - dataframe['haopen']) / 2

    # --- Close-to-close returns: raw, then smoothed over the same horizon ---
    dataframe["percent"] = dataframe['close'].pct_change()
    for horizon in (3, 12, 24):
        dataframe[f"percent{horizon}"] = dataframe['close'].pct_change(horizon).rolling(horizon).mean()

    # --- Exponential means of mid (historical "sma" prefix) + derivatives ---
    for span in (5, 12, 24, 48, 60):
        ema_col = f"sma{span}"
        dataframe[ema_col] = dataframe['mid'].ewm(span=span, adjust=False).mean()
        self.calculeDerivees(dataframe, ema_col, timeframe=timeframe, ema_period=span)

    # --- Rolling means of mid + derivatives ---
    for window in (3, 5, 12, 24):
        dataframe = self.calculateDerivation(dataframe, window=window, suffixe=f"_{window}", timeframe=timeframe)

    # --- RSI and its recent maxima ---
    dataframe['rsi'] = talib.RSI(dataframe['close'], timeperiod=14)
    for period in (12, 24):
        dataframe[f'max_rsi_{period}'] = talib.MAX(dataframe['rsi'], timeperiod=period)
    self.calculeDerivees(dataframe, 'rsi', timeframe=timeframe, ema_period=12)

    # --- Rolling extrema of close ---
    for period in (12, 60):
        dataframe[f'max{period}'] = talib.MAX(dataframe['close'], timeperiod=period)
        dataframe[f'min{period}'] = talib.MIN(dataframe['close'], timeperiod=period)
    dataframe['min_max_60'] = ((dataframe['max60'] - dataframe['close']) / dataframe['min60'])

    # --- Bollinger bands on the typical price ---
    bands = qtpylib.bollinger_bands(qtpylib.typical_price(dataframe), window=20, stds=2)
    dataframe['bb_lowerband'] = bands['lower']
    dataframe['bb_middleband'] = bands['mid']
    dataframe['bb_upperband'] = bands['upper']
    band_range = dataframe["bb_upperband"] - dataframe["bb_lowerband"]
    dataframe["bb_percent"] = (dataframe["close"] - dataframe["bb_lowerband"]) / band_range
    # Width is normalised by sma5, not by the middle band
    dataframe["bb_width"] = band_range / dataframe["sma5"]

    # --- MACD (12 / 26 / 9): line, signal line, histogram ---
    dataframe['macd'], dataframe['macdsignal'], dataframe['macdhist'] = talib.MACD(
        dataframe['close'],
        fastperiod=12,
        slowperiod=26,
        signalperiod=9
    )

    # --- Raw slope of sma24 and its EMA smoothing ---
    dataframe['slope'] = dataframe['sma24'].diff()
    dataframe['slope_smooth'] = dataframe['slope'].ewm(span=10, adjust=False).mean()

    # --- Volatility (ATR), also relative to the close ---
    dataframe['atr'] = ta.volatility.AverageTrueRange(
        high=dataframe['high'], low=dataframe['low'], close=dataframe['close'], window=14
    ).average_true_range()
    dataframe['atr_norm'] = dataframe['atr'] / dataframe['close']

    # --- Trend strength ---
    dataframe['adx'] = ta.trend.ADXIndicator(
        high=dataframe['high'], low=dataframe['low'], close=dataframe['close'], window=14
    ).adx()

    # --- On Balance Volume ---
    dataframe['obv'] = ta.volume.OnBalanceVolumeIndicator(
        close=dataframe['close'], volume=dataframe['volume']
    ).on_balance_volume()

    # --- Recent volatility: standard deviation of the returns ---
    dataframe['vol_24'] = dataframe['percent'].rolling(24).std()

    # --- Consecutive down / up candle counters ---
    self.calculateDownAndUp(dataframe, limit=0.0001)

    # --- Derived ratios ---
    dataframe['rsi_slope'] = dataframe['rsi'].diff(3) / 3  # mean RSI speed over 3 candles
    dataframe['adx_change'] = dataframe['adx'] - dataframe['adx'].shift(12)
    dataframe['volatility_ratio'] = dataframe['atr_norm'] / dataframe['bb_width']
    dataframe["rsi_diff"] = dataframe["rsi"] - dataframe["rsi"].shift(3)
    # 1e-9 guards the division when the slow derivative is 0
    dataframe["slope_ratio"] = dataframe["sma5_deriv1"] / (dataframe["sma60_deriv1"] + 1e-9)
    # True when RSI and sma5 move in opposite directions
    dataframe["divergence"] = (dataframe["rsi_deriv1"] * dataframe["sma5_deriv1"]) < 0

    # --- Volume-weighted sma5 derivative, then volume derivatives ---
    dataframe['volume_sma_deriv'] = dataframe['volume'] * dataframe['sma5_deriv1'] / (dataframe['volume'].rolling(5).mean())
    self.calculeDerivees(dataframe, 'volume', timeframe=timeframe, ema_period=12)

    self.setTrends(dataframe)
    return dataframe
def feature_auc_scores(self, X, y):
    """
    Score every column of ``X`` by its ROC AUC against ``y``.

    Missing values are forward-filled, then replaced by 0. A column whose
    score cannot be computed gets NaN. Returns a Series indexed by column
    name, sorted in descending order.
    """
    def _auc_or_nan(column):
        # Best effort: any scoring failure yields NaN instead of aborting
        try:
            return roc_auc_score(y, X[column].ffill().fillna(0))
        except Exception:
            return np.nan

    scores = {}
    for column in X.columns:
        scores[column] = _auc_or_nan(column)
    ranked = pd.Series(scores)
    return ranked.sort_values(ascending=False)
def macd_tendance_int(self, dataframe: pd.DataFrame,
                      macd_col='macd',
                      signal_col='macdsignal',
                      hist_col='macdhist',
                      eps=0.0) -> pd.Series:
    """
    Encode the MACD state of every row as an integer.

     2 : MACD above signal (by more than ``eps``) and histogram > 0
     1 : MACD above signal and histogram <= 0
     0 : neutral (inside the ``eps`` band, or any of the three values is NaN)
    -1 : MACD below signal and histogram >= 0
    -2 : MACD below signal and histogram < 0

    Returns an int64 Series aligned on ``dataframe.index``.
    """
    macd = dataframe[macd_col]
    signal = dataframe[signal_col]
    hist = dataframe[hist_col]

    above_signal = macd > signal + eps
    below_signal = macd < signal - eps
    has_nan = dataframe[[macd_col, signal_col, hist_col]].isna().any(axis=1)

    # Rules are mutually exclusive; the NaN rule comes first so missing data
    # is always neutral.
    rules = [
        (has_nan, 0),
        (above_signal & (hist > 0), 2),
        (above_signal & (hist <= 0), 1),
        (below_signal & (hist < 0), -2),
        (below_signal & (hist >= 0), -1),
    ]
    conditions = [condition.to_numpy(dtype=bool) for condition, _code in rules]
    codes = [code for _condition, code in rules]
    encoded = np.select(conditions, codes, default=0)
    return pd.Series(encoded, index=dataframe.index, dtype="int64")
def calculateDownAndUp(self, dataframe, limit=0.0001):
    """
    Add consecutive down / up candle counters to ``dataframe`` (in place).

    ``down`` is ``hapercent <= limit`` and ``up`` is ``hapercent >= limit``.
    ``down_count`` holds the negated length of the current down streak (0
    outside a streak); ``up_count`` holds the length of the current up streak.
    ``down_pct`` / ``up_pct`` come from ``self.calculateUpDownPct``.

    NOTE(review): both flags are true when ``hapercent == limit``, and small
    positive moves below ``limit`` count as "down" -- confirm this is intended.
    """
    def _streak_length(flag):
        # Position (1-based) of each row inside its run of identical values
        run_id = flag.ne(flag.shift()).cumsum()
        return flag.groupby(run_id).cumcount() + 1

    is_down = dataframe['hapercent'] <= limit
    is_up = dataframe['hapercent'] >= limit
    dataframe['down'] = is_down
    dataframe['up'] = is_up

    dataframe['down_count'] = -dataframe['down'].astype(int) * _streak_length(dataframe['down'])
    dataframe['up_count'] = dataframe['up'].astype(int) * _streak_length(dataframe['up'])

    dataframe['down_pct'] = self.calculateUpDownPct(dataframe, 'down_count')
    dataframe['up_pct'] = self.calculateUpDownPct(dataframe, 'up_count')
def calculateDerivation(self, dataframe, window=12, suffixe='', timeframe='5m'):
    """
    Add ``mid_smooth{suffixe}`` (rolling mean of ``mid`` over ``window``
    rows) and let ``self.calculeDerivees`` add its derived columns.

    Returns whatever ``self.calculeDerivees`` returns.
    """
    smooth_col = f"mid_smooth{suffixe}"
    dataframe[smooth_col] = dataframe['mid'].rolling(window).mean()
    return self.calculeDerivees(dataframe, smooth_col, timeframe=timeframe, ema_period=window)
def calculeDerivees(
        self,
        dataframe: pd.DataFrame,
        name: str,
        suffixe: str = '',
        window: int = 100,
        coef: float = 0.15,
        ema_period: int = 10,
        verbose: bool = True,
        timeframe: str = '5m'
) -> pd.DataFrame:
    """
    Add inversion, distance and first/second relative derivative columns.

    Columns written (in place) for ``col = f"{name}{suffixe}"``:

    * ``{col}_inv``    -- True where ``col`` stopped falling and did not fall
      again on the current row (``col[-2] >= col[-1] <= col[0]``).
    * ``{col}_dist``   -- ``(close - col) / col``.
    * ``{col}_deriv1`` -- 1000 x relative one-row change of ``dataframe[name]``.
    * ``{col}_deriv2`` -- one-row difference of ``{col}_deriv1``.

    No smoothing is applied. ``window``, ``coef``, ``ema_period``, ``verbose``
    and ``timeframe`` are accepted for call compatibility but not used.

    NOTE(review): the derivatives read ``dataframe[name]`` while ``_inv`` and
    ``_dist`` read ``dataframe[name + suffixe]``; they only coincide when
    ``suffixe`` is empty -- confirm which one is intended.

    :return: the same ``dataframe`` object.
    """
    column = f"{name}{suffixe}"
    d1_col = f"{column}_deriv1"
    d2_col = f"{column}_deriv2"

    series = dataframe[column]
    previous = series.shift(1)

    # Local turning point: was falling (or flat), now rising (or flat)
    dataframe[f"{column}_inv"] = (series.shift(2) >= previous) & (previous <= series)

    # Relative distance between the close and the series
    dataframe[f"{column}_dist"] = (dataframe['close'] - series) / series

    # Simple relative derivatives
    base = dataframe[name]
    base_previous = base.shift(1)
    dataframe[d1_col] = 1000 * (base - base_previous) / base_previous
    dataframe[d2_col] = dataframe[d1_col] - dataframe[d1_col].shift(1)
    return dataframe
def getOpenTrades(self):
    """Refresh ``self.trades`` from ``Trade.get_open_trades()`` and return it."""
    # Always re-queried: no caching of a previous result
    open_trades = Trade.get_open_trades()
    self.trades = open_trades
    return self.trades
def calculateProbabilite2Index(self, df, futur_cols, indic_1, indic_2):
# # Définition des tranches pour les dérivées
# bins_deriv = [-np.inf, -0.05, -0.01, 0.01, 0.05, np.inf]
# labels = ['forte baisse', 'légère baisse', 'neutre', 'légère hausse', 'forte hausse']
#
# # Ajout des colonnes bin (catégorisation)
# df[f"{indic_1}_bin"] = pd.cut(df['mid_smooth_1h_deriv1'], bins=bins_deriv, labels=labels)
# df[f"{indic_2}_bin"] = pd.cut(df['mid_smooth_deriv1_1d'], bins=bins_deriv, labels=labels)
#
# # Colonnes de prix futur à analyser
# futur_cols = ['futur_percent_1h', 'futur_percent_2h', 'futur_percent_3h', 'futur_percent_4h', 'futur_percent_5h']
#
# # Calcul des moyennes et des effectifs
# grouped = df.groupby([f"{indic_2}_bin", f"{indic_1}_bin"])[futur_cols].agg(['mean', 'count'])
#
# pd.set_option('display.width', 200) # largeur max affichage
# pd.set_option('display.max_columns', None)
pd.set_option('display.max_columns', None)
pd.set_option('display.width', 300) # largeur max affichage
# nettoyage
# series = df[f"{indic_2}"].dropna()
# unique_vals = df[f"{indic_2}"].nunique()
# print(unique_vals)
# print(df[f"{indic_2}"])
n = len(self.labels)
df[f"{indic_1}_bin"], bins_1h = pd.qcut(df[f"{indic_1}"], q=n, labels=self.labels, retbins=True,
duplicates='drop')
df[f"{indic_2}_bin"], bins_1d = pd.qcut(df[f"{indic_2}"], q=n, labels=self.labels, retbins=True,
duplicates='drop')
# Affichage formaté pour code Python
print(f"Bornes des quantiles pour {indic_1} : [{', '.join([f'{b:.4f}' for b in bins_1h])}]")
print(f"Bornes des quantiles pour {indic_2} : [{', '.join([f'{b:.4f}' for b in bins_1d])}]")
# Agrégation
grouped = df.groupby([f"{indic_2}_bin", f"{indic_1}_bin"], observed=True)[futur_cols].agg(['mean', 'count'])
# Affichage
with pd.option_context('display.max_rows', None, 'display.max_columns', None):
print(grouped.round(4))
# Ajout des probabilités de hausse
for col in futur_cols:
df[f"{col}_is_up"] = df[col] > 0
# Calcul de la proba de hausse
proba_up = df.groupby([f"{indic_2}_bin", f"{indic_1}_bin"], observed=True)[f"{col}_is_up"].mean().unstack()
print(f"\nProbabilité de hausse pour {col} (en %):")
with pd.option_context('display.max_rows', None, 'display.max_columns', None):
print((proba_up * 100).round(1))
# Affichage formaté des valeurs comme tableau Python
with pd.option_context('display.max_rows', None, 'display.max_columns', None):
df_formatted = (proba_up * 100).round(1)
print("data = {")
for index, row in df_formatted.iterrows():
row_values = ", ".join([f"{val:.1f}" for val in row])
print(f"'{index}': [{row_values}], ")
print("}")
data = {}
for index, row in df_formatted.iterrows():
# on convertit proprement avec arrondi comme dans ton print, mais en données réelles
data[index] = [
None if (isinstance(val, float) and math.isnan(val)) else val
for val in row
]
# Niveaux unicode pour les barres verticales (style sparkline)
# spark_chars = "▁▂▃▄▅▆▇█"
# print(data.values())
# # Collecte globale min/max
# all_values = []
# for vals in data.values():
# all_values.extend(v for v in vals if not (isinstance(v, float) and math.isnan(v)))
#
# global_min = min(all_values) if all_values else 0
# global_max = max(all_values) if all_values else 1
# global_span = (global_max - global_min) if global_max != global_min else 1
#
# def sparkline_global(values):
# if all(isinstance(v, float) and math.isnan(v) for v in values):
# return "(no data)"
# out = ""
# for v in values:
# if isinstance(v, float) and math.isnan(v):
# out += " "
# else:
# idx = int((v - global_min) / global_span * (len(spark_chars) - 1))
# out += spark_chars[idx]
# return out
#
# for key, values in data.items():
# print(f"{key:>3} : {sparkline_global(values)}")
# Palette ANSI 256 couleurs pour heatmap
def get_ansi_color(val):
    """Return the ANSI 256-colour foreground escape code for a 0-100 value.

    Fixed scale (values are clamped to [0, 100] first):
        0-20   -> 21  (blue)
        20-40  -> 51  (cyan)
        40-60  -> 226 (yellow)
        60-80  -> 208 (orange)
        80-100 -> 196 (red)

    :param val: percentage to colourise, or None
    :return: the escape sequence, or an empty string when ``val`` is None
    """
    if val is None:
        return ""
    # Clamp out-of-range inputs onto the scale.
    if val < 0:
        val = 0
    elif val > 100:
        val = 100
    # (inclusive upper bound, colour code); anything above the last bound is red.
    buckets = ((20, 21), (40, 51), (60, 226), (80, 208))
    code = next((colour for upper, colour in buckets if val <= upper), 196)
    return f"\033[38;5;{code}m"
RESET = "\033[0m"
# Affichage
columns = ['B3', 'B2', 'B1', 'N0', 'H1', 'H2', 'H3']
header = " " + " ".join([f"{col:>6}" for col in columns])
print(header)
print("-" * len(header))
for key, values in data.items():
line = f"{key:>3} |"
for v in values:
if v is None:
line += f" {' '} " # vide pour NaN / None
else:
color = get_ansi_color(v)
line += f" {color}{v:5.1f}{RESET} "
print(line)
def populate_buy_trend(self, dataframe: DataFrame, metadata: dict) -> DataFrame:
    """Flag long entries (``enter_long`` = 1, ``enter_tag`` = "future").

    A row is flagged when
      * either ``mid_future_pred_cons`` has just formed a local trough
        (t-2 > t-1 < t) while ``percent12`` is below -0.0005,
        or ``mid_future_pred_cons`` is below ``min12``;
      * and at least one of ``mid_smooth_12_deriv1`` / ``mid_smooth_5_deriv1``
        is positive.

    A ``test`` column is also written: 1% above the close on flagged rows,
    NaN elsewhere. In backtest run mode the whole frame is dumped to a
    feather file named after the pair.

    :param dataframe: analysed candles holding the columns listed above
    :param metadata: must provide ``metadata['pair']`` (used for the dump name)
    :return: the same dataframe, mutated in place
    """
    prediction = dataframe['mid_future_pred_cons']
    previous_prediction = prediction.shift(1)

    local_trough = (
        (prediction.shift(2) > previous_prediction)
        & (previous_prediction < prediction)
        & (dataframe['percent12'] < -0.0005)
    )
    below_min12 = prediction < dataframe['min12']
    rising = (dataframe['mid_smooth_12_deriv1'] > 0) | (dataframe['mid_smooth_5_deriv1'] > 0)

    dataframe.loc[(local_trough | below_min12) & rising, ['enter_long', 'enter_tag']] = (1, "future")

    dataframe['test'] = np.where(dataframe['enter_long'] == 1, dataframe['close'] * 1.01, np.nan)

    # Plain equality: the former `in ('backtest')` was a substring test against
    # a string (the parentheses do not make a tuple).
    if self.dp.runmode.value == 'backtest':
        dataframe.to_feather(f"user_data/backtest_results/{metadata['pair'].replace('/', '_')}_df.feather")

    return dataframe
def populate_sell_trend(self, dataframe: DataFrame, metadata: dict) -> DataFrame:
    """Exit-signal hook: currently sets no exit signal at all.

    The dataframe is returned untouched. The rule sets below are earlier
    experiments, kept as commented-out code.

    :param dataframe: analysed candles
    :param metadata: pair metadata (unused)
    :return: the dataframe, unchanged
    """
    # dataframe.loc[
    # (
    # (
    # (
    # (dataframe['ml_prob'].shift(2) < dataframe['ml_prob'].shift(1))
    # & (dataframe['ml_prob'].shift(1) > dataframe['ml_prob'])
    # )
    # | (dataframe['ml_prob'] < 0)
    # )
    # & (dataframe['hapercent'] < 0)
    # ), ['exit_long', 'exit_tag']] = (1, f"sma60_future")
    # dataframe.loc[
    # (
    # (
    # (
    # (dataframe['mid_future_pred_cons'].shift(2) < dataframe['mid_future_pred_cons'].shift(1))
    # & (dataframe['mid_future_pred_cons'].shift(1) > dataframe['mid_future_pred_cons'])
    # )
    # # | (dataframe['mid_smooth_12_deriv1'] < 0)
    # )
    # & (dataframe['sma60_future_pred_cons'] < dataframe['sma60_future_pred_cons'].shift(1))
    # & (dataframe['hapercent'] < 0)
    # ), ['exit_long', 'exit_tag']] = (1, f"sma60_future")
    #
    # dataframe.loc[
    # (
    # (
    # (dataframe['mid_future_pred_cons'].shift(2) < dataframe['mid_future_pred_cons'].shift(1))
    # & (dataframe['mid_future_pred_cons'].shift(1) > dataframe['mid_future_pred_cons'])
    #
    # )
    # # & (dataframe['mid_future_pred_cons'] > dataframe['max12'])
    # & (dataframe['hapercent'] < 0)
    #
    # ), ['exit_long', 'exit_tag']] = (1, f"max12")
    return dataframe
def adjust_trade_position(self, trade: Trade, current_time: datetime,
                          current_rate: float, current_profit: float, min_stake: float,
                          max_stake: float, **kwargs):
    """Decide whether to add to an open trade (averaging down or pyramiding).

    Two independent triggers are evaluated, in this order:

    * "Loss" buy: fewer than 40 entries so far, the last candle satisfies the
      entry condition, and the move since the reference price (``pct_max``)
      is below the drawdown limit ``lim``.
    * "Gain" buy: the trade profit exceeds both the previously recorded
      profit and the expected profit of the pair, more than 6 hours have
      passed since the last fill, and ``max_rsi_12_1h`` is below 75.

    :param trade: the open trade being adjusted
    :param current_time: time of the call (converted to UTC internally)
    :param current_rate: current price, used for profit and bookkeeping
    :param current_profit: current profit ratio of the trade
    :param min_stake: unused
    :param max_stake: unused
    :return: stake amount to add, ``0`` when less than 10 of stake currency
             is available, otherwise ``None`` (no adjustment)
    """
    # Nothing to do while an order is still pending on this trade.
    if trade.has_open_orders:
        return None

    if self.wallets.get_available_stake_amount() < 10:
        return 0

    dataframe, _ = self.dp.get_analyzed_dataframe(trade.pair, self.timeframe)
    # Guard BEFORE touching iloc[-1]; the check used to come after the frame
    # had already been indexed, so it could never trigger.
    if len(dataframe) < 1:
        return None
    last_candle = dataframe.iloc[-1].squeeze()

    current_time = current_time.astimezone(timezone.utc)
    dispo = round(self.wallets.get_available_stake_amount())
    hours = (current_time - trade.date_last_filled_utc).total_seconds() / 3600.0
    count_of_buys = trade.nr_of_successful_entries
    pair = trade.pair
    pair_state = self.pairs[pair]
    profit = trade.calc_profit(current_rate)
    last_lost = self.getLastLost(last_candle, pair)

    # Reference move: trade profit on the first entry, otherwise the move
    # since the last recorded buy (or -pct when none is recorded).
    pct = self.pct.value
    if count_of_buys == 1:
        pct_max = current_profit
    elif pair_state['last_buy']:
        pct_max = self.getPctLastBuy(pair, last_candle)
    else:
        pct_max = -pct

    # Drawdown limit widens with each entry already made.
    if self.getShortName(pair) == 'BTC' or count_of_buys <= 2:
        lim = -pct - (count_of_buys * self.pct_inc.value)
    else:
        lim = -0.05 - (count_of_buys * 0.0025)

    if not self.should_enter_trade(pair, last_candle, current_time):
        return None

    # NOTE(review): `enter_long` is used by truthiness; a NaN value is truthy
    # in Python, so rows without a signal may still pass this test. Confirm
    # whether `== 1` was intended before changing it.
    condition = (last_candle['enter_long'] and last_candle['stop_buying_1h'] == False and last_candle['hapercent'] > 0)

    limit_buy = 40
    if (count_of_buys < limit_buy) and condition and (pct_max < lim):
        try:
            # A pair that already pyramided and is in profit is closed instead.
            if pair_state['has_gain'] and profit > 0:
                pair_state['force_sell'] = True
                return None

            max_amount = self.config.get('stake_amount') * 2.5
            stake_amount = min(
                max_amount,
                self.wallets.get_available_stake_amount(),
                self.adjust_stake_amount(pair, last_candle) * abs(last_lost / self.mise_factor_buy.value),
            )
            if stake_amount > 0:
                trade_type = "Loss " + (last_candle['enter_tag'] if last_candle['enter_long'] == 1 else '')
                self._register_extra_buy(trade, last_candle, current_time, current_rate,
                                         stake_amount, "🟧 Loss -", trade_type, profit, dispo)
                return stake_amount
            return None
        except Exception:
            logger.exception("adjust_trade_position: loss-side buy failed for %s", pair)
            return None

    if (profit > pair_state['previous_profit'] and profit > pair_state['expected_profit'] and hours > 6
            and last_candle['max_rsi_12_1h'] < 75
            and self.wallets.get_available_stake_amount() > 0):
        try:
            pair_state['previous_profit'] = profit
            stake_amount = min(self.wallets.get_available_stake_amount(), pair_state['first_amount'])
            if stake_amount > 0:
                pair_state['has_gain'] += 1
                self._register_extra_buy(trade, last_candle, current_time, current_rate,
                                         stake_amount, "🟡 Gain +", str(round(pct_max, 4)), profit, dispo)
                return stake_amount
            return None
        except Exception:
            logger.exception("adjust_trade_position: gain-side buy failed for %s", pair)
            return None

    return None

def _register_extra_buy(self, trade, last_candle, current_time, current_rate,
                        stake_amount, action, trade_type, profit, dispo):
    """Update the per-pair bookkeeping and log one additional entry on ``trade``."""
    state = self.pairs[trade.pair]
    state['count_of_buys'] += 1
    state['total_amount'] += stake_amount
    self.log_trade(
        last_candle=last_candle,
        date=current_time,
        action=action,
        dispo=dispo,
        pair=trade.pair,
        rate=current_rate,
        trade_type=trade_type,
        profit=round(profit, 1),
        buys=trade.nr_of_successful_entries + 1,
        stake=round(stake_amount, 2)
    )
    state['last_buy'] = current_rate
    state['max_touch'] = last_candle['close']
    state['last_candle'] = last_candle
def getPctFirstBuy(self, pair, last_candle):
    """Return the relative move of the candle close versus the pair's first buy price.

    :param pair: key into ``self.pairs``
    :param last_candle: mapping/row exposing ``'close'``
    :return: ``(close - first_buy) / first_buy`` rounded to 3 decimals
    """
    entry_price = self.pairs[pair]['first_buy']
    move = (last_candle['close'] - entry_price) / entry_price
    return round(move, 3)
def getPctLastBuy(self, pair, last_candle):
    """Return the relative move of the candle close versus the pair's last buy price.

    :param pair: key into ``self.pairs``
    :param last_candle: mapping/row exposing ``'close'``
    :return: ``(close - last_buy) / last_buy`` rounded to 4 decimals
    """
    reference_price = self.pairs[pair]['last_buy']
    move = (last_candle['close'] - reference_price) / reference_price
    return round(move, 4)
def adjust_stake_amount(self, pair: str, last_candle: DataFrame):
    """Return the stake to use for ``pair``.

    On the first entry (``count_of_buys == 0``) the stake is the configured
    ``stake_amount`` divided by ``self.mises.value``, doubled when the candle
    opens below ``sma5_1h`` while ``mid_smooth_12_deriv1`` is positive; the
    result is stored as the pair's ``first_amount``. On later entries the
    stored ``first_amount`` is returned unchanged.

    :param pair: key into ``self.pairs``
    :param last_candle: latest candle (accessed by column name)
    :return: stake amount
    """
    pair_state = self.pairs[pair]
    if pair_state['count_of_buys'] != 0:
        return pair_state['first_amount']

    base_stake_amount = self.config.get('stake_amount') / self.mises.value
    factor = 1
    if last_candle['open'] < last_candle['sma5_1h'] and last_candle['mid_smooth_12_deriv1'] > 0:
        factor = 2
    adjusted_stake_amount = max(base_stake_amount / 5, base_stake_amount * factor)
    pair_state['first_amount'] = adjusted_stake_amount
    return adjusted_stake_amount
def expectedProfit(self, pair: str, last_candle: DataFrame):
    """Compute, store and return the profit target of ``pair``.

    The target is a fixed fraction of the pair's ``total_amount``:
    0.5% for BTC, 1% for every other pair. It is also written to
    ``self.pairs[pair]['expected_profit']``.

    :param pair: key into ``self.pairs``
    :param last_candle: unused
    :return: the expected profit, in the unit of ``total_amount``
    """
    lim = 0.005 if self.getShortName(pair) == 'BTC' else 0.01
    expected_profit = lim * self.pairs[pair]['total_amount']
    self.pairs[pair]['expected_profit'] = expected_profit
    return expected_profit
def calculateUpDownPct(self, dataframe, key):
    """Percentage change of ``close`` over a per-row look-back distance.

    For each row the look-back distance is ``abs(int(dataframe[key]))``.
    The value is only computed when the referenced row index is greater
    than 1; every other row stays NaN.

    :param dataframe: frame with a ``close`` column and the ``key`` column
    :param key: name of the column holding the per-row shift
    :return: numpy array of percentage changes, one entry per row
    """
    closes = dataframe['close']
    shifts = dataframe[key]
    pct_changes = np.full(len(dataframe), np.nan)

    for row in range(len(dataframe)):
        ref_row = row - abs(int(shifts.iloc[row]))
        if ref_row <= 1:
            continue  # reference falls on the first two rows or before the frame
        ref_close = closes.iloc[ref_row]
        pct_changes[row] = 100 * (closes.iloc[row] - ref_close) / ref_close

    return pct_changes
@property
def protections(self):
    """Protection configuration: a single 12-candle cooldown.

    MaxDrawdown, StoplossGuard and LowProfitPairs entries were previously
    listed here as commented-out alternatives; none of them is active.
    """
    cooldown = {
        "method": "CooldownPeriod",
        "stop_duration_candles": 12,
    }
    return [cooldown]
def get_stake_from_drawdown(self, pct: float, base_stake: float = 100.0, step: float = 0.04, growth: float = 1.15,
                            max_stake: float = 1000.0) -> float:
    """Compute the stake to allocate for a given drawdown.

    The stake grows geometrically: each full ``step`` of drawdown multiplies
    the base stake by ``growth``; the result is capped at ``max_stake``.

    :param pct: drawdown as a ratio (e.g. -0.12 for -12%)
    :param base_stake: stake at level 0
    :param step: drawdown distance between two levels (e.g. 0.04 for 4%)
    :param growth: multiplier applied per level (e.g. 1.15 for +15%)
    :param max_stake: upper bound of the returned stake
    :return: stake amount
    """
    if pct >= 0:
        return base_stake

    levels_crossed = int(abs(pct) / step)
    scaled_stake = base_stake * (growth ** levels_crossed)
    return max_stake if max_stake < scaled_stake else scaled_stake
def polynomial_forecast(self, series: pd.Series, window: int = 20, degree: int = 2, steps=(12, 24, 36)):
    """Fit a polynomial on the last ``window`` values and extrapolate it.

    The polynomial is fitted against positions ``0 .. window-1``.

    :param series: pandas Series (e.g. ``dataframe['close']``)
    :param window: number of most recent values used for the fit
    :param degree: polynomial degree (2 = quadratic)
    :param steps: horizons, in candles after the last value, at which the
                  polynomial is evaluated to build ``count``
    :return: tuple ``(poly, x_future, y_future, count)`` where
             ``x_future`` holds the ``len(steps)`` positions right after the
             window, ``y_future`` the polynomial evaluated at those positions,
             and ``count`` the number of horizons in ``steps`` whose
             prediction is strictly positive
    :raises ValueError: when ``series`` is shorter than ``window``
    """
    if len(series) < window:
        raise ValueError("La série est trop courte pour la fenêtre spécifiée.")

    recent_y = series.iloc[-window:].values
    poly = np.poly1d(np.polyfit(np.arange(window), recent_y, degree))

    # NOTE(review): these are the consecutive positions after the window,
    # not the horizons listed in `steps` — confirm this is what callers expect.
    x_future = np.arange(window, window + len(steps))
    y_future = poly(x_future)

    # The last fitted point sits at position window - 1.
    count = sum(1 for future_step in steps if poly(window - 1 + future_step) > 0)

    return poly, x_future, y_future, count
def should_enter_trade(self, pair: str, last_candle, current_time) -> bool:
    """Gate for new entries and position increases: currently always open.

    The function returned ``True`` unconditionally before reaching its
    former gating logic (a cap based on the non-BTC pair holding the most
    buys, plus a 1h-slope filter), so that logic never executed and has been
    removed. Restore it from version control if the gate has to be
    re-enabled.

    :param pair: pair being evaluated (unused)
    :param last_candle: latest analysed candle (unused)
    :param current_time: time of the evaluation (unused)
    :return: always ``True``
    """
    return True
def select_uncorrelated_features(self, df, target, top_n=20, corr_threshold=0.7):
    """Pick the features most correlated with ``target`` while dropping
    those that are too correlated with an already selected feature.

    :param df: frame holding the features and the target column
    :param target: name of the target column
    :param top_n: number of top-ranked candidates examined
    :param corr_threshold: absolute correlation above which a candidate is
                           considered redundant with a kept feature
    :return: DataFrame with columns ``feature`` and ``corr_with_target``,
             sorted by decreasing absolute correlation
    """
    corr_matrix = df.corr(numeric_only=True)

    # Rank every column by absolute correlation with the target, target excluded.
    ranking = corr_matrix[target].abs().sort_values(ascending=False)
    candidates = ranking.drop(target).head(top_n).index.tolist()

    # Greedy pass: keep a candidate only if no kept feature is too close to it.
    kept = []
    for candidate in candidates:
        redundant = any(abs(corr_matrix.loc[candidate, other]) > corr_threshold for other in kept)
        if not redundant:
            kept.append(candidate)

    summary = pd.DataFrame({
        "feature": kept,
        "corr_with_target": [corr_matrix.loc[name, target] for name in kept]
    })
    return summary.sort_values(by="corr_with_target", key=np.abs, ascending=False)
def graphFonctionApprise(self, path, X_test, y_test, y_pred):
    """Plot actual vs. predicted values ordered by the first feature and
    save the figure as ``lgbm_function.png`` under ``self.path``.

    NOTE(review): the ``path`` argument is not used; the output directory
    comes from ``self.path``.

    :param path: unused
    :param X_test: feature frame; only its first column is used (x axis)
    :param y_test: actual target values (pandas object, indexed with iloc)
    :param y_pred: predicted values (indexed positionally)
    """
    first_feature = X_test.iloc[:, 0]
    order = np.argsort(first_feature)
    xs = first_feature.iloc[order]
    curves = (
        (y_test.iloc[order], "Réel", "blue"),
        (y_pred[order], "Prédit (LGBM)", "red"),
    )

    plt.figure(figsize=(12, 6))
    for ys, label, color in curves:
        plt.plot(xs, ys, label=label, color=color, alpha=0.7)
    plt.title("Fonction apprise par LGBMRegressor")
    plt.xlabel("Feature principale")
    plt.ylabel("Valeur prédite")
    plt.legend()
    plt.grid(True)

    out_path = f"{self.path}/lgbm_function.png"
    plt.savefig(out_path, bbox_inches="tight")
    plt.close()
    print(f"Graphique sauvegardé : {out_path}")
def graphFonctionAppriseFeature(self, path, X_test, y_test, y_pred):
    """Scatter the prediction against every feature and save the figure as
    ``lgbm_features.png`` under ``self.path``.

    Rows whose prediction lies outside mean ± 2 standard deviations are
    dropped before plotting.

    NOTE(review): ``path`` and ``y_test`` are not used.

    :param path: unused
    :param X_test: feature frame, one scatter series per column
    :param y_test: unused
    :param y_pred: predictions aligned with the rows of ``X_test``
    """
    plt.figure(figsize=(14, 8))
    palette = sns.color_palette("coolwarm", n_colors=X_test.shape[1])

    frame = X_test.copy()
    frame["y_pred"] = y_pred

    # Keep predictions within two standard deviations of their mean.
    center = frame["y_pred"].mean()
    spread = frame["y_pred"].std()
    in_band = (frame["y_pred"] >= center - 2 * spread) & (frame["y_pred"] <= center + 2 * spread)
    frame = frame[in_band]

    for color, feature in zip(palette, X_test.columns):
        plt.plot(frame[feature], frame["y_pred"], '.', color=color, alpha=0.4, label=feature)

    plt.title("Fonction apprise par LGBMRegressor (filtrée à ±2σ)")
    plt.xlabel("Valeur feature")
    plt.ylabel("Valeur prédite")
    plt.legend(loc="right")
    plt.grid(True)

    out_path = f"{self.path}/lgbm_features.png"
    plt.savefig(out_path, bbox_inches="tight")
    plt.close()
    print(f"Graphique sauvegardé : {out_path}")
def optuna(self, path, X_train, X_test, y_train, y_test):
    """Tune an LGBMRegressor with Optuna, then fit a final model.

    Fifty trials minimise the RMSE measured on ``X_test``/``y_test``. The best
    parameters and score are printed and written to
    ``optuna_lgbm_results.txt`` under ``self.path``. The final model is
    trained on the output of ``self.filter_features(X_train, y_train)``.

    NOTE(review): ``path`` is not used; files go to ``self.path``.

    :return: tuple ``(best_model, X_train_filtered)``
    """
    print("Description")
    print(X_train.describe().T.sort_values("std"))

    def objective(trial):
        # Search space; suggestion order is kept as-is.
        search_space = dict(
            objective='regression',
            metric='rmse',
            n_estimators=trial.suggest_int('n_estimators', 100, 1000),
            learning_rate=trial.suggest_float('learning_rate', 0.005, 0.2, log=True),
            max_depth=trial.suggest_int('max_depth', 3, 15),
            num_leaves=trial.suggest_int('num_leaves', 20, 300),
            subsample=trial.suggest_float('subsample', 0.5, 1.0),
            colsample_bytree=trial.suggest_float('colsample_bytree', 0.5, 1.0),
            reg_alpha=trial.suggest_float('reg_alpha', 0.0, 10.0),
            reg_lambda=trial.suggest_float('reg_lambda', 0.0, 10.0),
            random_state=42,
        )
        candidate = LGBMRegressor(**search_space)
        candidate.fit(X_train, y_train)
        predictions = candidate.predict(X_test)
        return np.sqrt(mean_squared_error(y_test, predictions))

    # Lower RMSE is better.
    study = optuna.create_study(direction="minimize")
    study.optimize(objective, n_trials=50, show_progress_bar=True)

    best_params = study.best_params
    best_rmse = study.best_value
    print("✅ Meilleurs hyperparamètres trouvés :")
    print(best_params)
    print(f"Meilleur RMSE : {best_rmse:.4f}")

    optuna_path = f"{self.path}/optuna_lgbm_results.txt"
    with open(optuna_path, "w") as f:
        f.write(f"Best params:\n{best_params}\n")
        f.write(f"Best RMSE: {best_rmse:.4f}\n")
    print(f"Résultats sauvegardés dans : {optuna_path}")

    print("🚀 Entraînement du modèle LightGBM...")
    # The final model is trained on the filtered feature set.
    X_train_filtered = self.filter_features(X_train, y_train)
    best_model = LGBMRegressor(**best_params)
    best_model.fit(X_train_filtered, y_train)

    return best_model, X_train_filtered
def filter_features(self, X: pd.DataFrame, y: pd.Series, corr_threshold: float = 0.95):
    """Drop near-constant, highly inter-correlated and all-NaN columns.

    Steps:
      1. remove columns whose variance is below 1e-5 (``VarianceThreshold``);
      2. for every pair of columns with absolute correlation above
         ``corr_threshold``, drop the later one;
      3. drop columns that are entirely NaN.

    The returned frame keeps the row index of ``X`` so it stays aligned
    with ``y``.

    :param X: feature frame
    :param y: target (unused)
    :param corr_threshold: absolute-correlation cut-off for step 2
    :return: filtered feature frame
    """
    print("🔍 Filtrage automatique des features...")

    # 1. Low-variance columns. `index=X.index` preserves the original row
    #    labels, which a bare ndarray -> DataFrame conversion would reset.
    vt = VarianceThreshold(threshold=1e-5)
    X_var = pd.DataFrame(vt.fit_transform(X), columns=X.columns[vt.get_support()], index=X.index)
    print(f" - {len(X.columns) - X_var.shape[1]} colonnes supprimées (variance faible)")

    # 2. Highly correlated columns: inspect the strict upper triangle so each
    #    pair is seen once and the later column of the pair is the one dropped.
    corr = X_var.corr().abs()
    upper = corr.where(np.triu(np.ones(corr.shape), k=1).astype(bool))
    drop_cols = [column for column in upper.columns if (upper[column] > corr_threshold).any()]
    X_corr = X_var.drop(columns=drop_cols, errors='ignore')
    print(f" - {len(drop_cols)} colonnes supprimées (corrélation > {corr_threshold})")

    # 3. Columns made only of NaN.
    X_clean = X_corr.dropna(axis=1, how='all')
    print(f"{X_clean.shape[1]} colonnes conservées après filtrage.\n")

    return X_clean
def setTrends(self, dataframe: DataFrame):
    """Add a ``slope_norm`` column to ``dataframe`` (in place).

    ``slope_norm`` is the first difference of ``sma12``, smoothed with an
    EMA of span 10, expressed relative to ``close`` and scaled by 10000;
    missing values are replaced by 0. No other column is added.

    :param dataframe: frame holding ``sma12`` and ``close``
    """
    smoothing_span = 10

    raw_slope = dataframe['sma12'].diff()
    smoothed_slope = raw_slope.ewm(span=smoothing_span, adjust=False).mean()
    relative_slope = 10000 * smoothed_slope / dataframe['close']

    dataframe['slope_norm'] = relative_slope.fillna(0)
# Optional dependency: LightGBM. `_HAS_LGBM` records whether the import
# succeeded; `make_model` checks it before building an "lgbm" model.
try:
    from lightgbm import LGBMRegressor
    _HAS_LGBM = True
except Exception:
    # Any import-time failure is treated as "LightGBM not available".
    _HAS_LGBM = False
def make_model(self, model_type="linear", degree=2, random_state=0):
    """Build an unfitted regressor for the requested model family.

    :param model_type: one of "linear", "poly", "svr", "rf", "lgbm"
                       (case-insensitive)
    :param degree: polynomial degree, used by "poly" only
    :param random_state: seed, used by "rf" and "lgbm" only
    :return: a new estimator instance
    :raises RuntimeError: "lgbm" requested while lightgbm is not installed
    :raises ValueError: unknown ``model_type``
    """
    kind = model_type.lower()

    def build_lgbm():
        if not _HAS_LGBM:
            raise RuntimeError("lightgbm n'est pas installé")
        return LGBMRegressor(n_estimators=100, random_state=random_state)

    # Factories are lazy so only the requested estimator is instantiated.
    factories = {
        "linear": lambda: LinearRegression(),
        "poly": lambda: make_pipeline(StandardScaler(), PolynomialFeatures(degree=degree, include_bias=False),
                                      LinearRegression()),
        "svr": lambda: make_pipeline(StandardScaler(), SVR(kernel="rbf", C=1.0, epsilon=0.1)),
        "rf": lambda: RandomForestRegressor(n_estimators=100, random_state=random_state, n_jobs=1),
        "lgbm": build_lgbm,
    }

    factory = factories.get(kind)
    if factory is None:
        raise ValueError(f"model_type inconnu: {kind}")
    return factory()
def calculateRegressionNew(self, df, indic, lookback=20, future_steps=5, model_type="linear"):
    """Rolling linear extrapolation of ``indic``, written at the target row.

    For every row ``i`` a linear regression is fitted on the ``lookback``
    values preceding ``i`` and evaluated at position
    ``lookback + future_steps - 1``. The result is stored in
    ``{indic}_future_pred_cons`` at row ``i + future_steps``. Rows whose
    window or whose value at ``i + future_steps`` contains NaN are skipped.

    :param df: source frame (not modified; a copy is returned)
    :param indic: column to extrapolate
    :param lookback: window length
    :param future_steps: prediction horizon, in rows
    :param model_type: unused, a LinearRegression is always used
    :return: copy of ``df`` with the prediction column added
    """
    result = df.copy()
    pred_col = f"{indic}_future_pred_cons"
    result[pred_col] = np.nan
    pred_pos = result.columns.get_loc(pred_col)

    window_positions = np.arange(lookback).reshape(-1, 1)
    horizon_position = np.array([[lookback + future_steps - 1]])
    series = result[indic].values
    regressor = LinearRegression()

    for end in range(lookback, len(series) - future_steps):
        history = series[end - lookback:end]
        target_row = end + future_steps
        # The realised future value is only used to decide whether to skip.
        if np.isnan(history).any() or np.isnan(series[target_row]):
            continue
        regressor.fit(window_positions, history)
        result.iloc[target_row, pred_pos] = regressor.predict(horizon_position)[0]

    return result
# ==========================================================
# NOUVELLE VERSION : calcule AUSSI les dernières valeurs !
# ==========================================================
def calculateRegression(
        self,
        df,
        indic,
        lookback=30,
        future_steps=5,
        model_type="linear",
        degree=2,
        weight_mode="exp",
        weight_strength=2,
        clip_k=2.0,
        blend_alpha=0.7,
):
    """Rolling regression forecast of ``indic``, including the last rows.

    A ``{indic}_future_pred_cons`` column is added to ``df`` (in place).

    * Rows ``lookback .. n - future_steps - 1``: the model is fitted on the
      ``lookback`` values preceding the row (positions ``0 .. lookback-1``)
      and evaluated at position ``lookback + future_steps - 1``. The
      predicted change versus the row's own value is clipped to
      ``± clip_k * std(window)`` and blended with the current value using
      ``blend_alpha``.
    * Last ``future_steps`` rows: same fit without sample weights, evaluated
      one position after the window, no clipping, blended against the
      previous row's value.

    Windows containing NaN are skipped. Frames shorter than ``lookback``
    simply get an all-NaN column.

    :param df: frame to extend (mutated and returned)
    :param indic: column to forecast
    :param lookback: window length
    :param future_steps: horizon, in rows
    :param model_type: forwarded to ``self.make_model``
    :param degree: forwarded to ``self.make_model``
    :param weight_mode: "exp" weights recent points more, anything else = no weights
    :param weight_strength: exponent applied to the linear 0.1..1 weight ramp
    :param clip_k: clipping width, in local standard deviations
    :param blend_alpha: share of the predicted change kept in the output
    :return: ``df``
    """
    values = df[indic].values.astype(float)
    n = len(values)

    colname = f"{indic}_future_pred_cons"
    df[colname] = np.nan
    col_pos = df.columns.get_loc(colname)

    trainable_end = n - future_steps

    # One estimator instance, refitted on every window.
    model = self.make_model(model_type=model_type, degree=degree)

    # Loop invariants: window positions, sample weights, evaluation points.
    X_window = np.arange(lookback).reshape(-1, 1)
    if weight_mode == "exp":
        weights = np.linspace(0.1, 1, lookback) ** weight_strength
    else:
        weights = None
    X_horizon = np.array([[lookback + future_steps - 1]])
    X_next = np.array([[lookback]])

    # ---- main range: rows that still have `future_steps` rows after them
    for i in range(lookback, trainable_end):
        window = values[i - lookback:i]
        if np.isnan(window).any():
            continue

        try:
            model.fit(X_window, window, sample_weight=weights)
        except Exception:
            # Some estimators/pipelines reject `sample_weight`; retry unweighted.
            model.fit(X_window, window)

        pred_delta = model.predict(X_horizon)[0] - values[i]

        # Clip the predicted change by the local volatility.
        local_std = np.std(window)
        max_change = clip_k * (local_std if local_std > 0 else 1e-9)
        pred_delta = np.clip(pred_delta, -max_change, max_change)

        df.iloc[i, col_pos] = (
            blend_alpha * (values[i] + pred_delta)
            + (1 - blend_alpha) * values[i]
        )

    # ---- tail: the last `future_steps` rows (only those with a full window)
    for i in range(max(trainable_end, lookback), n):
        window = values[i - lookback:i]
        if np.isnan(window).any():
            continue

        try:
            model.fit(X_window, window)
        except Exception:
            continue

        # Local continuation: one step after the window.
        previous_value = values[i - 1]
        pred_delta = model.predict(X_next)[0] - previous_value

        df.iloc[i, col_pos] = (
            blend_alpha * (previous_value + pred_delta)
            + (1 - blend_alpha) * previous_value
        )

    return df
# def calculateRegression(self,
# df,
# indic,
# lookback=30,
# future_steps=5,
# model_type="linear",
# degree=2,
# random_state=0,
# weight_mode="exp", # "exp", "linear" ou None
# weight_strength=0.2, # plus cest grand, plus les dernières bougies comptent
# ):
# """
# Ajoute une régression glissante qui prévoit la valeur future à horizon 'future_steps',
# avec pondération des dernières valeurs si weight_mode != None.
# """
# df = df.copy()
# colname = f"{indic}_future_pred_{model_type}"
# df[colname] = np.nan
#
# values = df[indic].values
# n = len(values)
# X_window = np.arange(lookback).reshape(-1, 1)
#
# # génération du schéma de pondération
# if weight_mode == "exp":
# # exponentiel → les derniers points pèsent beaucoup plus
# weights = np.exp(np.linspace(-weight_strength, weight_strength, lookback))
# elif weight_mode == "linear":
# # poids linéaire → 1..lookback
# weights = np.linspace(0.5, 1.0, lookback)
# else:
# weights = np.ones(lookback)
#
# for i in range(lookback, n - future_steps):
# y_window = values[i - lookback:i]
# if np.isnan(y_window).any():
# continue
#
# model = self.make_model(model_type=model_type, degree=degree, random_state=random_state)
#
# try:
# model.fit(X_window, y_window, sample_weight=weights)
# except TypeError:
# # certains modèles (RF) ne supportent pas sample_weight dans ce contexte
# model.fit(X_window, y_window)
# except Exception:
# continue
#
# X_pred = np.array([[lookback + future_steps - 1]])
# try:
# pred = model.predict(X_pred)[0]
# except Exception:
# continue
#
# df.iloc[i, df.columns.get_loc(colname)] = pred
#
# return df
# def calculateRegression(self, df, indic, lookback=30, future_steps=5):
# """
# Ajoute un indicateur {indic}_future_pred qui contient,
# pour chaque bougie n, la valeur attendue à n + future_steps
# selon une régression linéaire sur les lookback dernières bougies.
# """
# df = df.copy()
# df[f"{indic}_future_pred"] = np.nan
#
# values = df[indic].values
# n = len(values)
#
# model = LinearRegression()
#
# for i in range(lookback, n - future_steps):
# # Fenêtre dapprentissage
# X = np.arange(lookback).reshape(-1, 1)
# y = values[i - lookback:i]
#
# model.fit(X, y)
#
# # Prédiction future
# next_X = np.array([[lookback + future_steps - 1]])
# future_pred = model.predict(next_X)[0]
#
# # On insère la prédiction à la position actuelle (n)
# df.iloc[i, df.columns.get_loc(f"{indic}_future_pred")] = future_pred
#
# return df
def add_future_quantiles(self, dataframe, indic, lookback=30, future_steps=5, quantiles=(0.1, 0.5, 0.9)):
    """Fit quantile regressors on sliding windows of ``indic`` and plot them.

    Works on a copy of ``dataframe[self.model_indicators]``:

    1. adds ``{indicator_target}_future`` = target shifted by -24 rows
       (missing values filled with 0);
    2. for each quantile, fits a ``HistGradientBoostingRegressor`` on the
       ``lookback``-long windows of ``indic`` against the value
       ``future_steps`` rows later, and stores its in-sample predictions in
       ``{indic}_future_q{int(q*100)}``;
    3. draws the quantile band through ``plot_future_quantiles_band``;
    4. for every column returned by ``self.listUsableColumns``, saves a plot
       of the target's quantiles per 20-quantile bin of that column under
       ``self.path``.

    :param dataframe: analysed candles
    :param indic: column whose future quantiles are modelled
                  (must be part of ``self.model_indicators``)
    :param lookback: window length
    :param future_steps: horizon, in rows
    :param quantiles: quantiles to model
    :return: the working copy, including the added columns
    """
    working_columns = self.listUsableColumns(dataframe)

    df = dataframe[self.model_indicators].copy()
    n = len(df)
    target = self.indicator_target + "_future"
    df[target] = dataframe[self.indicator_target].shift(-24).fillna(0)

    # One output column per requested quantile.
    quantile_cols = {q: f"{indic}_future_q{int(q * 100)}" for q in quantiles}
    for col in quantile_cols.values():
        df[col] = np.nan

    # Sliding windows (features) and the value `future_steps` rows later (target).
    X = np.array([df[indic].iloc[i - lookback:i].values for i in range(lookback, n - future_steps)])
    y_idx = np.arange(lookback, n - future_steps) + future_steps
    y = df[indic].iloc[y_idx].values

    # Replace NaN inside the windows with the per-position median.
    X_imputed = SimpleImputer(strategy='median').fit_transform(X)

    for q, col in quantile_cols.items():
        model = HistGradientBoostingRegressor(loss='quantile', quantile=q, max_iter=100)
        model.fit(X_imputed, y)
        df.iloc[lookback:n - future_steps, df.columns.get_loc(col)] = model.predict(X_imputed)

    # Plot the columns that were just created: they are keyed by `indic` and
    # by the requested quantiles, so both are forwarded as-is.
    df_plot = df.iloc[lookback:-future_steps]
    self.plot_future_quantiles_band(df_plot, indic=indic, quantiles=list(quantiles))

    # Distribution of the future target per bin of each usable indicator.
    distribution_quantiles = [0.1, 0.25, 0.5, 0.75, 0.9]
    for indicator in working_columns:
        df["bin"] = pd.qcut(df[indicator], q=20, duplicates="drop")
        stats = df.groupby("bin")[target].quantile(distribution_quantiles).unstack()

        fig, ax = plt.subplots(figsize=(10, 10))
        for q in stats.columns:
            plt.plot(stats.index.astype(str), stats[q], marker='o', label=f"Q{int(q * 100)}")

        plt.xticks(rotation=45)
        plt.xlabel(f"{indicator} bins")
        plt.ylabel(f"Quantiles of {target}")
        plt.title(f"Distribution quantile de {target} selon {indicator}")
        plt.legend()
        plt.grid(True)
        plt.tight_layout()

        output_path = f"{self.path}/Distribution_{indicator}.png"
        plt.savefig(output_path, bbox_inches="tight", dpi=150)
        plt.close(fig)

    return df
def plot_future_quantiles_band(self, df, indic, quantiles=(0.1, 0.5, 0.9), lookback=30, future_steps=5):
    """
    Plot the real series with its predicted quantile band and save the figure as a PNG.

    df: DataFrame holding the real column and the predicted quantile columns,
        named "<indic>_future_q<percent>"
    indic: name of the target column (e.g. 'mid')
    quantiles: predicted quantiles; those whose column is missing from df are
        ignored. The first available one is drawn as the lower bound of the
        band and the last available one as the upper bound.
    lookback, future_steps: not used by this method; kept so existing calls
        keep working.

    Returns None. When fewer than two quantile columns are available, a message
    is printed and no figure is created.
    """
    # Keep each quantile paired with its column so the legend describes the
    # band that is actually drawn, even when some columns are missing.
    candidates = [(q, f"{indic}_future_q{int(q * 100)}") for q in quantiles]
    available = [(q, col) for q, col in candidates if col in df.columns]

    # Checked before any figure exists: an early return must not leave an
    # open matplotlib figure behind.
    if len(available) < 2:
        print("Au moins deux quantiles sont nécessaires pour afficher les bandes")
        return

    q_low, col_low = available[0]
    q_high, col_high = available[-1]

    fig, ax = plt.subplots(figsize=(96, 30))
    try:
        # Real series
        ax.plot(df[indic], label=f"{indic} réel", color='black', linewidth=1.2)

        # Single colour shared by the band and the median line
        color = sns.color_palette("coolwarm", n_colors=1)[0]

        # Band between the lowest and highest available quantiles.
        # ":g" avoids float artefacts such as "10.000000000000002" in the legend.
        ax.fill_between(df.index,
                        df[col_low],
                        df[col_high],
                        color=color,
                        alpha=0.3,
                        label=f"Intervalle {q_low * 100:g}-{q_high * 100:g}%")

        # With three or more quantiles, the second available one is drawn as the median
        if len(available) >= 3:
            ax.plot(df[available[1][1]], color=color, linestyle='--', linewidth=1, label="Quantile médian")

        ax.set_title(f"Prédiction futures valeurs de {indic} avec intervalle de quantiles")
        ax.set_xlabel("Index / Bougies")
        ax.set_ylabel(indic)
        ax.legend()
        ax.grid(True)

        # --- Save ---
        output_path = f"{self.path}/Prédiction futures valeurs de {indic}.png"
        fig.savefig(output_path, bbox_inches="tight", dpi=150)
    finally:
        # Always release the figure, including when plotting or saving fails
        plt.close(fig)
def compute_quantile_confidence(self, df, indic, quantiles=(0.1, 0.5, 0.9)):
    """
    Score where the real value sits relative to the predicted median, normalised
    by the width of the quantile band, and save a diagnostic plot as a PNG.

    df: DataFrame containing the real column and the quantile columns, named
        "<indic>_future_q<percent>"
    indic: name of the real-value column
    quantiles: (low, median, high) predicted quantiles; the first three entries
        are used

    Returns the score clipped to [-1, 1]: positive = above the median,
    negative = below it.

    Typical use (from the original notes): store the result in a column such as
    df['quantile_conf'] and derive signals from it, e.g. score < -0.5 as a buy
    hint and score > 0.5 as a sell hint.
    """
    col_low = f"{indic}_future_q{int(quantiles[0] * 100)}"
    col_med = f"{indic}_future_q{int(quantiles[1] * 100)}"
    col_high = f"{indic}_future_q{int(quantiles[2] * 100)}"

    # Band width (uncertainty); the epsilon avoids a division by zero
    band_width = df[col_high] - df[col_low] + 1e-9

    # Distance to the median, normalised by the band width
    score = (df[indic] - df[col_med]) / band_width

    # Clip to [-1, 1] to avoid extreme values
    score = np.clip(score, -1, 1)

    fig, ax = plt.subplots(figsize=(16, 6))
    try:
        ax.plot(df[indic], color='black', label='Valeur réelle')

        # Use the columns derived from `quantiles`, not hard-coded q10/q50/q90
        ax.fill_between(df.index,
                        df[col_low],
                        df[col_high],
                        alpha=0.3, color='blue',
                        label=f"Intervalle {int(quantiles[0] * 100)}%-{int(quantiles[2] * 100)}%")
        ax.plot(df[col_med], linestyle='--', color='blue', label='Médiane')

        # Colour each point with the score computed above. The caller only
        # stores it in the dataframe after this method returns, so reading a
        # 'quantile_conf' column here would fail.
        points = ax.scatter(df.index, df[indic], c=score, cmap='coolwarm', s=20)
        fig.colorbar(points, ax=ax, label='Score de confiance')

        ax.set_title("Prédiction + score de confiance quantile")
        ax.legend()
        ax.grid(True)

        # --- Save ---
        output_path = f"{self.path}/Prédiction score confiance de {indic}.png"
        fig.savefig(output_path, bbox_inches="tight", dpi=150)
    finally:
        # Always release the figure, including when plotting or saving fails
        plt.close(fig)

    return score
# def loadTensorFlow(self, dataframe, metadata, lookback=50, future_steps=1):
# self.model = load_model(f"{self.path}/lstm_model.keras", compile=False)
#
# # features = toutes les colonnes sauf la cible
# feature_columns = self.model_indicators #[col for col in dataframe.columns if col != self.indicator_target]
# X_values = dataframe[feature_columns].values
#
# # normalisation avec le même scaler que l'entraînement
# scaler_X = MinMaxScaler()
# scaler_X.fit(X_values) # ou charger les paramètres si sauvegardés
# X_scaled = scaler_X.transform(X_values)
#
# # création des fenêtres glissantes
# X = np.lib.stride_tricks.sliding_window_view(X_scaled, window_shape=(self.lookback, X_scaled.shape[1]))
# # np.lib.stride_tricks.sliding_window_view ne supporte pas directement 2D → il vaut mieux utiliser une boucle :
# X_seq = []
# for i in range(len(X_scaled) - self.lookback):
# X_seq.append(X_scaled[i:i + self.lookback])
# X_seq = np.array(X_seq)
#
# # prédiction
# y_pred = self.model.predict(X_seq, verbose=0).flatten()
#
# # alignement avec les données
# preds = [np.nan] * len(dataframe)
# start = self.lookback
# end = start + len(y_pred)
# preds[start:end] = y_pred[:end - start]
#
# dataframe["lstm_pred"] = preds
#
# def trainTensorFlow(self, dataframe, metadata, lookback=50, future_steps=1):
# # 1) définir la cible
# y_values = dataframe[self.indicator_target].values.reshape(-1, 1)
#
# # 2) définir les features (toutes les colonnes sauf la cible)
# feature_columns = self.model_indicators #[col for col in dataframe.columns if col != self.indicator_target]
# X_values = dataframe[feature_columns].values
#
# # 3) normalisation
# scaler_X = MinMaxScaler()
# X_scaled = scaler_X.fit_transform(X_values)
#
# scaler_y = MinMaxScaler()
# y_scaled = scaler_y.fit_transform(y_values)
#
# # 4) création des fenêtres glissantes
# X = []
# y = []
# for i in range(len(X_scaled) - lookback - future_steps):
# X.append(X_scaled[i:i + lookback])
# y.append(y_scaled[i + lookback + future_steps])
#
# X = np.array(X)
# y = np.array(y)
#
# # 5) définition du modèle LSTM
# model = Sequential([
# LSTM(64, return_sequences=False, input_shape=(lookback, X.shape[2])),
# Dense(32, activation="relu"),
# Dense(1)
# ])
#
# model.compile(loss="mse", optimizer="adam")
# model.fit(X, y, epochs=20, batch_size=32, verbose=1)
#
# # 6) sauvegarde
# model.save(f"{self.path}/lstm_model.keras")
# np.save(f"{self.path}/lstm_scaler_X.npy", scaler_X.data_max_)
# np.save(f"{self.path}/lstm_scaler_y.npy", scaler_y.data_max_)
# # pour restaurer
#
# # df = dataframe[self.model_indicators].copy()
# #
# # # Construction dataset X / y
# # X = []
# # y = []
# #
# # prices = df[self.indicator_target].values
# #
# # for i in range(lookback, len(prices) - future_steps):
# # X.append(prices[i - lookback:i])
# # y.append(prices[i + future_steps])
# #
# # X = np.array(X).reshape(-1, lookback, 1)
# # y = np.array(y)
# #
# # # --- Définition du modèle ---
# # model = models.Sequential([
# # layers.Input((lookback, 1)),
# # layers.LSTM(64),
# # layers.Dense(32, activation="relu"),
# # layers.Dense(1)
# # ])
# #
# # model.compile(optimizer="adam", loss="mse")
# # model.summary()
# #
# # # --- Entraînement ---
# # model.fit(X, y, epochs=20, batch_size=32, verbose=1)
# #
# # # --- Sauvegarde ---
# # model.save(f"{self.path}/lstm_model.keras", include_optimizer=False)
# #
# print("Modèle entraîné et sauvegardé → lstm_model.h5")
def kerasGenerateGraphs(self, dataframe):
    """
    Produce all diagnostic outputs for the model held in self.model.

    Runs, in order: the architecture diagram, the prediction plot for
    `dataframe` (using self.lookback as window length) and the per-layer
    weight export.
    """
    net = self.model

    # 1) architecture diagram
    self.kerasGenerateGraphModel(net)

    # 2) predictions against the target series
    self.kerasGenerateGraphPredictions(model=net, dataframe=dataframe, lookback=self.lookback)

    # 3) layer weights
    self.kerasGenerateGraphPoids(net)
def kerasGenerateGraphModel(self, model):
    """
    Render the architecture of `model` to "<self.path>/lstm_model.png".

    The diagram is produced by plot_model with layer shapes and layer names
    enabled.
    """
    diagram_file = f"{self.path}/lstm_model.png"
    render_options = {"show_shapes": True, "show_layer_names": True}
    plot_model(model, to_file=diagram_file, **render_options)
def kerasGenerateGraphPredictions(self, model, dataframe, lookback):
    """
    Plot the target series and the model predictions on one chart and save it
    to "<self.path>/lstm_predictions.png".

    model: model used to generate the predictions
    dataframe: data holding the feature columns and the target column
    lookback: window length passed to tensorFlowGeneratePredictions
    """
    predicted = self.tensorFlowGeneratePredictions(dataframe, lookback, model)

    fig = plt.figure(figsize=(36, 8))

    # Real target values first, then the prediction curve on top
    actual = dataframe[self.indicator_target].values
    plt.plot(actual, label=self.indicator_target)
    plt.plot(predicted, label="lstm_pred")
    plt.legend()

    chart_file = f"{self.path}/lstm_predictions.png"
    plt.savefig(chart_file)
    plt.close(fig)
def kerasGenerateGraphPoids(self, model):
    """
    Save the weights of every layer and draw a heatmap of each layer's first
    2D weight matrix.

    model: object exposing `layers`, each layer providing get_weights()

    For layer i, the weights are written to "<self.path>/layer_<i>_weights.npy"
    as a 1-D object array (one entry per weight array; reading it back needs
    allow_pickle=True). When the layer has a 2D weight array, a heatmap is
    saved to "<self.path>/layer<i>_weights.png"; otherwise a message is printed
    and processing continues with the next layer.
    """
    for i, layer in enumerate(model.layers):
        weights = layer.get_weights()  # list of numpy arrays

        # Fill a 1-D object array entry by entry. np.array(weights, dtype=object)
        # can raise, or build a multi-dimensional array, when the shapes of the
        # weight arrays partly match.
        packed = np.empty(len(weights), dtype=object)
        for j, w in enumerate(weights):
            packed[j] = w
        np.save(f"{self.path}/layer_{i}_weights.npy", packed)

        # Pick the first 2D weight array straight from memory; reloading the
        # pickled file that was just written adds nothing.
        W = next((w for w in weights if isinstance(w, np.ndarray) and w.ndim == 2), None)

        if W is None:
            print(f"Aucune matrice 2D dans layer {i} (rien à afficher).")
            # Skip only this layer; the remaining layers are still processed
            continue

        fig = plt.figure(figsize=(8, 6))
        try:
            sns.heatmap(W, cmap="viridis")
            plt.title(f"Poids 2D du layer {i}")
            plt.savefig(f"{self.path}/layer{i}_weights.png")
        finally:
            # Always release the figure, including when drawing or saving fails
            plt.close(fig)
# -------------------
# Entraînement
# -------------------
def trainTensorFlow(self, dataframe, future_steps=1, lookback=50, epochs=40, batch_size=32):
    """
    Train an LSTM regressor on sliding windows of `dataframe` and save it to
    "<self.path>/lstm_model.keras".

    dataframe: source data, turned into sequences by tensorFlowPrepareDataFrame
    future_steps: horizon offset passed to tensorFlowPrepareDataFrame
    lookback: number of rows per input window
    epochs, batch_size: passed to model.fit

    The trained model is kept in self.model. The scalers are not written to
    disk by this method.
    """
    sequences, targets = self.tensorFlowPrepareDataFrame(dataframe, future_steps, lookback)

    # Network: one LSTM layer followed by two dense layers, single output
    n_features = sequences.shape[2]
    stack = [
        LSTM(64, return_sequences=False, input_shape=(lookback, n_features)),
        Dense(32, activation="relu"),
        Dense(1),
    ]
    self.model = Sequential(stack)

    optimizer = Adam(learning_rate=1e-4)
    self.model.compile(loss='mse', optimizer=optimizer)
    self.model.fit(sequences, targets, epochs=epochs, batch_size=batch_size, verbose=1)

    # Persist the trained network
    model_file = f"{self.path}/lstm_model.keras"
    self.model.save(model_file)
def tensorFlowPrepareDataFrame(self, dataframe, future_steps, lookback):
    """
    Clean `dataframe`, fit the scalers and build the sliding-window training set.

    dataframe: source data; it is not modified
    future_steps: offset added to the target position (see below)
    lookback: number of consecutive rows in each input window

    Side effects: self.scaler_X is replaced by a MinMaxScaler fitted on the
    cleaned features; self.scaler_y likewise for the target unless
    self.y_no_scale is truthy (the target is then left unscaled).

    Returns (X_seq, y_seq). Window i holds cleaned rows i .. i + lookback - 1
    and its target is the row at i + lookback + future_steps. X_seq has shape
    (n, lookback, n_features) and y_seq has shape (n, 1); n is 0 when the
    cleaned data is too short for a single window.

    Raises ValueError when no row survives the cleaning or when NaN/Inf remain
    in the sequences.
    """
    target = self.indicator_target
    # list() so that concatenating with [target] also works for non-list indicators
    feature_columns = list(self.model_indicators)

    # 1) Turn Inf into NaN, then drop incomplete rows (works on a copy)
    df = dataframe.replace([np.inf, -np.inf], np.nan)
    df = df.dropna(subset=feature_columns + [target])
    if len(df) == 0:
        raise ValueError("Aucune ligne exploitable après suppression des NaN/Inf")

    # 2) Split features and target. Explicit float copies: the arrays are
    #    written to below, and `.values` may be a read-only view of the frame.
    X_values = df[feature_columns].to_numpy(dtype=float, copy=True)
    y_values = df[target].to_numpy(dtype=float, copy=True).reshape(-1, 1)

    # 3) Constant columns are zeroed (guards against a zero range when scaling)
    constant_columns = X_values.max(axis=0) == X_values.min(axis=0)
    X_values[:, constant_columns] = 0.0
    if y_values.max() == y_values.min():
        y_values[:] = 0.0

    # 4) Scaling
    self.scaler_X = MinMaxScaler()
    X_scaled = self.scaler_X.fit_transform(X_values)
    if self.y_no_scale:
        y_scaled = y_values
    else:
        self.scaler_y = MinMaxScaler()
        y_scaled = self.scaler_y.fit_transform(y_values)

    # 5) Sliding windows
    n_seq = len(X_scaled) - lookback - future_steps
    if n_seq <= 0:
        # Too short for a single window: return empty arrays with a usable shape
        X_seq = np.empty((0, lookback, X_scaled.shape[1]), dtype=float)
        y_seq = np.empty((0, 1), dtype=float)
    else:
        # sliding_window_view puts the window axis last:
        # (n_windows, n_features, lookback) -> (n_seq, lookback, n_features)
        windows = np.lib.stride_tricks.sliding_window_view(X_scaled, lookback, axis=0)
        X_seq = np.ascontiguousarray(windows[:n_seq].transpose(0, 2, 1))
        first_target = lookback + future_steps
        y_seq = np.array(y_scaled[first_target:first_target + n_seq])

    # Final sanity check
    if np.isnan(X_seq).any() or np.isnan(y_seq).any():
        raise ValueError("X_seq ou y_seq contient encore des NaN")
    if np.isinf(X_seq).any() or np.isinf(y_seq).any():
        raise ValueError("X_seq ou y_seq contient encore des Inf")

    return X_seq, y_seq
# -------------------
# Prédiction
# -------------------
def predictTensorFlow(self, dataframe, future_steps=1, lookback=50):
    """
    Add the LSTM predictions to `dataframe` as the column "lstm_pred".

    dataframe: data holding the feature columns and the target column; it is
        modified in place and also returned
    future_steps: passed to tensorFlowPrepareDataFrame
    lookback: window length used to build the prediction sequences

    The model is loaded from "<self.path>/lstm_model.keras" when none is held
    in memory yet.
    """
    # Load the saved model only if no model is available in memory
    if getattr(self, "model", None) is None:
        self.model = load_model(f"{self.path}/lstm_model.keras", compile=False)

    # Called for its side effect only: it (re)fits self.scaler_X / self.scaler_y
    # on this dataframe. The sequences it returns are not needed here.
    # NOTE(review): the scalers are thus fitted on the data being predicted
    # rather than restored from training — confirm this is intended.
    self.tensorFlowPrepareDataFrame(dataframe, future_steps, lookback)

    dataframe["lstm_pred"] = self.tensorFlowGeneratePredictions(dataframe, lookback, self.model)
    return dataframe
def tensorFlowGeneratePredictions(self, dataframe, lookback, model):
    """
    Run `model` on every sliding window of `dataframe` and return the
    predictions aligned with the dataframe rows.

    dataframe: data holding the columns listed in self.model_indicators
    lookback: number of consecutive rows in each input window
    model: object exposing predict(X, verbose=0)

    Uses self.scaler_X to scale the features and, unless self.y_no_scale is
    truthy, self.scaler_y to bring the predictions back to the target scale.

    Returns a list with one entry per dataframe row. The first `lookback`
    entries are NaN; entry lookback + i is the prediction made from rows
    i .. i + lookback - 1. When the dataframe has no more than `lookback`
    rows, every entry is NaN and the model is not called.
    """
    feature_columns = list(self.model_indicators)

    # Inf becomes NaN, as in the training preparation, so that scaling does
    # not fail on infinite values; affected windows give NaN predictions.
    features = dataframe[feature_columns].replace([np.inf, -np.inf], np.nan)
    X_values = features.to_numpy(dtype=float, na_value=np.nan)

    preds = [np.nan] * len(dataframe)

    n_windows = len(X_values) - lookback
    if n_windows <= 0:
        # Not enough rows for a single window: nothing to predict
        return preds

    # Scale with the scaler currently held by the instance
    X_scaled = np.asarray(self.scaler_X.transform(X_values))

    # sliding_window_view puts the window axis last:
    # (n_rows - lookback + 1, n_features, lookback) -> (n_windows, lookback, n_features)
    windows = np.lib.stride_tricks.sliding_window_view(X_scaled, lookback, axis=0)
    X_seq = np.ascontiguousarray(windows[:n_windows].transpose(0, 2, 1))

    # Predictions
    y_pred_scaled = model.predict(X_seq, verbose=0).flatten()
    if self.y_no_scale:
        y_pred = y_pred_scaled
    else:
        y_pred = self.scaler_y.inverse_transform(y_pred_scaled.reshape(-1, 1)).flatten()

    # Align with the dataframe rows: the first prediction goes to row `lookback`
    start = lookback
    preds[start:start + len(y_pred)] = y_pred
    return preds