# Zeus Strategy: First Generation of GodStra Strategy with maximum # AVG/MID profit in USDT # Author: @Mablue (Masoud Azizi) # github: https://github.com/mablue/ # IMPORTANT: INSTALL TA BEFOUR RUN(pip install ta) # freqtrade hyperopt --hyperopt-loss SharpeHyperOptLoss --spaces buy sell roi --strategy Zeus # --- Do not remove these libs --- from datetime import timedelta, datetime from freqtrade.persistence import Trade from freqtrade.strategy import (BooleanParameter, CategoricalParameter, DecimalParameter, stoploss_from_open, IntParameter, IStrategy, merge_informative_pair, informative, stoploss_from_absolute) import pandas as pd import numpy as np import os import json from pandas import DataFrame from typing import Optional, Union, Tuple import math import logging from pathlib import Path # -------------------------------- # Add your lib to import here test git import ta import talib.abstract as talib import freqtrade.vendor.qtpylib.indicators as qtpylib from datetime import timezone, timedelta logger = logging.getLogger(__name__) # Machine Learning from sklearn.model_selection import train_test_split import joblib import matplotlib.pyplot as plt from sklearn.metrics import ( classification_report, confusion_matrix, accuracy_score, roc_auc_score, roc_curve, precision_score, recall_score, precision_recall_curve, f1_score, mean_squared_error, r2_score ) from sklearn.tree import export_text import inspect from sklearn.feature_selection import SelectFromModel from tabulate import tabulate from sklearn.feature_selection import VarianceThreshold import seaborn as sns import lightgbm as lgb from sklearn.model_selection import cross_val_score import optuna.visualization as vis import optuna from lightgbm import LGBMRegressor from sklearn.metrics import mean_squared_error from sklearn.model_selection import train_test_split from sklearn.linear_model import LinearRegression, Ridge, HuberRegressor from sklearn.preprocessing import StandardScaler, PolynomialFeatures from sklearn.pipeline import make_pipeline from sklearn.svm import SVR from sklearn.ensemble import RandomForestRegressor from sklearn.ensemble import GradientBoostingRegressor from sklearn.preprocessing import StandardScaler from sklearn.ensemble import HistGradientBoostingRegressor from sklearn.impute import SimpleImputer from sklearn.pipeline import Pipeline # Tensorflow import pandas as pd import numpy as np import tensorflow as tf from tensorflow.keras import layers, models from tensorflow.keras.models import load_model from keras.utils import plot_model from keras.models import Sequential from keras.layers import LSTM, Dense from sklearn.preprocessing import MinMaxScaler from tensorflow.keras.models import Sequential from tensorflow.keras.layers import LSTM, Dense from tensorflow.keras.optimizers import Adam os.environ["CUDA_VISIBLE_DEVICES"] = "-1" # désactive complètement le GPU os.environ["TF_XLA_FLAGS"] = "--tf_xla_enable_xla_devices=false" # Couleurs ANSI de base RED = "\033[31m" GREEN = "\033[32m" YELLOW = "\033[33m" BLUE = "\033[34m" MAGENTA = "\033[35m" CYAN = "\033[36m" RESET = "\033[0m" import warnings warnings.filterwarnings( "ignore", message=r".*No further splits with positive gain.*" ) def pprint_df(dframe): print(tabulate(dframe, headers='keys', tablefmt='psql', showindex=False)) def normalize(df): df = (df - df.min()) / (df.max() - df.min()) return df class Zeus_TensorFlow(IStrategy): startup_candle_count = 24 * 12 # Machine Learning model = None model_indicators = [] indicator_target = 'mid_smooth_12_deriv1' # Tensorflow lookback = 20 future_steps = 1 y_no_scale = True path = f"user_data/plots/" # ROI table: minimal_roi = { "0": 0.564, "567": 0.273, "2814": 0.12, "7675": 0 } # Stoploss: stoploss = -1 # 0.256 # Custom stoploss use_custom_stoploss = False trailing_stop = True trailing_stop_positive = 0.15 trailing_stop_positive_offset = 0.20 trailing_only_offset_is_reached = True # Buy hypers timeframe = '5m' max_open_trades = 5 max_amount = 40 parameters = {} # DCA config position_adjustment_enable = True plot_config = { "main_plot": { "sma24_1h": { "color": "pink" }, "sma5_1d": { "color": "blue" }, # "sma24": { # "color": "yellow" # }, "sma60": { "color": "green" }, "bb_lowerband": { "color": "#da59a6"}, "bb_upperband": { "color": "#da59a6", }, # "sma12": { # "color": "blue" # }, "mid_smooth_3_1h": { "color": "blue" } }, "subplots": { "Rsi": { "max_rsi_24": { "color": "blue" }, "max_rsi_24_1h": { "color": "pink" }, # "rsi_1h": { # "color": "red" # }, # "rsi_1d": { # "color": "blue" # } }, "Rsi_deriv1": { "sma24_deriv1_1h": { "color": "pink" }, "sma24_deriv1": { "color": "yellow" }, "sma5_deriv1_1d": { "color": "blue" }, "sma60_deriv1": { "color": "green" } }, "Rsi_deriv2": { "sma24_deriv2_1h": { "color": "pink" }, "sma24_deriv2": { "color": "yellow" }, "sma5_deriv2_1d": { "color": "blue" }, "sma60_deriv2": { "color": "green" } }, 'Macd': { "macd_rel_1d": { "color": "cyan" }, "macdsignal_rel_1d": { "color": "pink" }, "macdhist_rel_1d": { "color": "yellow" } } } } columns_logged = False pairs = { pair: { "first_buy": 0, "last_buy": 0.0, "last_min": 999999999999999.5, "last_max": 0, "trade_info": {}, "max_touch": 0.0, "last_sell": 0.0, 'count_of_buys': 0, 'current_profit': 0, 'expected_profit': 0, 'previous_profit': 0, "last_candle": {}, "last_count_of_buys": 0, 'base_stake_amount': 0, 'stop_buy': False, 'last_date': 0, 'stop': False, 'max_profit': 0, 'total_amount': 0, 'has_gain': 0, 'force_sell': False, 'force_buy': False } for pair in ["BTC/USDC", "ETH/USDC", "DOGE/USDC", "XRP/USDC", "SOL/USDC", "BTC/USDT", "ETH/USDT", "DOGE/USDT", "XRP/USDT", "SOL/USDT"] } # 20 20 40 60 100 160 260 420 # 50 50 100 300 500 # fibo = [1, 1, 2, 3, 5, 8, 13, 21] # my fibo # 50 50 50 100 100 150 200 250 350 450 600 1050 fibo = [1, 1, 1, 2, 2, 3, 4, 5, 7, 9, 12, 16, 21] baisse = [1, 2, 3, 5, 7, 10, 14, 19, 26, 35, 47, 63, 84] # Ma suite 1 1 1 2 2 3 4 5 7 9 12 16 21 # Mise 50 50 50 100 100 150 200 250 350 450 600 800 1050 # Somme Mises 50 100 150 250 350 500 700 950 1300 1750 2350 3150 4200 # baisse 1 2 3 5 7 10 14 19 26 35 47 63 84 # factors = [1, 1.1, 1.25, 1.5, 2.0, 3] # thresholds = [2, 5, 10, 20, 30, 50] factors = [0.5, 0.75, 1, 1.25, 1.5, 2] thresholds = [0, 2, 5, 10, 30, 45] trades = list() max_profit_pairs = {} mise_factor_buy = DecimalParameter(0.01, 0.1, default=0.05, decimals=2, space='buy', optimize=True, load=True) indicators = {'sma5', 'sma12', 'sma24', 'sma60'} indicators_percent = {'percent', 'percent3', 'percent12', 'percent24', 'percent_1h', 'percent3_1h', 'percent12_1h', 'percent24_1h'} mises = IntParameter(1, 50, default=5, space='buy', optimize=True, load=True) ml_prob_buy = DecimalParameter(-0.5, 0.5, default=0.0, decimals=2, space='buy', optimize=True, load=True) ml_prob_sell = DecimalParameter(-0.5, 0.5, default=0.0, decimals=2, space='sell', optimize=True, load=True) pct = DecimalParameter(0.005, 0.05, default=0.012, decimals=3, space='buy', optimize=True, load=True) pct_inc = DecimalParameter(0.0001, 0.003, default=0.0022, decimals=4, space='buy', optimize=True, load=True) rsi_deb_protect = IntParameter(50, 90, default=70, space='protection', optimize=True, load=True) rsi_end_protect = IntParameter(20, 60, default=55, space='protection', optimize=True, load=True) sma24_deriv1_deb_protect = DecimalParameter(-4, 4, default=-2, decimals=1, space='protection', optimize=True, load=True) sma24_deriv1_end_protect = DecimalParameter(-4, 4, default=0, decimals=1, space='protection', optimize=True, load=True) # ========================================================================= should_enter_trade_count = 0 def confirm_trade_entry(self, pair: str, order_type: str, amount: float, rate: float, time_in_force: str, current_time: datetime, entry_tag: Optional[str], **kwargs) -> bool: minutes = 0 if self.pairs[pair]['last_date'] != 0: minutes = round(int((current_time - self.pairs[pair]['last_date']).total_seconds() / 60)) dataframe, _ = self.dp.get_analyzed_dataframe(pair, self.timeframe) last_candle = dataframe.iloc[-1].squeeze() last_candle_2 = dataframe.iloc[-2].squeeze() last_candle_3 = dataframe.iloc[-3].squeeze() condition = True #(last_candle[f"{indic_5m}_deriv1"] >= indic_deriv1_5m) and (last_candle[f"{indic_5m}_deriv2"] >= indic_deriv2_5m) allow_to_buy = True #(condition and not self.pairs[pair]['stop']) | (entry_tag == 'force_entry') if allow_to_buy: self.trades = list() self.pairs[pair]['first_buy'] = rate self.pairs[pair]['last_buy'] = rate self.pairs[pair]['max_touch'] = last_candle['close'] self.pairs[pair]['last_candle'] = last_candle self.pairs[pair]['count_of_buys'] = 1 self.pairs[pair]['current_profit'] = 0 self.pairs[pair]['last_max'] = max(last_candle['close'], self.pairs[pair]['last_max']) self.pairs[pair]['last_min'] = min(last_candle['close'], self.pairs[pair]['last_min']) dispo = round(self.wallets.get_available_stake_amount()) self.printLineLog() stake_amount = self.adjust_stake_amount(pair, last_candle) self.pairs[pair]['total_amount'] = stake_amount self.log_trade( last_candle=last_candle, date=current_time, action=("🟩Buy" if allow_to_buy else "Canceled") + " " + str(minutes), pair=pair, rate=rate, dispo=dispo, profit=0, trade_type=entry_tag, buys=1, stake=round(stake_amount, 2) ) return allow_to_buy def confirm_trade_exit(self, pair: str, trade: Trade, order_type: str, amount: float, rate: float, time_in_force: str, exit_reason: str, current_time, **kwargs, ) -> bool: # allow_to_sell = (minutes > 30) dataframe, _ = self.dp.get_analyzed_dataframe(pair, self.timeframe) last_candle = dataframe.iloc[-1].squeeze() minutes = int(round((current_time - trade.open_date_utc).seconds / 60, 0)) profit =trade.calc_profit(rate) force = self.pairs[pair]['force_sell'] allow_to_sell = minutes > 30 and (last_candle['hapercent'] < 0 ) or force or (exit_reason == 'force_exit') or (exit_reason == 'stop_loss') if allow_to_sell: self.trades = list() self.pairs[pair]['last_count_of_buys'] = trade.nr_of_successful_entries # self.pairs[pair]['count_of_buys'] self.pairs[pair]['last_sell'] = rate self.pairs[pair]['last_candle'] = last_candle self.pairs[pair]['max_profit'] = 0 self.pairs[pair]['previous_profit'] = 0 self.trades = list() dispo = round(self.wallets.get_available_stake_amount()) # print(f"Sell {pair} {current_time} {exit_reason} dispo={dispo} amount={amount} rate={rate} open_rate={trade.open_rate}") self.log_trade( last_candle=last_candle, date=current_time, action="🟥Sell " + str(minutes), pair=pair, trade_type=exit_reason, rate=last_candle['close'], dispo=dispo, profit=round(profit, 2) ) self.pairs[pair]['force_sell'] = False self.pairs[pair]['has_gain'] = 0 self.pairs[pair]['current_profit'] = 0 self.pairs[pair]['total_amount'] = 0 self.pairs[pair]['count_of_buys'] = 0 self.pairs[pair]['max_touch'] = 0 self.pairs[pair]['last_buy'] = 0 self.pairs[pair]['last_date'] = current_time self.pairs[pair]['current_trade'] = None # else: # self.printLog(f"{current_time} SELL triggered for {pair} ({exit_reason} profit={profit} minutes={minutes} percent={last_candle['hapercent']}) but condition blocked") return (allow_to_sell) | (exit_reason == 'force_exit') | (exit_reason == 'stop_loss') def custom_stake_amount(self, pair: str, current_time: datetime, current_rate: float, proposed_stake: float, min_stake: float, max_stake: float, **kwargs) -> float: dataframe, _ = self.dp.get_analyzed_dataframe(pair=pair, timeframe=self.timeframe) current_candle = dataframe.iloc[-1].squeeze() adjusted_stake_amount = self.adjust_stake_amount(pair, current_candle) # print(f"{pair} adjusted_stake_amount{adjusted_stake_amount}") # Use default stake amount. return adjusted_stake_amount def custom_exit(self, pair: str, trade: Trade, current_time, current_rate, current_profit, **kwargs): dataframe, _ = self.dp.get_analyzed_dataframe(pair, self.timeframe) last_candle = dataframe.iloc[-1].squeeze() last_candle_1h = dataframe.iloc[-13].squeeze() before_last_candle = dataframe.iloc[-2].squeeze() before_last_candle_2 = dataframe.iloc[-3].squeeze() before_last_candle_12 = dataframe.iloc[-13].squeeze() expected_profit = self.expectedProfit(pair, last_candle) # print(f"current_time={current_time} current_profit={current_profit} expected_profit={expected_profit}") max_touch_before = self.pairs[pair]['max_touch'] self.pairs[pair]['last_max'] = max(last_candle['close'], self.pairs[pair]['last_max']) self.pairs[pair]['last_min'] = min(last_candle['close'], self.pairs[pair]['last_min']) self.pairs[pair]['current_trade'] = trade count_of_buys = trade.nr_of_successful_entries profit = trade.calc_profit(current_rate) #round(current_profit * trade.stake_amount, 1) self.pairs[pair]['max_profit'] = max(self.pairs[pair]['max_profit'], profit) max_profit = self.pairs[pair]['max_profit'] baisse = 0 if profit > 0: baisse = 1 - (profit / max_profit) mx = max_profit / 5 self.pairs[pair]['count_of_buys'] = count_of_buys self.pairs[pair]['current_profit'] = profit dispo = round(self.wallets.get_available_stake_amount()) hours_since_first_buy = (current_time - trade.open_date_utc).seconds / 3600.0 days_since_first_buy = (current_time - trade.open_date_utc).days hours = (current_time - trade.date_last_filled_utc).total_seconds() / 3600.0 if hours % 4 == 0: self.log_trade( last_candle=last_candle, date=current_time, action="🔴 CURRENT" if self.pairs[pair]['stop'] or last_candle['stop_buying_1h'] else "🟢 CURRENT", dispo=dispo, pair=pair, rate=last_candle['close'], trade_type='', profit=round(profit, 2), buys=count_of_buys, stake=0 ) pair_name = self.getShortName(pair) if last_candle['max_rsi_24'] > 85 and profit > max(5, expected_profit) and (last_candle['hapercent'] < 0) and last_candle['sma60_deriv1'] < 0.05: self.pairs[pair]['force_sell'] = False self.pairs[pair]['force_buy'] = False #(self.pairs[pair]['count_of_buys'] - self.pairs[pair]['has_gain'] > 3) return str(count_of_buys) + '_' + 'Rsi85_' + pair_name + '_' + str(self.pairs[pair]['has_gain']) if self.pairs[pair]['force_sell']: self.pairs[pair]['force_sell'] = False self.pairs[pair]['force_buy'] = (self.pairs[pair]['count_of_buys'] - self.pairs[pair]['has_gain'] > 3) return str(count_of_buys) + '_' + 'Frc_' + pair_name + '_' + str(self.pairs[pair]['has_gain']) if profit > 0 and baisse > 0.30: self.pairs[pair]['force_sell'] = False self.pairs[pair]['force_buy'] = (self.pairs[pair]['count_of_buys'] - self.pairs[pair]['has_gain'] > 3) return str(count_of_buys) + '_' + 'B30_' + pair_name + '_' + str(self.pairs[pair]['has_gain']) # if max_profit > 0.5 * count_of_buys and baisse > 0.15: # self.pairs[pair]['force_sell'] = False # self.pairs[pair]['force_buy'] = (self.pairs[pair]['count_of_buys'] - self.pairs[pair]['has_gain'] > 3) # return str(count_of_buys) + '_' + 'B15_' + pair_name + '_' + str(self.pairs[pair]['has_gain']) if (last_candle['sma5_1h'] - before_last_candle_12['sma5_1h']) / last_candle['sma5_1h'] > 0.0002: return None factor = 1 if (self.getShortName(pair) == 'BTC'): factor = 0.5 # if baisse > 2 and baisse > factor * self.pairs[pair]['total_amount'] / 100: # self.pairs[pair]['force_sell'] = False # self.pairs[pair]['force_buy'] = (self.pairs[pair]['count_of_buys'] - self.pairs[pair]['has_gain'] > 3) # return 'Baisse_' + pair_name + '_' + str(count_of_buys) + '_' + str(self.pairs[pair]['has_gain']) # # if 1 <= count_of_buys <= 3: if last_candle['max_rsi_24'] > 75 and profit > expected_profit and (last_candle['hapercent'] < 0) and last_candle['sma60_deriv1'] < 0: self.pairs[pair]['force_sell'] = False return str(count_of_buys) + '_' + 'Rsi75_' + pair_name + '_' + str(self.pairs[pair]['has_gain']) self.pairs[pair]['max_touch'] = max(last_candle['close'], self.pairs[pair]['max_touch']) def getShortName(self, pair): return pair.replace("/USDT", '').replace("/USDC", '').replace("_USDC", '').replace("_USDT", '') def informative_pairs(self): # get access to all pairs available in whitelist. pairs = self.dp.current_whitelist() informative_pairs = [(pair, '1d') for pair in pairs] informative_pairs += [(pair, '1h') for pair in pairs] return informative_pairs def log_trade(self, action, pair, date, trade_type=None, rate=None, dispo=None, profit=None, buys=None, stake=None, last_candle=None): # Afficher les colonnes une seule fois if self.config.get('runmode') == 'hyperopt' or self.dp.runmode.value in ('hyperopt'): return if self.columns_logged % 10 == 0: self.printLog( f"| {'Date':<16} | {'Action':<10} |{'Pair':<5}| {'Trade Type':<18} |{'Rate':>8} | {'Dispo':>6} | {'Profit':>8} " f"| {'Pct':>6} | {'max_touch':>11} | {'last_lost':>12} | {'last_max':>7}| {'last_max':>7}|{'Buys':>5}| {'Stake':>5} |" f"{'rsi':>6}|Distmax|s201d|s5_1d|s5_2d|s51h|s52h|smt1h|smt2h|tdc1d|tdc1h" ) self.printLineLog() df = pd.DataFrame.from_dict(self.pairs, orient='index') colonnes_a_exclure = ['last_candle', 'trade_info', 'last_date', 'last_count_of_buys', 'base_stake_amount', 'stop_buy'] df_filtered = df[df['count_of_buys'] > 0].drop(columns=colonnes_a_exclure) # df_filtered = df_filtered["first_buy", "last_max", "max_touch", "last_sell","last_buy", 'count_of_buys', 'current_profit'] print(df_filtered) self.columns_logged += 1 date = str(date)[:16] if date else "-" limit = None # if buys is not None: # limit = round(last_rate * (1 - self.fibo[buys] / 100), 4) rsi = '' rsi_pct = '' # if last_candle is not None: # if (not np.isnan(last_candle['rsi_1d'])) and (not np.isnan(last_candle['rsi_1h'])): # rsi = str(int(last_candle['rsi_1d'])) + " " + str(int(last_candle['rsi_1h'])) # if (not np.isnan(last_candle['rsi_pct_1d'])) and (not np.isnan(last_candle['rsi_pct_1h'])): # rsi_pct = str(int(10000 * last_candle['bb_mid_pct_1d'])) + " " + str( # int(last_candle['rsi_pct_1d'])) + " " + str(int(last_candle['rsi_pct_1h'])) # first_rate = self.percent_threshold.value # last_rate = self.threshold.value # action = self.color_line(action, action) sma5_1d = '' sma5_1h = '' sma5 = str(sma5_1d) + ' ' + str(sma5_1h) last_lost = self.getLastLost(last_candle, pair) if buys is None: buys = '' max_touch = '' pct_max = self.getPctFirstBuy(pair, last_candle) total_counts = str(buys) + '/' + str(sum(pair_data['count_of_buys'] for pair_data in self.pairs.values())) dist_max = '' color = GREEN if profit > 0 else RED color_sma24 = GREEN if last_candle['sma24_deriv1_1h'] > 0 else RED color_sma24_2 = GREEN if last_candle['sma24_deriv2_1h'] > 0 else RED color_sma5 = GREEN if last_candle['mid_smooth_5_deriv1_1h'] > 0 else RED color_sma5_2 = GREEN if last_candle['mid_smooth_5_deriv2_1h'] > 0 else RED color_sma5_1h = GREEN if last_candle['sma60_deriv1'] > 0 else RED color_sma5_2h = GREEN if last_candle['sma60_deriv2'] > 0 else RED color_smooth_1h = GREEN if last_candle['mid_smooth_1h_deriv1'] > 0 else RED color_smooth2_1h = GREEN if last_candle['mid_smooth_1h_deriv2'] > 0 else RED last_max = int(self.pairs[pair]['last_max']) if self.pairs[pair]['last_max'] > 1 else round( self.pairs[pair]['last_max'], 3) last_min = int(self.pairs[pair]['last_min']) if self.pairs[pair]['last_min'] > 1 else round( self.pairs[pair]['last_min'], 3) profit = str(profit) + '/' + str(round(self.pairs[pair]['max_profit'], 2)) # 🟢 Dérivée 1 > 0 et dérivée 2 > 0: tendance haussière qui s’accélère. # 🟡 Dérivée 1 > 0 et dérivée 2 < 0: tendance haussière qui ralentit → essoufflement potentiel. # 🔴 Dérivée 1 < 0 et dérivée 2 < 0: tendance baissière qui s’accélère. # 🟠 Dérivée 1 < 0 et dérivée 2 > 0: tendance baissière qui ralentit → possible bottom. self.printLog( f"| {date:<16} |{action:<10} | {pair[0:3]:<3} | {trade_type or '-':<18} |{rate or '-':>9}| {dispo or '-':>6} " f"|{color}{profit or '-':>10}{RESET}| {pct_max or '-':>6} | {round(self.pairs[pair]['max_touch'], 2) or '-':>11} | {last_lost or '-':>12} " f"| {last_max or '-':>7} | {last_min or '-':>7} |{total_counts or '-':>5}|{stake or '-':>7}" # f"|{round(last_candle['mid_smooth_24_deriv1'],3) or '-':>6}|{round(last_candle['mid_smooth_1h_deriv1'],3) or '-':>6}|{round(last_candle['mid_smooth_deriv1_1d'],3) or '-' :>6}|" # f"{round(last_candle['mid_smooth_24_deriv2'],3) or '-' :>6}|{round(last_candle['mid_smooth_1h_deriv2'],3) or '-':>6}|{round(last_candle['mid_smooth_deriv2_1d'],3) or '-':>6}|" f"{round(last_candle['max_rsi_24'], 1) or '-' :>6}|" f"{dist_max:>7}|{color_sma24}{round(last_candle['sma24_deriv1_1h'], 2):>5}{RESET}" f"|{color_sma5}{round(last_candle['mid_smooth_5_deriv1_1h'], 2):>5}{RESET}|{color_sma5_2}{round(last_candle['mid_smooth_5_deriv2_1h'], 2):>5}{RESET}" f"|{color_sma5_1h}{round(last_candle['sma60_deriv1'], 2):>5}{RESET}|{color_sma5_2h}{round(last_candle['sma60_deriv2'], 2):>5}{RESET}" f"|{color_smooth_1h}{round(last_candle['mid_smooth_1h_deriv1'], 2):>5}{RESET}|{color_smooth2_1h}{round(last_candle['mid_smooth_1h_deriv2'], 2):>5}{RESET}" ) def getLastLost(self, last_candle, pair): last_lost = round((last_candle['close'] - self.pairs[pair]['max_touch']) / self.pairs[pair]['max_touch'], 3) return last_lost def printLineLog(self): # f"sum1h|sum1d|Tdc|Tdh|Tdd| drv1 |drv_1h|drv_1d|" self.printLog( f"+{'-' * 18}+{'-' * 12}+{'-' * 5}+{'-' * 20}+{'-' * 9}+{'-' * 8}+{'-' * 12}+{'-' * 8}+{'-' * 13}+{'-' * 14}+{'-' * 9}{'-' * 9}+{'-' * 5}+{'-' * 7}+" f"+{'-' * 6}+{'-' * 7}+{'-' * 5}+{'-' * 5}+{'-' * 5}+{'-' * 5}+{'-' * 5}+{'-' * 5}+" ) def printLog(self, str): if self.config.get('runmode') == 'hyperopt' or self.dp.runmode.value in ('hyperopt'): return; if not self.dp.runmode.value in ('backtest', 'hyperopt', 'lookahead-analysis'): logger.info(str) else: if not self.dp.runmode.value in ('hyperopt'): print(str) def populate_indicators(self, dataframe: DataFrame, metadata: dict) -> DataFrame: # Add all ta features pair = metadata['pair'] short_pair = self.getShortName(pair) self.path = f"user_data/plots/{short_pair}/" dataframe = self.populateDataframe(dataframe, timeframe='5m') # ################### INFORMATIVE 1d informative = self.dp.get_pair_dataframe(pair=metadata['pair'], timeframe="1d") informative = self.populateDataframe(informative, timeframe='1d') # informative = self.calculateRegression(informative, 'mid', lookback=15) dataframe = merge_informative_pair(dataframe, informative, self.timeframe, "1d", ffill=True) ################### INFORMATIVE 1h informative = self.dp.get_pair_dataframe(pair=metadata['pair'], timeframe="1h") informative = self.populateDataframe(informative, timeframe='1h') informative = self.calculateRegression(informative, 'mid', lookback=5) # # TENSOR FLOW # self.model_indicators = self.listUsableColumns(informative) # if self.dp.runmode.value in ('backtest'): # self.tensorFlowTrain(informative, lookback=self.lookback, future_steps = self.future_steps) # # self.tensorFlowPredict(informative) # # if self.dp.runmode.value in ('backtest'): # self.kerasGenerateGraphs(informative) informative['stop_buying_deb'] = ((informative['max_rsi_24'] > self.rsi_deb_protect.value) & (informative['sma24_deriv1'] < self.sma24_deriv1_deb_protect.value) ) informative['stop_buying_end'] = ((informative['max_rsi_24'] < self.rsi_end_protect.value) & (informative['sma24_deriv1'] > self.sma24_deriv1_end_protect.value) ) latched = np.zeros(len(informative), dtype=bool) for i in range(1, len(informative)): if informative['stop_buying_deb'].iloc[i]: latched[i] = True elif informative['stop_buying_end'].iloc[i]: latched[i] = False else: latched[i] = latched[i - 1] informative['stop_buying'] = latched dataframe = merge_informative_pair(dataframe, informative, self.timeframe, "1h", ffill=True) dataframe['last_price'] = dataframe['close'] dataframe['first_price'] = dataframe['close'] if self.dp: if self.dp.runmode.value in ('live', 'dry_run'): self.getOpenTrades() for trade in self.trades: if trade.pair != pair: continue filled_buys = trade.select_filled_orders('buy') count = 0 amount = 0 for buy in filled_buys: if count == 0: dataframe['first_price'] = buy.price self.pairs[pair]['first_buy'] = buy.price self.pairs[pair]['first_amount'] = buy.price * buy.filled # dataframe['close01'] = buy.price * 1.01 # Order(id=2396, trade=1019, order_id=29870026652, side=buy, filled=0.00078, price=63921.01, # status=closed, date=2024-08-26 02:20:11) dataframe['last_price'] = buy.price self.pairs[pair]['last_buy'] = buy.price count = count + 1 amount += buy.price * buy.filled # dataframe['mid_price'] = (dataframe['last_price'] + dataframe['first_price']) / 2 count_buys = count # dataframe['limit'] = dataframe['last_price'] * (1 - self.baisse[count] / 100) self.pairs[pair]['total_amount'] = amount # dataframe['mid_smooth_tag'] = qtpylib.crossed_below(dataframe['mid_smooth_24_deriv1'], dataframe['mid_smooth_deriv2_24']) # =============================== # lissage des valeurs horaires dataframe['mid_smooth_1h'] = dataframe['mid'].rolling(window=6).mean() dataframe["mid_smooth_1h_deriv1"] = 100 * dataframe["mid_smooth_1h"].diff().rolling(window=6).mean() / \ dataframe['mid_smooth_1h'] dataframe["mid_smooth_1h_deriv2"] = 100 * dataframe["mid_smooth_1h_deriv1"].diff().rolling(window=6).mean() dataframe['mid_smooth_5h'] = talib.EMA(dataframe, timeperiod=60) # dataframe['mid'].rolling(window=60).mean() dataframe["mid_smooth_5h_deriv1"] = 100 * dataframe["mid_smooth_5h"].diff().rolling(window=60).mean() / \ dataframe['mid_smooth_5h'] dataframe["mid_smooth_5h_deriv2"] = 100 * dataframe["mid_smooth_5h_deriv1"].diff().rolling(window=60).mean() dataframe = self.calculateRegression(dataframe, 'mid', lookback=10, future_steps=10, model_type="poly") dataframe = self.calculateRegression(dataframe, 'sma24', lookback=12, future_steps=12) # self.model_indicators = self.listUsableColumns(dataframe) # # TENSOR FLOW # if self.dp.runmode.value in ('backtest'): # self.tensorFlowTrain(dataframe, future_steps = self.future_steps) # # self.tensorFlowPredict(dataframe) # # if self.dp.runmode.value in ('backtest'): # self.kerasGenerateGraphs(dataframe) return dataframe def listUsableColumns(self, dataframe): # Étape 1 : sélectionner numériques numeric_cols = dataframe.select_dtypes(include=['int64', 'float64']).columns # Étape 2 : enlever constantes # usable_cols = [c for c in numeric_cols if dataframe[c].nunique() > 1 # and (c.endswith("_deriv1") or not c.endswith("deriv1_1h")) # and not c.endswith("_count") # ] usable_cols = [c for c in numeric_cols if dataframe[c].nunique() > 1 and not c.endswith("_state") # and not c.endswith("_1d") # and not c.endswith("_1h") and not c.endswith("_count") # and not c.startswith("open") and not c.startswith("close") # and not c.startswith("low") and not c.startswith("high") # and not c.startswith("haopen") and not c.startswith("haclose") # and not c.startswith("bb_lower") and not c.startswith("bb_upper") # and not c.startswith("bb_middle") and not c.endswith("_class") and not c.endswith("_price") and not c.startswith('stop_buying')] # Étape 3 : remplacer inf et NaN par 0 # usable_cols = [ # 'hapercent', 'percent', 'percent3', 'percent12', # 'percent24', # 'sma5_dist', 'sma5_deriv1', 'sma12_dist', 'sma12_deriv1', # 'sma24_dist', 'sma24_deriv1', 'sma48_dist', 'sma48_deriv1', 'sma60_dist', 'sma60_deriv1', 'sma60_deriv2', # 'mid_smooth_3_deriv1', 'mid_smooth_5_dist', # 'mid_smooth_5_deriv1', 'mid_smooth_12_dist', # 'mid_smooth_12_deriv1', 'mid_smooth_24_dist', # 'mid_smooth_24_deriv1', # 'rsi', 'max_rsi_12', 'max_rsi_24', # 'rsi_dist', 'rsi_deriv1', # 'min_max_60', 'bb_percent', 'bb_width', # 'macd', 'macdsignal', 'macdhist', 'slope', # 'slope_smooth', 'atr', 'atr_norm', 'adx', 'obv', 'vol_24', # 'rsi_slope', 'adx_change', 'volatility_ratio', 'rsi_diff', # 'slope_ratio', 'volume_sma_deriv', 'volume_dist', 'volume_deriv1', # 'slope_norm', # # 'mid_smooth_1h_deriv1', # # 'mid_smooth_5h_deriv1', 'mid_smooth_5h_deriv2', 'mid_future_pred_cons', # # 'sma24_future_pred_cons' # ] dataframe[usable_cols] = dataframe[usable_cols].replace([np.inf, -np.inf], 0).fillna(0) print("Colonnes utilisables pour le modèle :") print(usable_cols) self.model_indicators = usable_cols return self.model_indicators def populateDataframe(self, dataframe, timeframe='5m'): dataframe = dataframe.copy() heikinashi = qtpylib.heikinashi(dataframe) dataframe['haopen'] = heikinashi['open'] dataframe['haclose'] = heikinashi['close'] dataframe['hapercent'] = (dataframe['haclose'] - dataframe['haopen']) / dataframe['haclose'] dataframe['mid'] = dataframe['haopen'] + (dataframe['haclose'] - dataframe['haopen']) / 2 dataframe["percent"] = dataframe['mid'].pct_change() dataframe["percent3"] = dataframe['mid'].pct_change(3).rolling(3).mean() dataframe["percent12"] = dataframe['mid'].pct_change(12).rolling(12).mean() dataframe["percent24"] = dataframe['mid'].pct_change(24).rolling(24).mean() # if self.dp.runmode.value in ('backtest'): # dataframe['futur_percent'] = 100 * (dataframe['close'].shift(-1) - dataframe['close']) / dataframe['close'] dataframe['sma5'] = dataframe['mid'].ewm(span=5, adjust=False).mean() #dataframe["mid"].rolling(window=5).mean() self.calculeDerivees(dataframe, 'sma5', timeframe=timeframe, ema_period=5) dataframe['sma12'] = dataframe['mid'].ewm(span=12, adjust=False).mean() #dataframe["mid"].rolling(window=12).mean() self.calculeDerivees(dataframe, 'sma12', timeframe=timeframe, ema_period=12) dataframe['sma24'] = dataframe['mid'].ewm(span=24, adjust=False).mean() #dataframe["mid"].rolling(window=24).mean() self.calculeDerivees(dataframe, 'sma24', timeframe=timeframe, ema_period=24) dataframe['sma48'] = dataframe['mid'].ewm(span=48, adjust=False).mean() #dataframe["mid"].rolling(window=48).mean() self.calculeDerivees(dataframe, 'sma48', timeframe=timeframe, ema_period=48) dataframe['sma60'] = dataframe['mid'].ewm(span=60, adjust=False).mean() #dataframe["mid"].rolling(window=60).mean() self.calculeDerivees(dataframe, 'sma60', timeframe=timeframe, ema_period=60) dataframe = self.calculateDerivation(dataframe, window=3, suffixe="_3",timeframe=timeframe) dataframe = self.calculateDerivation(dataframe, window=5, suffixe="_5",timeframe=timeframe) dataframe = self.calculateDerivation(dataframe, window=12, suffixe="_12",timeframe=timeframe) dataframe = self.calculateDerivation(dataframe, window=24, suffixe="_24", timeframe=timeframe) # print(metadata['pair']) dataframe['rsi'] = talib.RSI(dataframe['mid'], timeperiod=14) dataframe['max_rsi_12'] = talib.MAX(dataframe['rsi'], timeperiod=12) dataframe['max_rsi_24'] = talib.MAX(dataframe['rsi'], timeperiod=24) self.calculeDerivees(dataframe, 'rsi', timeframe=timeframe, ema_period=12) dataframe['max12'] = talib.MAX(dataframe['mid'], timeperiod=12) dataframe['min12'] = talib.MIN(dataframe['mid'], timeperiod=12) dataframe['max60'] = talib.MAX(dataframe['mid'], timeperiod=60) dataframe['min60'] = talib.MIN(dataframe['mid'], timeperiod=60) dataframe['min_max_60'] = ((dataframe['max60'] - dataframe['mid']) / dataframe['min60']) # dataframe['min36'] = talib.MIN(dataframe['close'], timeperiod=36) # dataframe['max36'] = talib.MAX(dataframe['close'], timeperiod=36) # dataframe['pct36'] = 100 * (dataframe['max36'] - dataframe['min36']) / dataframe['min36'] # dataframe['maxpct36'] = talib.MAX(dataframe['pct36'], timeperiod=36) # Bollinger Bands bollinger = qtpylib.bollinger_bands(qtpylib.typical_price(dataframe), window=20, stds=2) dataframe['bb_lowerband'] = bollinger['lower'] dataframe['bb_middleband'] = bollinger['mid'] dataframe['bb_upperband'] = bollinger['upper'] dataframe["bb_percent"] = ( (dataframe["mid"] - dataframe["bb_lowerband"]) / (dataframe["bb_upperband"] - dataframe["bb_lowerband"]) ) dataframe["bb_width"] = (dataframe["bb_upperband"] - dataframe["bb_lowerband"]) / dataframe["sma5"] # dataframe["bb_width"] = ( # (dataframe["bb_upperband"] - dataframe["bb_lowerband"]) / dataframe["bb_middleband"] # ) # Calcul MACD macd, macdsignal, macdhist = talib.MACD( dataframe['mid'], fastperiod=12, slowperiod=26, signalperiod=9 ) # | Nom | Formule / définition | Signification | # | ---------------------------- | ------------------------------------------- | ----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- | # | **MACD** (`macd`) | `EMA_fast - EMA_slow` (ex : 12-26 périodes) | Montre l’écart entre la moyenne courte et la moyenne longue.
- Positive → tendance haussière
- Négative → tendance baissière | # | **Signal** (`macdsignal`) | `EMA_9(MACD)` | Sert de ligne de **signal de déclenchement**.
- Croisement du MACD au-dessus → signal d’achat
- Croisement du MACD en dessous → signal de vente | # | **Histogramme** (`macdhist`) | `MACD - Signal` | Montre la **force et l’accélération** de la tendance.
- Positif et croissant → tendance haussière qui s’accélère
- Positif mais décroissant → ralentissement de la hausse
- Négatif et décroissant → baisse qui s’accélère
- Négatif mais croissant → ralentissement de la baisse | # Ajouter dans le dataframe dataframe['macd'] = macd dataframe['macdsignal'] = macdsignal dataframe['macdhist'] = macdhist # Regarde dans le futur # # --- Rendre relatif sur chaque série (-1 → 1) --- # for col in ['macd', 'macdsignal', 'macdhist']: # series = dataframe[col] # valid = series[~np.isnan(series)] # ignorer NaN # min_val = valid.min() # max_val = valid.max() # span = max_val - min_val if max_val != min_val else 1 # dataframe[f'{col}_rel'] = 2 * ((series - min_val) / span) - 1 # # dataframe['tdc_macd'] = self.macd_tendance_int( # dataframe, # macd_col='macd_rel', # signal_col='macdsignal_rel', # hist_col='macdhist_rel' # ) # --- pente brute --- dataframe['slope'] = dataframe['sma24'].diff() # --- lissage EMA --- dataframe['slope_smooth'] = dataframe['slope'].ewm(span=10, adjust=False).mean() # --- Volatilité normalisée --- dataframe['atr'] = ta.volatility.AverageTrueRange( high=dataframe['high'], low=dataframe['low'], close=dataframe['close'], window=14 ).average_true_range() dataframe['atr_norm'] = dataframe['atr'] / dataframe['close'] # --- Force de tendance --- dataframe['adx'] = ta.trend.ADXIndicator( high=dataframe['high'], low=dataframe['low'], close=dataframe['close'], window=14 ).adx() # --- Volume directionnel (On Balance Volume) --- dataframe['obv'] = ta.volume.OnBalanceVolumeIndicator(close=dataframe['mid'], volume=dataframe['volume']).on_balance_volume() # --- Volatilité récente (écart-type des rendements) --- dataframe['vol_24'] = dataframe['percent'].rolling(24).std() # Compter les baisses / hausses consécutives self.calculateDownAndUp(dataframe, limit=0.0001) # df : ton dataframe OHLCV + indicateurs existants # Assurez-vous que les colonnes suivantes existent : # 'max_rsi_12', 'roc_24', 'bb_percent_1h' # --- Filtrage des NaN initiaux --- # dataframe = dataframe.dropna() dataframe['rsi_slope'] = dataframe['rsi'].diff(3) / 3 # vitesse moyenne du RSI dataframe['adx_change'] = dataframe['adx'] - dataframe['adx'].shift(12) # évolution de la tendance dataframe['volatility_ratio'] = dataframe['atr_norm'] / dataframe['bb_width'] dataframe["rsi_diff"] = dataframe["rsi"] - dataframe["rsi"].shift(3) dataframe["slope_ratio"] = dataframe["sma5_deriv1"] / (dataframe["sma60_deriv1"] + 1e-9) dataframe["divergence"] = (dataframe["rsi_deriv1"] * dataframe["sma5_deriv1"]) < 0 ########################### dataframe['volume_sma_deriv'] = dataframe['volume'] * dataframe['sma5_deriv1'] / (dataframe['volume'].rolling(5).mean()) self.calculeDerivees(dataframe, 'volume', timeframe=timeframe, ema_period=12) self.setTrends(dataframe) return dataframe def feature_auc_scores(self, X, y): aucs = {} for col in X.columns: try: aucs[col] = roc_auc_score(y, X[col].ffill().fillna(0)) except Exception: aucs[col] = np.nan return pd.Series(aucs).sort_values(ascending=False) def macd_tendance_int(self, dataframe: pd.DataFrame, macd_col='macd', signal_col='macdsignal', hist_col='macdhist', eps=0.0) -> pd.Series: """ Renvoie la tendance MACD sous forme d'entiers. 2 : Haussier 1 : Ralentissement hausse 0 : Neutre -1 : Ralentissement baisse -2 : Baissier """ # | Nom | Formule / définition | Signification | # | ---------------------------- | ------------------------------------------- | ----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- | # | **MACD** (`macd`) | `EMA_fast - EMA_slow` (ex : 12-26 périodes) | Montre l’écart entre la moyenne courte et la moyenne longue.
- Positive → tendance haussière
- Négative → tendance baissière | # | **Signal** (`macdsignal`) | `EMA_9(MACD)` | Sert de ligne de **signal de déclenchement**.
- Croisement du MACD au-dessus → signal d’achat
- Croisement du MACD en dessous → signal de vente | # | **Histogramme** (`macdhist`) | `MACD - Signal` | Montre la **force et l’accélération** de la tendance.
- Positif et croissant → tendance haussière qui s’accélère
- Positif mais décroissant → ralentissement de la hausse
- Négatif et décroissant → baisse qui s’accélère
- Négatif mais croissant → ralentissement de la baisse | # | Situation | MACD | Signal | Hist | Interprétation | # | -------------------------- | ---------- | --------- | -------- | ------------------------------------------ | # | MACD > 0, Hist croissant | au-dessus | croissant | Haussier | Momentum fort → tendance haussière | # | MACD > 0, Hist décroissant | au-dessus | en baisse | Momentum | La hausse ralentit, prudence | # | MACD < 0, Hist décroissant | en dessous | en baisse | Baissier | Momentum fort → tendance baissière | # | MACD < 0, Hist croissant | en dessous | en hausse | Rebond ? | La baisse ralentit → possible retournement | # Créer une série de 0 par défaut tendance = pd.Series(0, index=dataframe.index) # Cas MACD > signal mask_up = dataframe[macd_col] > dataframe[signal_col] + eps mask_up_hist_pos = mask_up & (dataframe[hist_col] > 0) mask_up_hist_neg = mask_up & (dataframe[hist_col] <= 0) tendance[mask_up_hist_pos] = 2 # Haussier tendance[mask_up_hist_neg] = 1 # Ralentissement hausse # Cas MACD < signal mask_down = dataframe[macd_col] < dataframe[signal_col] - eps mask_down_hist_neg = mask_down & (dataframe[hist_col] < 0) mask_down_hist_pos = mask_down & (dataframe[hist_col] >= 0) tendance[mask_down_hist_neg] = -2 # Baissier tendance[mask_down_hist_pos] = -1 # Ralentissement baisse # Les NaN deviennent neutre tendance[dataframe[[macd_col, signal_col, hist_col]].isna().any(axis=1)] = 0 return tendance def calculateDownAndUp(self, dataframe, limit=0.0001): dataframe['down'] = dataframe['hapercent'] <= limit dataframe['up'] = dataframe['hapercent'] >= limit dataframe['down_count'] = - dataframe['down'].astype(int) * ( dataframe['down'].groupby((dataframe['down'] != dataframe['down'].shift()).cumsum()).cumcount() + 1) dataframe['up_count'] = dataframe['up'].astype(int) * ( dataframe['up'].groupby((dataframe['up'] != dataframe['up'].shift()).cumsum()).cumcount() + 1) # Créer une colonne vide dataframe['down_pct'] = self.calculateUpDownPct(dataframe, 'down_count') dataframe['up_pct'] = self.calculateUpDownPct(dataframe, 'up_count') def calculateDerivation(self, dataframe, window=12, suffixe='', timeframe='5m'): dataframe[f"mid_smooth{suffixe}"] = dataframe['mid'].rolling(window).mean() dataframe = self.calculeDerivees(dataframe, f"mid_smooth{suffixe}", timeframe=timeframe, ema_period=window) return dataframe def calculeDerivees( self, dataframe: pd.DataFrame, name: str, suffixe: str = '', window: int = 100, coef: float = 0.15, ema_period: int = 10, verbose: bool = True, timeframe: str = '5m' ) -> pd.DataFrame: """ Calcule deriv1/deriv2 (relative simple), applique EMA, calcule tendency avec epsilon adaptatif basé sur rolling percentiles. """ d1_col = f"{name}{suffixe}_deriv1" d2_col = f"{name}{suffixe}_deriv2" factor1 = 100 * (ema_period / 5) factor2 = 10 * (ema_period / 5) dataframe[f"{name}{suffixe}_inv"] = (dataframe[f"{name}{suffixe}"].shift(2) >= dataframe[f"{name}{suffixe}"].shift(1)) \ & (dataframe[f"{name}{suffixe}"].shift(1) <= dataframe[f"{name}{suffixe}"]) # --- Distance à la moyenne mobile --- dataframe[f"{name}{suffixe}_dist"] = (dataframe['close'] - dataframe[f"{name}{suffixe}"]) / dataframe[f"{name}{suffixe}"] # dérivée relative simple dataframe[d1_col] = 1000 * (dataframe[name] - dataframe[name].shift(1)) / dataframe[name].shift(1) dataframe[d2_col] = dataframe[d1_col] - dataframe[d1_col].shift(1) return dataframe def getOpenTrades(self): # if len(self.trades) == 0: self.trades = Trade.get_open_trades() return self.trades def populate_buy_trend(self, dataframe: DataFrame, metadata: dict) -> DataFrame: dataframe.loc[ ( (dataframe['lstm_pred'] > 0) ), ['enter_long', 'enter_tag']] = (1, f"future") dataframe['test'] = np.where(dataframe['enter_long'] == 1, dataframe['close'] * 1.01, np.nan) if self.dp.runmode.value in ('backtest'): dataframe.to_feather(f"user_data/backtest_results/{metadata['pair'].replace('/', '_')}_df.feather") return dataframe def populate_sell_trend(self, dataframe: DataFrame, metadata: dict) -> DataFrame: # dataframe.loc[ # ( # (dataframe['lstm_pred'] < 0) & (dataframe['hapercent'] < 0) # ), ['exit_long', 'exit_tag']] = (1, f"sma60_future") # dataframe.loc[ # ( # ( # ( # (dataframe['mid_future_pred_cons'].shift(2) < dataframe['mid_future_pred_cons'].shift(1)) # & (dataframe['mid_future_pred_cons'].shift(1) > dataframe['mid_future_pred_cons']) # ) # # | (dataframe['mid_smooth_12_deriv1'] < 0) # ) # & (dataframe['sma60_future_pred_cons'] < dataframe['sma60_future_pred_cons'].shift(1)) # & (dataframe['hapercent'] < 0) # ), ['exit_long', 'exit_tag']] = (1, f"sma60_future") # # dataframe.loc[ # ( # ( # (dataframe['mid_future_pred_cons'].shift(2) < dataframe['mid_future_pred_cons'].shift(1)) # & (dataframe['mid_future_pred_cons'].shift(1) > dataframe['mid_future_pred_cons']) # # ) # # & (dataframe['mid_future_pred_cons'] > dataframe['max12']) # & (dataframe['hapercent'] < 0) # # ), ['exit_long', 'exit_tag']] = (1, f"max12") return dataframe def adjust_trade_position(self, trade: Trade, current_time: datetime, current_rate: float, current_profit: float, min_stake: float, max_stake: float, **kwargs): # ne rien faire si ordre deja en cours if trade.has_open_orders: # print("skip open orders") return None if (self.wallets.get_available_stake_amount() < 10): # or trade.stake_amount >= max_stake: return 0 dataframe, _ = self.dp.get_analyzed_dataframe(trade.pair, self.timeframe) last_candle = dataframe.iloc[-1].squeeze() before_last_candle = dataframe.iloc[-2].squeeze() # prépare les données current_time = current_time.astimezone(timezone.utc) open_date = trade.open_date.astimezone(timezone.utc) dispo = round(self.wallets.get_available_stake_amount()) hours_since_first_buy = (current_time - trade.open_date_utc).seconds / 3600.0 days_since_first_buy = (current_time - trade.open_date_utc).days hours = (current_time - trade.date_last_filled_utc).total_seconds() / 3600.0 count_of_buys = trade.nr_of_successful_entries current_time_utc = current_time.astimezone(timezone.utc) open_date = trade.open_date.astimezone(timezone.utc) days_since_open = (current_time_utc - open_date).days pair = trade.pair profit = trade.calc_profit(current_rate) #round(current_profit * trade.stake_amount, 1) last_lost = self.getLastLost(last_candle, pair) pct_first = 0 total_counts = sum( pair_data['count_of_buys'] for pair_data in self.pairs.values() if not self.getShortName(pair) == 'BTC') if self.pairs[pair]['first_buy']: pct_first = self.getPctFirstBuy(pair, last_candle) pct = self.pct.value if count_of_buys == 1: pct_max = current_profit else: if self.pairs[trade.pair]['last_buy']: pct_max = self.getPctLastBuy(pair, last_candle) else: pct_max = - pct if (self.getShortName(pair) == 'BTC') or count_of_buys <= 2: lim = - pct - (count_of_buys * self.pct_inc.value) else: pct = 0.05 lim = - pct - (count_of_buys * 0.0025) if (len(dataframe) < 1): # print("skip dataframe") return None if not self.should_enter_trade(pair, last_candle, current_time): return None condition = (last_candle['enter_long'] and last_candle['stop_buying_1h'] == False and last_candle['hapercent'] > 0) # and last_candle['sma60_deriv1'] > 0 # or last_candle['enter_tag'] == 'pct3' \ # or last_candle['enter_tag'] == 'pct3_1h' # if (self.getShortName(pair) != 'BTC' and count_of_buys > 3): # condition = before_last_candle_24['mid_smooth_3_1h'] > before_last_candle_12['mid_smooth_3_1h'] and before_last_candle_12['mid_smooth_3_1h'] < last_candle['mid_smooth_3_1h'] #and last_candle['mid_smooth_3_deriv1_1h'] < -1.5 limit_buy = 40 if (count_of_buys < limit_buy) and condition and (pct_max < lim): try: if self.pairs[pair]['has_gain'] and profit > 0: self.pairs[pair]['force_sell'] = True return None max_amount = self.config.get('stake_amount') * 2.5 stake_amount = min(min(max_amount, self.wallets.get_available_stake_amount()), self.adjust_stake_amount(pair, last_candle) * abs(last_lost / self.mise_factor_buy.value)) if stake_amount > 0: trade_type = "Loss " + (last_candle['enter_tag'] if last_candle['enter_long'] == 1 else '') self.pairs[trade.pair]['count_of_buys'] += 1 self.pairs[pair]['total_amount'] += stake_amount self.log_trade( last_candle=last_candle, date=current_time, action="🟧 Loss -", dispo=dispo, pair=trade.pair, rate=current_rate, trade_type=trade_type, profit=round(profit, 1), buys=trade.nr_of_successful_entries + 1, stake=round(stake_amount, 2) ) self.pairs[trade.pair]['last_buy'] = current_rate self.pairs[trade.pair]['max_touch'] = last_candle['close'] self.pairs[trade.pair]['last_candle'] = last_candle # df = pd.DataFrame.from_dict(self.pairs, orient='index') # colonnes_a_exclure = ['last_candle', 'stop', # 'trade_info', 'last_date', 'expected_profit', 'last_count_of_buys', 'base_stake_amount', 'stop_buy'] # df_filtered = df[df['count_of_buys'] > 0].drop(columns=colonnes_a_exclure) # # df_filtered = df_filtered["first_buy", "last_max", "max_touch", "last_sell","last_buy", 'count_of_buys', 'current_profit'] # # print(df_filtered) return stake_amount return None except Exception as exception: print(exception) return None if (profit > self.pairs[pair]['previous_profit'] and profit > self.pairs[pair]['expected_profit'] and hours > 6 # and last_candle['sma60_deriv1'] > 0 and last_candle['max_rsi_12_1h'] < 75 # and last_candle['rsi_1d'] < 58 # and last_candle['stop_buying'] == False # and last_candle['mid_smooth_5_deriv1_1d'] > 0 and self.wallets.get_available_stake_amount() > 0 ): try: self.pairs[pair]['previous_profit'] = profit stake_amount = min(self.wallets.get_available_stake_amount(), self.pairs[pair]['first_amount']) if stake_amount > 0: self.pairs[pair]['has_gain'] += 1 trade_type = 'Gain +' + (last_candle['enter_tag'] if last_candle['enter_long'] == 1 else '') self.pairs[trade.pair]['count_of_buys'] += 1 self.pairs[pair]['total_amount'] += stake_amount self.log_trade( last_candle=last_candle, date=current_time, action="🟡 Gain +", dispo=dispo, pair=trade.pair, rate=current_rate, trade_type=str(round(pct_max, 4)), profit=round(profit, 1), buys=trade.nr_of_successful_entries + 1, stake=round(stake_amount, 2) ) self.pairs[trade.pair]['last_buy'] = current_rate self.pairs[trade.pair]['max_touch'] = last_candle['close'] self.pairs[trade.pair]['last_candle'] = last_candle return stake_amount return None except Exception as exception: print(exception) return None return None def getPctFirstBuy(self, pair, last_candle): return round((last_candle['close'] - self.pairs[pair]['first_buy']) / self.pairs[pair]['first_buy'], 3) def getPctLastBuy(self, pair, last_candle): return round((last_candle['close'] - self.pairs[pair]['last_buy']) / self.pairs[pair]['last_buy'], 4) def adjust_stake_amount(self, pair: str, last_candle: DataFrame): # Calculer le minimum des 14 derniers jours nb_pairs = len(self.dp.current_whitelist()) base_stake_amount = self.config.get('stake_amount') / (self.mises.value) # * nb_pairs) # Montant de base configuré # factors = [1, 1.2, 1.3, 1.4] if self.pairs[pair]['count_of_buys'] == 0: factor = 1 #65 / min(65, last_candle['rsi_1d']) if last_candle['open'] < last_candle['sma5_1h'] and last_candle['mid_smooth_12_deriv1'] > 0: factor = 2 adjusted_stake_amount = max(base_stake_amount / 5, base_stake_amount * factor) else: adjusted_stake_amount = self.pairs[pair]['first_amount'] if self.pairs[pair]['count_of_buys'] == 0: self.pairs[pair]['first_amount'] = adjusted_stake_amount return adjusted_stake_amount def expectedProfit(self, pair: str, last_candle: DataFrame): lim = 0.01 pct = 0.002 if (self.getShortName(pair) == 'BTC'): lim = 0.005 pct = 0.001 pct_to_max = lim + pct * self.pairs[pair]['count_of_buys'] expected_profit = lim * self.pairs[pair]['total_amount'] # min(3 * lim, max(lim, pct_to_max)) # 0.004 + 0.002 * self.pairs[pair]['count_of_buys'] #min(0.01, first_max) self.pairs[pair]['expected_profit'] = expected_profit return expected_profit def calculateUpDownPct(self, dataframe, key): down_pct_values = np.full(len(dataframe), np.nan) # Remplir la colonne avec les bons calculs for i in range(len(dataframe)): shift_value = abs(int(dataframe[key].iloc[i])) # Récupérer le shift actuel if i - shift_value > 1: # Vérifier que le shift ne dépasse pas l'index down_pct_values[i] = 100 * (dataframe['close'].iloc[i] - dataframe['close'].iloc[i - shift_value]) / \ dataframe['close'].iloc[i - shift_value] return down_pct_values @property def protections(self): return [ { "method": "CooldownPeriod", "stop_duration_candles": 12 } # { # "method": "MaxDrawdown", # "lookback_period_candles": self.lookback.value, # "trade_limit": self.trade_limit.value, # "stop_duration_candles": self.protection_stop.value, # "max_allowed_drawdown": self.protection_max_allowed_dd.value, # "only_per_pair": False # }, # { # "method": "StoplossGuard", # "lookback_period_candles": 24, # "trade_limit": 4, # "stop_duration_candles": self.protection_stoploss_stop.value, # "only_per_pair": False # }, # { # "method": "StoplossGuard", # "lookback_period_candles": 24, # "trade_limit": 4, # "stop_duration_candles": 2, # "only_per_pair": False # }, # { # "method": "LowProfitPairs", # "lookback_period_candles": 6, # "trade_limit": 2, # "stop_duration_candles": 60, # "required_profit": 0.02 # }, # { # "method": "LowProfitPairs", # "lookback_period_candles": 24, # "trade_limit": 4, # "stop_duration_candles": 2, # "required_profit": 0.01 # } ] def get_stake_from_drawdown(self, pct: float, base_stake: float = 100.0, step: float = 0.04, growth: float = 1.15, max_stake: float = 1000.0) -> float: """ Calcule la mise à allouer en fonction du drawdown. :param pct: Drawdown en pourcentage (ex: -0.12 pour -12%) :param base_stake: Mise de base (niveau 0) :param step: Espacement entre paliers (ex: tous les -4%) :param growth: Facteur de croissance par palier (ex: 1.15 pour +15%) :param max_stake: Mise maximale à ne pas dépasser :return: Montant à miser """ if pct >= 0: return base_stake level = int(abs(pct) / step) stake = base_stake * (growth ** level) return min(stake, max_stake) def polynomial_forecast(self, series: pd.Series, window: int = 20, degree: int = 2, steps=[12, 24, 36]): """ Calcule une régression polynomiale sur les `window` dernières valeurs de la série, puis prédit les `n_future` prochaines valeurs. :param series: Série pandas (ex: dataframe['close']) :param window: Nombre de valeurs récentes utilisées pour ajuster le polynôme :param degree: Degré du polynôme (ex: 2 pour quadratique) :param n_future: Nombre de valeurs futures à prédire :return: tuple (poly_function, x_vals, y_pred), où y_pred contient les prédictions futures """ if len(series) < window: raise ValueError("La série est trop courte pour la fenêtre spécifiée.") recent_y = series.iloc[-window:].values x = np.arange(window) coeffs = np.polyfit(x, recent_y, degree) poly = np.poly1d(coeffs) x_future = np.arange(window, window + len(steps)) y_future = poly(x_future) # Affichage de la fonction # print("Fonction polynomiale trouvée :") # print(poly) current = series.iloc[-1] count = 0 for future_step in steps: # range(1, n_future + 1) future_x = window - 1 + future_step prediction = poly(future_x) # series.loc[series.index[future_x], f'poly_pred_t+{future_step}'] = prediction # ➕ Afficher les prédictions # print(f"{current} → t+{future_step}: x={future_x}, y={prediction:.2f}") if prediction > 0: # current: count += 1 return poly, x_future, y_future, count def should_enter_trade(self, pair: str, last_candle, current_time) -> bool: limit = 3 # if self.pairs[pair]['stop'] and last_candle['max_rsi_12_1h'] <= 60 and last_candle['trend_class_1h'] == -1: # dispo = round(self.wallets.get_available_stake_amount()) # self.pairs[pair]['stop'] = False # self.log_trade( # last_candle=last_candle, # date=current_time, # action="🟢RESTART", # dispo=dispo, # pair=pair, # rate=last_candle['close'], # trade_type='', # profit=0, # buys=self.pairs[pair]['count_of_buys'], # stake=0 # ) # 🟢 Dérivée 1 > 0 et dérivée 2 > 0: tendance haussière qui s’accélère. # 🟡 Dérivée 1 > 0 et dérivée 2 < 0: tendance haussière qui ralentit → essoufflement potentiel. # 🔴 Dérivée 1 < 0 et dérivée 2 < 0: tendance baissière qui s’accélère. # 🟠 Dérivée 1 < 0 et dérivée 2 > 0: tendance baissière qui ralentit → possible bottom. # if not pair.startswith('BTC'): dispo = round(self.wallets.get_available_stake_amount()) # if self.pairs[pair]['stop'] \ # and last_candle[f"{self.indic_1d_p.value}_deriv1_1h"] >= self.indic_deriv1_1d_p_start.value \ # and last_candle[f"{self.indic_1d_p.value}_deriv2_1h"] >= self.indic_deriv2_1d_p_start.value: # self.pairs[pair]['stop'] = False # self.log_trade( # last_candle=last_candle, # date=current_time, # action="🟢RESTART", # dispo=dispo, # pair=pair, # rate=last_candle['close'], # trade_type='', # profit=0, # buys=self.pairs[pair]['count_of_buys'], # stake=0 # ) # else: # if self.pairs[pair]['stop'] == False \ # and last_candle[f"{self.indic_1d_p.value}_deriv1_1h"] <= self.indic_deriv1_1d_p_stop.value \ # and last_candle[f"{self.indic_1d_p.value}_deriv2_1h"] <= self.indic_deriv2_1d_p_stop.value: # self.pairs[pair]['stop'] = True # # if self.pairs[pair]['current_profit'] > 0: # # self.pairs[pair]['force_sell'] = True # self.log_trade( # last_candle=last_candle, # date=current_time, # action="🔴STOP", # dispo=dispo, # pair=pair, # rate=last_candle['close'], # trade_type='', # profit=self.pairs[pair]['current_profit'], # buys=self.pairs[pair]['count_of_buys'], # stake=0 # ) # return False # if self.pairs[pair]['stop']: # return False return True # Filtrer les paires non-BTC non_btc_pairs = [p for p in self.pairs if not p.startswith('BTC')] # Compter les positions actives sur les paires non-BTC max_nb_trades = 0 total_non_btc = 0 max_pair = '' limit_amount = 250 max_amount = 0 for p in non_btc_pairs: max_nb_trades = max(max_nb_trades, self.pairs[p]['count_of_buys']) max_amount = max(max_amount, self.pairs[p]['total_amount']) for p in non_btc_pairs: if (max_nb_trades == self.pairs[p]['count_of_buys'] and max_nb_trades > limit): # if (max_amount == self.pairs[p]['total_amount'] and max_amount > limit_amount): max_pair = p total_non_btc += self.pairs[p]['count_of_buys'] pct_max = self.getPctFirstBuy(pair, last_candle) # self.getPctLastBuy(pair, last_candle) if last_candle['mid_smooth_1h_deriv1'] < -0.02: # and last_candle['mid_smooth_1h_deriv2'] > 0): return False self.should_enter_trade_count = 0 # if max_pair != pair and self.pairs[pair]['total_amount'] > 300: # return False if (max_pair != '') & (self.pairs[pair]['count_of_buys'] >= limit): trade = self.pairs[max_pair]['current_trade'] current_time = current_time.astimezone(timezone.utc) open_date = trade.open_date.astimezone(timezone.utc) current_time_utc = current_time.astimezone(timezone.utc) days_since_open = (current_time_utc - open_date).days pct_max_max = self.getPctFirstBuy(max_pair, last_candle) # print(f"days_since_open {days_since_open} max_pair={max_pair} pair={pair}") return max_pair == pair or pct_max < - 0.25 or ( pct_max_max < - 0.15 and max_pair != pair and days_since_open > 30) else: return True def setTrends(self, dataframe: DataFrame): SMOOTH_WIN=10 df = dataframe.copy() # # --- charger les données --- # df['timestamp'] = pd.to_datetime(df['timestamp'], errors='coerce') # --- calcul SMA14 --- # df['sma'] = talib.SMA(df, timeperiod=20) # ta.trend.sma_indicator(df['close'], 14) # --- pente brute --- df['slope'] = df['sma12'].diff() # --- lissage EMA --- df['slope_smooth'] = df['slope'].ewm(span=SMOOTH_WIN, adjust=False).mean() # df["slope_smooth"] = savgol_filter(df["slope_smooth"], window_length=21, polyorder=3) # --- normalisation relative --- df['slope_norm'] = 10000 * df['slope_smooth'] / df['close'] # df['slope_norm'].fillna(0, inplace=True) df['slope_norm'] = df['slope_norm'].fillna(0) dataframe['slope_norm'] = df['slope_norm'] def make_model(self, model_type="linear", degree=2, random_state=0): model_type = model_type.lower() if model_type == "linear": return LinearRegression() if model_type == "poly": return make_pipeline(StandardScaler(), PolynomialFeatures(degree=degree, include_bias=False), LinearRegression()) if model_type == "svr": return make_pipeline(StandardScaler(), SVR(kernel="rbf", C=1.0, epsilon=0.1)) if model_type == "rf": return RandomForestRegressor(n_estimators=100, random_state=random_state, n_jobs=1) if model_type == "lgbm": if not _HAS_LGBM: raise RuntimeError("lightgbm n'est pas installé") return LGBMRegressor(n_estimators=100, random_state=random_state) raise ValueError(f"model_type inconnu: {model_type}") def calculateRegressionNew(self, df, indic, lookback=20, future_steps=5, model_type="linear"): df = df.copy() pred_col = f"{indic}_future_pred_cons" df[pred_col] = np.nan X_idx = np.arange(lookback).reshape(-1, 1) values = df[indic].values n = len(values) model = LinearRegression() for i in range(lookback, n - future_steps): window = values[i - lookback:i] # cible = vraie valeur future y_target = values[i + future_steps] if np.isnan(window).any() or np.isnan(y_target): continue # entraînement model.fit(X_idx, window) # prédiction de la valeur future future_x = np.array([[lookback + future_steps - 1]]) pred_future = model.predict(future_x)[0] # la prédiction concerne i + future_steps df.iloc[i + future_steps, df.columns.get_loc(pred_col)] = pred_future return df # ========================================================== # NOUVELLE VERSION : calcule AUSSI les dernières valeurs ! # ========================================================== def calculateRegression( self, df, indic, lookback=30, future_steps=5, model_type="linear", degree=2, weight_mode="exp", weight_strength=2, clip_k=2.0, blend_alpha=0.7, ): values = df[indic].values.astype(float) n = len(values) colname = f"{indic}_future_pred_cons" df[colname] = np.nan # pré-calcul des fenêtres windows = np.lib.stride_tricks.sliding_window_view(values, lookback) # windows[k] = valeurs de [k .. k+lookback-1] # indices valides d’entraînement trainable_end = n - future_steps # créer une fois le modèle model = self.make_model(model_type=model_type, degree=degree) # ================ # BOUCLE TRAINING # ================ for i in range(lookback, trainable_end): window = values[i - lookback:i] if np.isnan(window).any(): continue # delta future réelle y_target = values[i + future_steps] - values[i] # features = positions dans la fenêtre : 0..lookback-1 X_window = np.arange(lookback).reshape(-1, 1) # sample weights if weight_mode == "exp": weights = np.linspace(0.1, 1, lookback) ** weight_strength else: weights = None # entraînement try: model.fit(X_window, window, sample_weight=weights) except Exception: model.fit(X_window, window) # prédiction de la valeur future (position lookback+future_steps-1) y_pred_value = model.predict( np.array([[lookback + future_steps - 1]]) )[0] pred_delta = y_pred_value - values[i] # clipping par volatilité locale local_std = np.std(window) max_change = clip_k * (local_std if local_std > 0 else 1e-9) pred_delta = np.clip(pred_delta, -max_change, max_change) # blend final_pred_value = ( blend_alpha * (values[i] + pred_delta) + (1 - blend_alpha) * values[i] ) df.iloc[i, df.columns.get_loc(colname)] = final_pred_value # ========================================================== # 🔥 CALCUL DES DERNIÈRES VALEURS MANQUANTES 🔥 # ========================================================== # Il reste les indices : [n - future_steps … n - 1] for i in range(trainable_end, n): # fenêtre glissante de fin if i - lookback < 0: continue window = values[i - lookback:i] if np.isnan(window).any(): continue # features X_window = np.arange(lookback).reshape(-1, 1) try: model.fit(X_window, window) except: continue # prédiction d’une continuation locale : future_steps = 1 en fin y_pred_value = model.predict(np.array([[lookback]]))[0] pred_delta = y_pred_value - values[i - 1] final_pred_value = ( blend_alpha * (values[i - 1] + pred_delta) + (1 - blend_alpha) * values[i - 1] ) df.iloc[i, df.columns.get_loc(colname)] = final_pred_value return df def kerasGenerateGraphs(self, dataframe): model = self.model self.kerasGenerateGraphModel(model) self.kerasGenerateGraphPredictions(model, dataframe, self.lookback) self.kerasGenerateGraphPoids(model) def kerasGenerateGraphModel(self, model): plot_model( model, to_file=f"{self.path}/lstm_model.png", show_shapes=True, show_layer_names=True ) def kerasGenerateGraphPredictions(self, model, dataframe, lookback): preds = self.tensorFlowGeneratePredictions(dataframe, lookback, model) # plot plt.figure(figsize=(36, 8)) plt.plot(dataframe[self.indicator_target].values, label=self.indicator_target) plt.plot(preds, label="lstm_pred") plt.legend() plt.savefig(f"{self.path}/lstm_predictions.png") plt.close() def kerasGenerateGraphPoids(self, model): for i, layer in enumerate(model.layers): weights = layer.get_weights() # liste de tableaux numpy # Sauvegarde SAFE : tableau d’objets np.save( f"{self.path}/layer_{i}_weights.npy", np.array(weights, dtype=object) ) # Exemple lecture et heatmap weights_layer0 = np.load( f"{self.path}/layer_{i}_weights.npy", allow_pickle=True ) # Choisir un poids 2D W = None for w in weights_layer0: if isinstance(w, np.ndarray) and w.ndim == 2: W = w break if W is None: print(f"Aucune matrice 2D dans layer {i} (rien à afficher).") return plt.figure(figsize=(8, 6)) sns.heatmap(W, cmap="viridis") plt.title(f"Poids 2D du layer {i}") plt.savefig(f"{self.path}/layer{i}_weights.png") plt.close() # ------------------- # Entraînement # ------------------- def tensorFlowTrain(self, dataframe, future_steps=1, lookback=50, epochs=40, batch_size=32): X_seq, y_seq = self.tensorFlowPrepareDataFrame(dataframe, future_steps, lookback) # 6) Modèle LSTM self.model = Sequential([ LSTM(64, return_sequences=False, input_shape=(lookback, X_seq.shape[2])), Dense(32, activation="relu"), Dense(1) ]) self.model.compile(loss='mse', optimizer=Adam(learning_rate=1e-4)) self.model.fit(X_seq, y_seq, epochs=epochs, batch_size=batch_size, verbose=1) # 7) Sauvegarde self.model.save(f"{self.path}/lstm_model.keras") # joblib.dump(self.scaler_X, f"{self.path}/lstm_scaler_X.pkl") # joblib.dump(self.scaler_y, f"{self.path}/lstm_scaler_y.pkl") def tensorFlowPrepareDataFrame(self, dataframe, future_steps, lookback): target = self.indicator_target # 1) Détecter NaN / Inf et nettoyer feature_columns = self.model_indicators # [col for col in dataframe.columns if col != target] df = dataframe.copy() df.replace([np.inf, -np.inf], np.nan, inplace=True) df.dropna(subset=feature_columns + [target], inplace=True) # 2) Séparer features et cible X_values = df[feature_columns].values y_values = df[target].values.reshape(-1, 1) # 3) Gestion colonnes constantes (éviter division par zéro) for i in range(X_values.shape[1]): if X_values[:, i].max() == X_values[:, i].min(): X_values[:, i] = 0.0 if y_values.max() == y_values.min(): y_values[:] = 0.0 # 4) Normalisation self.scaler_X = MinMaxScaler() X_scaled = self.scaler_X.fit_transform(X_values) if self.y_no_scale: y_scaled = y_values else: self.scaler_y = MinMaxScaler() y_scaled = self.scaler_y.fit_transform(y_values) # 5) Création des fenêtres glissantes X_seq = [] y_seq = [] for i in range(len(X_scaled) - lookback - future_steps): X_seq.append(X_scaled[i:i + lookback]) y_seq.append(y_scaled[i + lookback + future_steps]) X_seq = np.array(X_seq) y_seq = np.array(y_seq) # Vérification finale if np.isnan(X_seq).any() or np.isnan(y_seq).any(): raise ValueError("X_seq ou y_seq contient encore des NaN") if np.isinf(X_seq).any() or np.isinf(y_seq).any(): raise ValueError("X_seq ou y_seq contient encore des Inf") return X_seq, y_seq # ------------------- # Prédiction # ------------------- def tensorFlowPredict(self, dataframe, future_steps=1, lookback=50): feature_columns = self.model_indicators #[col for col in dataframe.columns if col != self.indicator_target] # charger le modèle si pas déjà chargé if self.model is None: self.model = load_model(f"{self.path}/lstm_model.keras", compile=False) # self.scaler_X = joblib.load(f"{self.path}/lstm_scaler_X.pkl") # self.scaler_y = joblib.load(f"{self.path}/lstm_scaler_y.pkl") X_seq, y_seq = self.tensorFlowPrepareDataFrame(dataframe, future_steps, lookback) preds = self.tensorFlowGeneratePredictions(dataframe, lookback, self.model) dataframe["lstm_pred"] = preds dataframe["lstm_pred_deriv1"] = dataframe["lstm_pred"].diff() return dataframe def tensorFlowGeneratePredictions(self, dataframe, lookback, model): # features = toutes les colonnes sauf la cible feature_columns = self.model_indicators # [col for col in dataframe.columns if col != self.indicator_target] X_values = dataframe[feature_columns].values # normalisation (avec le scaler utilisé à l'entraînement) X_scaled = self.scaler_X.transform(X_values) # créer les séquences glissantes X_seq = [] for i in range(len(X_scaled) - lookback): X_seq.append(X_scaled[i:i + lookback]) X_seq = np.array(X_seq) # prédictions y_pred_scaled = model.predict(X_seq, verbose=0).flatten() if self.y_no_scale: y_pred = y_pred_scaled else: y_pred = self.scaler_y.inverse_transform(y_pred_scaled.reshape(-1, 1)).flatten() # alignement avec les données preds = [np.nan] * len(dataframe) start = lookback end = start + len(y_pred) # preds[start:end] = y_pred[:end - start] preds[start:start + len(y_pred)] = y_pred return preds