# Zeus Strategy: First Generation of GodStra Strategy with maximum
# AVG/MID profit in USDT
# Author: @Mablue (Masoud Azizi)
# github: https://github.com/mablue/
# IMPORTANT: install `ta` before running (pip install ta)
# freqtrade hyperopt --hyperopt-loss SharpeHyperOptLoss --spaces buy sell roi --strategy Zeus

# --- Do not remove these libs ---
import csv
import inspect
import json
import logging
import math
import os
import warnings
from datetime import datetime
from datetime import timedelta
from datetime import timezone
from pathlib import Path
from typing import Optional, Union, Tuple

# Disable the GPU completely. Set before TensorFlow is imported so the
# settings are guaranteed to be visible when its runtime initialises.
os.environ["CUDA_VISIBLE_DEVICES"] = "-1"
os.environ["TF_XLA_FLAGS"] = "--tf_xla_enable_xla_devices=false"

import numpy as np
import pandas as pd
from pandas import DataFrame

import freqtrade.vendor.qtpylib.indicators as qtpylib
from freqtrade.persistence import Trade
from freqtrade.strategy import (BooleanParameter, CategoricalParameter, DecimalParameter, stoploss_from_open,
                                IntParameter, IStrategy, merge_informative_pair, informative,
                                stoploss_from_absolute)
# --------------------------------

# Add your lib to import here
import ta
import talib.abstract as talib

# Machine Learning
import joblib
import lightgbm as lgb
import matplotlib.pyplot as plt
import optuna
import optuna.visualization as vis
import seaborn as sns
from lightgbm import LGBMRegressor
from sklearn.ensemble import GradientBoostingRegressor
from sklearn.ensemble import HistGradientBoostingRegressor
from sklearn.ensemble import RandomForestRegressor
from sklearn.feature_selection import SelectFromModel
from sklearn.feature_selection import VarianceThreshold
from sklearn.impute import SimpleImputer
from sklearn.linear_model import LinearRegression, Ridge, HuberRegressor
from sklearn.metrics import (
    classification_report,
    confusion_matrix,
    accuracy_score,
    roc_auc_score,
    roc_curve,
    precision_score,
    recall_score,
    precision_recall_curve,
    f1_score,
    mean_absolute_error,
    mean_squared_error,
    r2_score,
)
from sklearn.model_selection import cross_val_score
from sklearn.model_selection import train_test_split
from sklearn.pipeline import Pipeline
from sklearn.pipeline import make_pipeline
from sklearn.preprocessing import MinMaxScaler
from sklearn.preprocessing import StandardScaler, PolynomialFeatures
from sklearn.svm import SVR
from sklearn.tree import export_text
from tabulate import tabulate

# Tensorflow
import tensorflow as tf
from keras.utils import plot_model
# The standalone-keras names are imported first and then rebound by the
# tensorflow.keras ones below, exactly as in the original import order.
from keras.models import Sequential
from keras.layers import LSTM, Dense
from tensorflow.keras import layers, models
from tensorflow.keras.models import load_model
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import LSTM, Dense
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.callbacks import EarlyStopping, ReduceLROnPlateau, ModelCheckpoint

logger = logging.getLogger(__name__)

# Basic ANSI colours
RED = "\033[31m"
GREEN = "\033[32m"
YELLOW = "\033[33m"
BLUE = "\033[34m"
MAGENTA = "\033[35m"
CYAN = "\033[36m"
RESET = "\033[0m"

warnings.filterwarnings(
    "ignore",
    message=r".*No further splits with positive gain.*"
)


def pprint_df(dframe):
    """Print *dframe* as a psql-style table without the index column."""
    print(tabulate(dframe, headers='keys', tablefmt='psql', showindex=False))


def normalize(df):
    """Return *df* min-max scaled: ``(df - min) / (max - min)``."""
    low = df.min()
    return (df - low) / (df.max() - low)


def _sign_color(value):
    """Return GREEN for a strictly positive value, RED otherwise."""
    return GREEN if value > 0 else RED


def _new_pair_state():
    """Return a fresh per-pair bookkeeping record holding the default values."""
    return {
        "first_buy": 0,
        "last_buy": 0.0,
        "last_min": 999999999999999.5,
        "last_max": 0,
        "trade_info": {},
        "max_touch": 0.0,
        "last_sell": 0.0,
        'count_of_buys': 0,
        'current_profit': 0,
        'expected_profit': 0,
        'previous_profit': 0,
        "last_candle": {},
        "last_count_of_buys": 0,
        'base_stake_amount': 0,
        'stop_buy': False,
        'last_date': 0,
        'stop': False,
        'max_profit': 0,
        'total_amount': 0,
        'has_gain': 0,
        'force_sell': False,
        'force_buy': False
    }


class _PairStateDict(dict):
    """Mapping pair -> state record that lazily creates records for unknown pairs.

    Avoids a ``KeyError`` when the whitelist contains a pair that is not part
    of the pre-populated list.
    """

    def __missing__(self, pair):
        state = _new_pair_state()
        self[pair] = state
        return state


class Zeus_TensorFlow_1h(IStrategy):
    """1h DCA strategy that augments indicator signals with an LSTM prediction.

    Per-pair runtime state is kept in ``pairs``; the trade callbacks update it
    and print a tabular trade log.
    """

    startup_candle_count = 60 * 24

    # Machine Learning
    model = None
    model_indicators = []
    indicator_target = 'sma5'

    # Tensorflow
    lookback = 72
    future_steps = 6
    y_no_scale = False
    epochs = 40
    batch_size = 64
    scaler_X = None
    scaler_y = None
    use_mc_dropout = True
    mc_samples = 40
    # NOTE(review): the original comment said "1.5% threshold" but 0.003 is 0.3% - confirm.
    minimal_pct_for_trade = 0.003  # threshold (MAPE-based deadzone)
    min_hit_ratio = 0.55  # minimal historical hit ratio required to enable trading
    max_uncertainty_pct = 0.7  # if uncertainty > 70% of the predicted move => skip
    # NOTE(review): the original comment said "1% of capital" but 0.1 is 10% - confirm.
    base_risk_per_trade = 0.1  # fraction of capital (used for sizing)
    _tf_model = None
    _scaler_X = None
    _scaler_y = None

    # internal
    _ps_model = None
    _ps_scaler_X = None
    _ps_scaler_y = None

    path = "user_data/plots/"
    model_path = "position_sizer_lstm.keras"
    scaler_X_path = "position_sizer_scaler_X.pkl"
    scaler_y_path = "position_sizer_scaler_y.pkl"

    # ROI table:
    minimal_roi = {
        "0": 0.564,
        "567": 0.273,
        "2814": 0.12,
        "7675": 0
    }

    # Stoploss:
    stoploss = -1  # 0.256
    # Custom stoploss
    use_custom_stoploss = False

    trailing_stop = False
    trailing_stop_positive = 0.15
    trailing_stop_positive_offset = 0.20
    trailing_only_offset_is_reached = True

    # Buy hypers
    timeframe = '1h'

    max_open_trades = 5
    max_amount = 40

    parameters = {}
    # DCA config
    position_adjustment_enable = True

    # Duplicate keys of the original literal were collapsed to the entry that
    # Python actually kept (the last one).
    plot_config = {
        "main_plot": {
            "sma24": {
                "color": "pink"
            },
            "sma5_1d": {
                "color": "blue"
            },
            # "sma24": {
            #     "color": "yellow"
            # },
            "sma60": {
                "color": "green"
            },
            "bb_lowerband": {
                "color": "#da59a6"},
            "bb_upperband": {
                "color": "#da59a6",
            },
            # "sma12": {
            #     "color": "blue"
            # },
            "mid_smooth_3": {
                "color": "blue"
            }
        },
        "subplots": {
            "Rsi": {
                "max_rsi_24": {
                    "color": "pink"
                },
                # "rsi": {
                #     "color": "red"
                # },
                # "rsi_1d": {
                #     "color": "blue"
                # }
            },
            "Rsi_deriv1": {
                "sma24_deriv1": {
                    "color": "yellow"
                },
                "sma5_deriv1_1d": {
                    "color": "blue"
                },
                "sma60_deriv1": {
                    "color": "green"
                }
            },
            "Rsi_deriv2": {
                "sma24_deriv2": {
                    "color": "yellow"
                },
                "sma5_deriv2_1d": {
                    "color": "blue"
                },
                "sma60_deriv2": {
                    "color": "green"
                }
            },
            'Macd': {
                "macd_rel_1d": {
                    "color": "cyan"
                },
                "macdsignal_rel_1d": {
                    "color": "pink"
                },
                "macdhist_rel_1d": {
                    "color": "yellow"
                }
            }
        }
    }

    # Number of rows logged so far; the table header is re-printed every 10 rows.
    columns_logged = 0

    pairs = _PairStateDict(
        (pair, _new_pair_state())
        for pair in ["BTC/USDC", "ETH/USDC", "DOGE/USDC", "XRP/USDC", "SOL/USDC",
                     "BTC/USDT", "ETH/USDT", "DOGE/USDT", "XRP/USDT", "SOL/USDT"]
    )

    # 20 20 40 60 100 160 260 420
    # 50 50 100 300 500
    # fibo = [1, 1, 2, 3, 5, 8, 13, 21]
    # my fibo
    # 50 50 50 100 100 150 200 250 350 450 600 1050
    fibo = [1, 1, 1, 2, 2, 3, 4, 5, 7, 9, 12, 16, 21]
    baisse = [1, 2, 3, 5, 7, 10, 14, 19, 26, 35, 47, 63, 84]
    # My sequence  1  1   1   2   2   3   4   5    7    9   12   16   21
    # Stake       50 50  50 100 100 150 200 250  350  450  600  800 1050
    # Stake sum   50 100 150 250 350 500 700 950 1300 1750 2350 3150 4200
    # Drop         1  2   3   5   7  10  14  19   26   35   47   63   84

    # factors = [1, 1.1, 1.25, 1.5, 2.0, 3]
    # thresholds = [2, 5, 10, 20, 30, 50]

    factors = [0.5, 0.75, 1, 1.25, 1.5, 2]
    thresholds = [0, 2, 5, 10, 30, 45]

    trades = list()
    max_profit_pairs = {}

    mise_factor_buy = DecimalParameter(0.01, 0.1, default=0.05, decimals=2, space='buy', optimize=True, load=True)

    indicators = {'sma5', 'sma12', 'sma24', 'sma60'}
    indicators_percent = {'percent', 'percent3', 'percent12', 'percent24', 'percent_1d', 'percent3_1h',
                          'percent12_1d', 'percent24_1d'}

    mises = IntParameter(1, 50, default=5, space='buy', optimize=False, load=True)

    ml_prob_buy = DecimalParameter(-0.5, 0.5, default=0.0, decimals=2, space='buy', optimize=True, load=True)
    # ml_prob_sell = DecimalParameter(-0.5, 0.5, default=0.0, decimals=2, space='sell', optimize=True, load=True)

    pct = DecimalParameter(0.005, 0.05, default=0.012, decimals=3, space='buy', optimize=True, load=True)
    pct_inc = DecimalParameter(0.0001, 0.003, default=0.0022, decimals=4, space='buy', optimize=True, load=True)

    # rsi_deb_protect = IntParameter(50, 90, default=70, space='protection', optimize=True, load=True)
    # rsi_end_protect = IntParameter(20, 60, default=55, space='protection', optimize=True, load=True)
    #
    # sma24_deriv1_deb_protect = DecimalParameter(-4, 4, default=-2, decimals=1, space='protection', optimize=True, load=True)
    # sma24_deriv1_end_protect = DecimalParameter(-4, 4, default=0, decimals=1, space='protection', optimize=True, load=True)

    # =========================================================================
    should_enter_trade_count = 0

    def _is_hyperopt(self):
        """Return True when running under hyperopt (all trade logging is skipped)."""
        return self.config.get('runmode') == 'hyperopt' or self.dp.runmode.value == 'hyperopt'

    def confirm_trade_entry(self, pair: str, order_type: str, amount: float, rate: float, time_in_force: str,
                            current_time: datetime, entry_tag: Optional[str], **kwargs) -> bool:
        """Record the entry in the pair state, log it, and always accept the trade.

        Returns:
            ``True`` - no entry filter is currently active.
        """
        state = self.pairs[pair]

        # Minutes elapsed since the last confirmed exit on this pair (0 if none yet).
        minutes = 0
        if state['last_date'] != 0:
            minutes = int((current_time - state['last_date']).total_seconds() / 60)

        dataframe, _ = self.dp.get_analyzed_dataframe(pair, self.timeframe)
        last_candle = dataframe.iloc[-1].squeeze()

        # Formerly considered filters (disabled):
        # condition = (last_candle[f"{indic_5m}_deriv1"] >= indic_deriv1_5m) and (last_candle[f"{indic_5m}_deriv2"] >= indic_deriv2_5m)
        # allow_to_buy = (condition and not self.pairs[pair]['stop']) | (entry_tag == 'force_entry')

        close = last_candle['close']
        self.trades = list()
        state['first_buy'] = rate
        state['last_buy'] = rate
        state['max_touch'] = close
        state['last_candle'] = last_candle
        state['count_of_buys'] = 1
        state['current_profit'] = 0
        state['last_max'] = max(close, state['last_max'])
        state['last_min'] = min(close, state['last_min'])

        dispo = round(self.wallets.get_available_stake_amount())
        self.printLineLog()

        stake_amount = self.adjust_stake_amount(pair, last_candle)
        state['total_amount'] = stake_amount

        self.log_trade(
            last_candle=last_candle,
            date=current_time,
            action="🟩Buy" + " " + str(minutes),
            pair=pair,
            rate=rate,
            dispo=dispo,
            profit=0,
            trade_type=entry_tag,
            buys=1,
            stake=round(stake_amount, 2)
        )
        return True

    def confirm_trade_exit(self, pair: str, trade: Trade, order_type: str, amount: float, rate: float,
                           time_in_force: str, exit_reason: str, current_time, **kwargs, ) -> bool:
        """Accept or reject an exit and reset the pair state when it is accepted.

        The exit is accepted when the trade is older than 30 minutes and the
        last Heikin-Ashi candle is negative, or when a forced sell is pending,
        or when the reason is ``force_exit`` / ``stop_loss``.
        """
        state = self.pairs[pair]
        dataframe, _ = self.dp.get_analyzed_dataframe(pair, self.timeframe)
        last_candle = dataframe.iloc[-1].squeeze()

        # total_seconds(): timedelta.seconds wraps every 24h and would
        # under-report the age of trades open for more than one day.
        minutes = int(round((current_time - trade.open_date_utc).total_seconds() / 60, 0))
        profit = trade.calc_profit(rate)

        allow_to_sell = bool(
            (minutes > 30 and last_candle['hapercent'] < 0)
            or state['force_sell']
            or exit_reason in ('force_exit', 'stop_loss')
        )

        if allow_to_sell:
            self.trades = list()
            state['last_count_of_buys'] = trade.nr_of_successful_entries  # self.pairs[pair]['count_of_buys']
            state['last_sell'] = rate
            state['last_candle'] = last_candle
            state['max_profit'] = 0
            state['previous_profit'] = 0
            dispo = round(self.wallets.get_available_stake_amount())
            # print(f"Sell {pair} {current_time} {exit_reason} dispo={dispo} amount={amount} rate={rate} open_rate={trade.open_rate}")
            self.log_trade(
                last_candle=last_candle,
                date=current_time,
                action="🟥Sell " + str(minutes),
                pair=pair,
                trade_type=exit_reason,
                rate=last_candle['close'],
                dispo=dispo,
                profit=round(profit, 2)
            )
            # Reset only after logging: the log row still shows max_touch etc.
            state['force_sell'] = False
            state['has_gain'] = 0
            state['current_profit'] = 0
            state['total_amount'] = 0
            state['count_of_buys'] = 0
            state['max_touch'] = 0
            state['last_buy'] = 0
            state['last_date'] = current_time
            state['current_trade'] = None
        # else:
        #     self.printLog(f"{current_time} SELL triggered for {pair} ({exit_reason} profit={profit} minutes={minutes} percent={last_candle['hapercent']}) but condition blocked")
        return allow_to_sell

    def custom_stake_amount(self, pair: str, current_time: datetime, current_rate: float,
                            proposed_stake: float, min_stake: float, max_stake: float,
                            **kwargs) -> float:
        """Return the stake computed by ``adjust_stake_amount`` for the latest candle."""
        dataframe, _ = self.dp.get_analyzed_dataframe(pair=pair, timeframe=self.timeframe)
        current_candle = dataframe.iloc[-1].squeeze()
        # print(f"{pair} adjusted_stake_amount{adjusted_stake_amount}")
        return self.adjust_stake_amount(pair, current_candle)

    def _exit_tag(self, pair, count_of_buys, label):
        """Build the exit reason ``<buys>_<label>_<short pair>_<has_gain>``."""
        return (str(count_of_buys) + '_' + label + '_' + self.getShortName(pair)
                + '_' + str(self.pairs[pair]['has_gain']))

    def custom_exit(self, pair: str, trade: Trade, current_time, current_rate, current_profit, **kwargs):
        """Update the pair state and return an exit tag when an exit rule fires.

        Returns:
            An exit-reason string, or ``None`` to keep the trade open.
        """
        state = self.pairs[pair]
        dataframe, _ = self.dp.get_analyzed_dataframe(pair, self.timeframe)
        last_candle = dataframe.iloc[-1].squeeze()
        before_last_candle_12 = dataframe.iloc[-13].squeeze()

        expected_profit = self.expectedProfit(pair, last_candle)
        # print(f"current_time={current_time} current_profit={current_profit} expected_profit={expected_profit}")

        close = last_candle['close']
        state['last_max'] = max(close, state['last_max'])
        state['last_min'] = min(close, state['last_min'])
        state['current_trade'] = trade

        count_of_buys = trade.nr_of_successful_entries

        profit = trade.calc_profit(current_rate)  # round(current_profit * trade.stake_amount, 1)
        state['max_profit'] = max(state['max_profit'], profit)
        max_profit = state['max_profit']

        # Relative give-back from the best profit seen so far; max_profit is
        # >= profit > 0 whenever the division runs.
        drawdown = 0
        if profit > 0:
            drawdown = 1 - (profit / max_profit)

        state['count_of_buys'] = count_of_buys
        state['current_profit'] = profit

        hours = (current_time - trade.date_last_filled_utc).total_seconds() / 3600.0

        # NOTE(review): `hours` is a float, so this only matches exact
        # multiples of 4h - confirm whether that throttling is intended.
        if hours % 4 == 0:
            dispo = round(self.wallets.get_available_stake_amount())
            self.log_trade(
                last_candle=last_candle,
                date=current_time,
                action="🟢 CURRENT",  # 🔴 CURRENT" if self.pairs[pair]['stop'] or last_candle['stop_buying'] else "
                dispo=dispo,
                pair=pair,
                rate=close,
                trade_type='',
                profit=round(profit, 2),
                buys=count_of_buys,
                stake=0
            )

        # if (last_candle['predicted_pct'] > 0):
        #     return None

        ha_falling = last_candle['hapercent'] < 0

        if (last_candle['max_rsi_24'] > 85 and profit > max(5, expected_profit)
                and ha_falling and last_candle['sma60_deriv1'] < 0.05):
            state['force_sell'] = False
            state['force_buy'] = False  # (self.pairs[pair]['count_of_buys'] - self.pairs[pair]['has_gain'] > 3)
            return self._exit_tag(pair, count_of_buys, 'Rsi85')

        if state['force_sell']:
            state['force_sell'] = False
            state['force_buy'] = (state['count_of_buys'] - state['has_gain'] > 3)
            return self._exit_tag(pair, count_of_buys, 'Frc')

        if profit > 0 and drawdown > 0.30:
            state['force_sell'] = False
            state['force_buy'] = (state['count_of_buys'] - state['has_gain'] > 3)
            return self._exit_tag(pair, count_of_buys, 'B30')

        # if max_profit > 0.5 * count_of_buys and baisse > 0.15:
        #     self.pairs[pair]['force_sell'] = False
        #     self.pairs[pair]['force_buy'] = (self.pairs[pair]['count_of_buys'] - self.pairs[pair]['has_gain'] > 3)
        #     return str(count_of_buys) + '_' + 'B15_' + pair_name + '_' + str(self.pairs[pair]['has_gain'])

        # sma5 still rising compared with 12 candles ago: keep the trade.
        if (last_candle['sma5'] - before_last_candle_12['sma5']) / last_candle['sma5'] > 0.0002:
            return None

        # factor = 1
        # if (self.getShortName(pair) == 'BTC'):
        #     factor = 0.5
        # if baisse > 2 and baisse > factor * self.pairs[pair]['total_amount'] / 100:
        #     self.pairs[pair]['force_sell'] = False
        #     self.pairs[pair]['force_buy'] = (self.pairs[pair]['count_of_buys'] - self.pairs[pair]['has_gain'] > 3)
        #     return 'Baisse_' + pair_name + '_' + str(count_of_buys) + '_' + str(self.pairs[pair]['has_gain'])
        #
        # if 1 <= count_of_buys <= 3:
        if (last_candle['max_rsi_24'] > 75 and profit > expected_profit
                and ha_falling and last_candle['sma60_deriv1'] < 0):
            state['force_sell'] = False
            return self._exit_tag(pair, count_of_buys, 'Rsi75')

        state['max_touch'] = max(close, state['max_touch'])
        return None

    def getShortName(self, pair):
        """Return *pair* without its ``/USDT``, ``/USDC``, ``_USDC`` or ``_USDT`` part."""
        return pair.replace("/USDT", '').replace("/USDC", '').replace("_USDC", '').replace("_USDT", '')

    def informative_pairs(self):
        """Request the 1d timeframe for every pair of the current whitelist."""
        pairs = self.dp.current_whitelist()
        informative_pairs = [(pair, '1d') for pair in pairs]
        # informative_pairs += [(pair, '1h') for pair in pairs]
        return informative_pairs

    def log_trade(self, action, pair, date, trade_type=None, rate=None, dispo=None, profit=None, buys=None, stake=None,
                  last_candle=None):
        """Print one row of the trade log (plus the header every 10 rows).

        Does nothing under hyperopt. ``last_candle`` must provide the
        indicator columns read below.
        """
        if self._is_hyperopt():
            return

        # Print the column header (and the open positions) every 10 rows.
        if self.columns_logged % 10 == 0:
            self.printLog(
                f"| {'Date':<16} | {'Action':<10} |{'Pair':<5}| {'Trade Type':<18} |{'Rate':>8} | {'Dispo':>6} | {'Profit':>8} "
                f"| {'Pct':>6} | {'max_touch':>11} | {'last_lost':>12} | {'last_max':>7}| {'last_min':>7}|{'Buys':>5}| {'Stake':>5} |"
                f"{'rsi':>6}|Distmax|s201d|s5_1d|s5_2d|s51h|s52h|smt1h|smt2h|tdc1d|tdc1h"
            )
            self.printLineLog()
            df = pd.DataFrame.from_dict(self.pairs, orient='index')
            excluded_columns = ['last_candle', 'trade_info', 'last_date', 'last_count_of_buys',
                                'base_stake_amount', 'stop_buy']
            df_filtered = df[df['count_of_buys'] > 0].drop(columns=excluded_columns)
            # df_filtered = df_filtered["first_buy", "last_max", "max_touch", "last_sell","last_buy", 'count_of_buys', 'current_profit']
            print(df_filtered)

        self.columns_logged += 1
        state = self.pairs[pair]
        date = str(date)[:16] if date else "-"
        # if buys is not None:
        #     limit = round(last_rate * (1 - self.fibo[buys] / 100), 4)

        last_lost = self.getLastLost(last_candle, pair)

        if buys is None:
            buys = ''

        pct_max = self.getPctFirstBuy(pair, last_candle)

        total_counts = str(buys) + '/' + str(sum(pair_data['count_of_buys'] for pair_data in self.pairs.values()))

        dist_max = ''

        # `profit` defaults to None; treat that as "not positive" instead of crashing.
        color = GREEN if (profit or 0) > 0 else RED
        color_sma24 = _sign_color(last_candle['sma24_deriv1'])
        color_sma5 = _sign_color(last_candle['mid_smooth_5_deriv1'])
        color_sma5_2 = _sign_color(last_candle['mid_smooth_5_deriv2'])
        color_sma60 = _sign_color(last_candle['sma60_deriv1'])
        color_sma60_2 = _sign_color(last_candle['sma60_deriv2'])
        color_smooth = _sign_color(last_candle['mid_smooth_deriv1'])
        color_smooth2 = _sign_color(last_candle['mid_smooth_deriv2'])

        last_max = int(state['last_max']) if state['last_max'] > 1 else round(state['last_max'], 3)
        last_min = int(state['last_min']) if state['last_min'] > 1 else round(state['last_min'], 3)

        profit = str(profit if profit is not None else '-') + '/' + str(round(state['max_profit'], 2))

        # 🟢 1st derivative > 0 and 2nd derivative > 0: accelerating uptrend.
        # 🟡 1st derivative > 0 and 2nd derivative < 0: slowing uptrend -> possible exhaustion.
        # 🔴 1st derivative < 0 and 2nd derivative < 0: accelerating downtrend.
        # 🟠 1st derivative < 0 and 2nd derivative > 0: slowing downtrend -> possible bottom.

        self.printLog(
            f"| {date:<16} |{action:<10} | {pair[0:3]:<3} | {trade_type or '-':<18} |{rate or '-':>9}| {dispo or '-':>6} "
            f"|{color}{profit or '-':>10}{RESET}| {pct_max or '-':>6} | {round(state['max_touch'], 2) or '-':>11} | {last_lost or '-':>12} "
            f"| {last_max or '-':>7} | {last_min or '-':>7} |{total_counts or '-':>5}|{stake or '-':>7}|"
            # f"|{round(last_candle['mid_smooth_24_deriv1'],3) or '-':>6}|{round(last_candle['mid_smooth_deriv1'],3) or '-':>6}|{round(last_candle['mid_smooth_deriv1_1d'],3) or '-' :>6}|"
            # f"{round(last_candle['mid_smooth_24_deriv2'],3) or '-' :>6}|{round(last_candle['mid_smooth_deriv2'],3) or '-':>6}|{round(last_candle['mid_smooth_deriv2_1d'],3) or '-':>6}|"
            f"{round(last_candle['max_rsi_24'], 1) or '-' :>6}|"
            f"{dist_max:>7}|{color_sma24}{round(last_candle['sma24_deriv1'], 2):>5}{RESET}"
            f"|{color_sma5}{round(last_candle['mid_smooth_5_deriv1'], 2):>5}{RESET}|{color_sma5_2}{round(last_candle['mid_smooth_5_deriv2'], 2):>5}{RESET}"
            f"|{color_sma60}{round(last_candle['sma60_deriv1'], 2):>5}{RESET}|{color_sma60_2}{round(last_candle['sma60_deriv2'], 2):>5}{RESET}"
            f"|{color_smooth}{round(last_candle['mid_smooth_deriv1'], 2):>5}{RESET}|{color_smooth2}{round(last_candle['mid_smooth_deriv2'], 2):>5}{RESET}"
        )

    def getLastLost(self, last_candle, pair):
        """Return the relative distance of the close from ``max_touch`` (3 decimals).

        Returns ``0.0`` while ``max_touch`` is still 0 (no open position) to
        avoid dividing by zero.
        """
        max_touch = self.pairs[pair]['max_touch']
        if not max_touch:
            return 0.0
        return round((last_candle['close'] - max_touch) / max_touch, 3)

    def printLineLog(self):
        """Print the horizontal separator line of the trade log table."""
        # f"sum1h|sum1d|Tdc|Tdh|Tdd| drv1 |drv|drv_1d|"
        self.printLog(
            f"+{'-' * 18}+{'-' * 12}+{'-' * 5}+{'-' * 20}+{'-' * 9}+{'-' * 8}+{'-' * 12}+{'-' * 8}+{'-' * 13}+{'-' * 14}+{'-' * 9}{'-' * 9}+{'-' * 5}+{'-' * 7}+"
            f"+{'-' * 6}+{'-' * 7}+{'-' * 5}+{'-' * 5}+{'-' * 5}+{'-' * 5}+{'-' * 5}+{'-' * 5}+"
        )

    def printLog(self, str):  # noqa: A002 - parameter name kept for backward compatibility
        """Emit a log line: ``logger.info`` in live-like modes, ``print`` otherwise.

        Nothing is emitted under hyperopt.
        """
        if self._is_hyperopt():
            return
        if self.dp.runmode.value not in ('backtest', 'hyperopt', 'lookahead-analysis'):
            logger.info(str)
        else:
            print(str)

    def populate_indicators(self, dataframe: DataFrame, metadata: dict) -> DataFrame:
        """Compute the 1h and 1d indicators and the LSTM prediction columns."""
        pair = metadata['pair']
        short_pair = self.getShortName(pair)
        run_mode = self.dp.runmode.value
        # Outside backtesting the artefacts are read from the "valide/" subfolder.
        self.path = f"user_data/plots/{short_pair}/" + ("valide/" if run_mode != 'backtest' else '')

        dataframe = self.populateDataframe(dataframe, timeframe='1h')

        # ################### INFORMATIVE 1d
        daily = self.dp.get_pair_dataframe(pair=pair, timeframe="1d")
        daily = self.populateDataframe(daily, timeframe='1d')
        # daily = self.calculateRegression(daily, 'mid', lookback=15)
        dataframe = merge_informative_pair(dataframe, daily, self.timeframe, "1d", ffill=True)

        dataframe['last_price'] = dataframe['close']
        dataframe['first_price'] = dataframe['close']
        if run_mode in ('live', 'dry_run'):
            self.getOpenTrades()

            for trade in self.trades:
                if trade.pair != pair:
                    continue
                count = 0
                amount = 0
                for buy in trade.select_filled_orders('buy'):
                    if count == 0:
                        dataframe['first_price'] = buy.price
                        self.pairs[pair]['first_buy'] = buy.price
                        self.pairs[pair]['first_amount'] = buy.price * buy.filled
                        # dataframe['close01'] = buy.price * 1.01

                    # Order(id=2396, trade=1019, order_id=29870026652, side=buy, filled=0.00078, price=63921.01,
                    # status=closed, date=2024-08-26 02:20:11)
                    dataframe['last_price'] = buy.price
                    self.pairs[pair]['last_buy'] = buy.price
                    count = count + 1
                    amount += buy.price * buy.filled
                # dataframe['mid_price'] = (dataframe['last_price'] + dataframe['first_price']) / 2
                # dataframe['limit'] = dataframe['last_price'] * (1 - self.baisse[count] / 100)
                self.pairs[pair]['total_amount'] = amount

        # dataframe['mid_smooth_tag'] = qtpylib.crossed_below(dataframe['mid_smooth_24_deriv1'], dataframe['mid_smooth_deriv2_24'])

        # ===============================
        # Smoothing of the hourly values
        dataframe['mid_smooth'] = dataframe['mid'].rolling(window=6).mean()
        dataframe["mid_smooth_deriv1"] = 100 * dataframe["mid_smooth"].diff().rolling(window=6).mean() / \
                                         dataframe['mid_smooth']
        dataframe["mid_smooth_deriv2"] = 100 * dataframe["mid_smooth_deriv1"].diff().rolling(window=6).mean()

        # dataframe['mid_smooth_5h'] = talib.EMA(dataframe, timeperiod=60)  # dataframe['mid'].rolling(window=60).mean()
        # dataframe["mid_smooth_5h_deriv1"] = 100 * dataframe["mid_smooth_5h"].diff().rolling(window=60).mean() / \
        #                                     dataframe['mid_smooth_5h']
        # dataframe["mid_smooth_5h_deriv2"] = 100 * dataframe["mid_smooth_5h_deriv1"].diff().rolling(window=60).mean()

        # dataframe['stop_buying_deb'] = ((dataframe['max_rsi_24'] > self.rsi_deb_protect.value)
        #                                 & (dataframe['sma24_deriv1'] < self.sma24_deriv1_deb_protect.value)
        #                                 )
        # dataframe['stop_buying_end'] = ((dataframe['max_rsi_24'] < self.rsi_end_protect.value)
        #                                 & (dataframe['sma24_deriv1'] > self.sma24_deriv1_end_protect.value)
        #                                 )
        #
        # latched = np.zeros(len(dataframe), dtype=bool)
        #
        # for i in range(1, len(dataframe)):
        #     if dataframe['stop_buying_deb'].iloc[i]:
        #         latched[i] = True
        #     elif dataframe['stop_buying_end'].iloc[i]:
        #         latched[i] = False
        #     else:
        #         latched[i] = latched[i - 1]
        #
        # dataframe['stop_buying'] = latched

        dataframe = self.calculateRegression(dataframe, 'mid', lookback=10, future_steps=10, model_type="poly")
        dataframe = self.calculateRegression(dataframe, 'sma24', lookback=12, future_steps=12)

        # TENSOR FLOW
        # NOTE(review): the extent of this backtest-only branch was inferred
        # from a whitespace-collapsed source - confirm against the original.
        if run_mode == 'backtest':
            self.model_indicators = self.listUsableColumns(dataframe)
            self.tensorFlowTrain(dataframe, future_steps=self.future_steps)
            self.tensorFlowPredict(dataframe)
            self.kerasGenerateGraphs(dataframe)

        # Read back the columns and windows used at training time.
        with open(f"{self.path}/model_metadata.json", "r") as f:
            model_metadata = json.load(f)

        self.model_indicators = model_metadata["feature_columns"]
        self.lookback = model_metadata["lookback"]
        self.future_steps = model_metadata["future_steps"]

        # e.g. feature_columns are the columns used for training
        # feature_columns = [c for c in dataframe.columns if c not in [self.indicator_target, 'lstm_pred']]
        preds, preds_std = self.predict_on_dataframe(dataframe, self.model_indicators)

        dataframe["lstm_pred"] = preds
        dataframe["lstm_pred_std"] = preds_std

        # predicted % change relative to current price
        dataframe["predicted_pct"] = (dataframe["lstm_pred"] - dataframe[self.indicator_target]) / dataframe[
            self.indicator_target]
        # confidence score inversely related to std (optional)
        dataframe["pred_confidence"] = 1 / (1 + dataframe["lstm_pred_std"])  # crude; scale to [0..1] if needed

        # # ---- Load or predict ----
        # try:
        #     if self.dp.runmode.value in ('backtest'):
        #         self.train_position_sizer(dataframe, feature_columns=self.model_indicators)
        #
        #     preds_positions = self.predict_position_fraction_on_dataframe(dataframe, feature_columns=self.model_indicators)
        #     # ---- Add the fractions column ----
        #     dataframe["pos_frac"] = preds_positions  # values in 0..1
        #     # Example: the value is the allocation advised by the LSTM
        #
        # except Exception as e:
        #     print(f"[LSTM Position] Erreur prediction: {e}")
        #     dataframe["pos_frac"] = np.full(len(dataframe), np.nan)

        return dataframe

    def listUsableColumns(self, dataframe):
        """Select the numeric, non-constant feature columns and persist them.

        Cleans inf/NaN to 0 in the selected columns (in place), stores the list
        in ``self.model_indicators`` and writes ``model_metadata.json`` under
        ``self.path``.
        """
        # Step 1: numeric columns only
        numeric_cols = dataframe.select_dtypes(include=['int64', 'float64']).columns

        # Step 2: drop constants and bookkeeping columns
        # usable_cols = [c for c in numeric_cols if dataframe[c].nunique() > 1
        #                and (c.endswith("_deriv1") or not c.endswith("deriv1"))
        #                and not c.endswith("_count")
        #                ]
        # Disabled filters: endswith("_1d"), startswith("open"/"close"/"low"/"high"/
        # "haopen"/"haclose"/"bb_lower"/"bb_upper"/"bb_middle").
        # NOTE(review): which filters are active was inferred from a
        # whitespace-collapsed source - confirm against the original.
        excluded_suffixes = ("_state", "_count", "_class", "_price")
        usable_cols = [c for c in numeric_cols
                       if not c.endswith(excluded_suffixes)
                       and not c.startswith('stop_buying')
                       and dataframe[c].nunique() > 1]

        # Step 3: replace inf and NaN by 0
        # usable_cols = [
        #     "obv_1d", "min60", "mid_future_pred_cons", "bb_upperband",
        #     "bb_lowerband", "open", "max60", "high", "volume_1d",
        #     "mid_smooth_5_1d", "haclose", "high_1d", "sma24_future_pred_cons",
        #     "volume_deriv2", "mid_smooth", "volume_deriv2_1d", "bb_middleband",
        #     "volume_deriv1", "sma60", "volume_dist", "open_1d",
        #     "haopen", "mid_1d", "min12_1d", "volume_deriv1_1d",
        #     "max12_1d", "mid_smooth_12", "sma24", "bb_middleband_1d", "sma12_1d",
        # ]

        dataframe[usable_cols] = dataframe[usable_cols].replace([np.inf, -np.inf], 0).fillna(0)
        print("Colonnes utilisables pour le modèle :")
        print(usable_cols)
        self.model_indicators = usable_cols
        model_metadata = {
            "feature_columns": self.model_indicators,
            "lookback": self.lookback,
            "future_steps": self.future_steps,
        }
        # The per-pair output folder may not exist on the first run.
        os.makedirs(self.path, exist_ok=True)
        with open(f"{self.path}/model_metadata.json", "w") as f:
            json.dump(model_metadata, f)

        return self.model_indicators

    def populateDataframe(self, dataframe, timeframe='5m'):
        # Computes the indicator columns on a copy of `dataframe`.
        # Only part of this method is visible in the reviewed chunk; the
        # statements below are unchanged apart from formatting and comments.
        dataframe = dataframe.copy()
        heikinashi = qtpylib.heikinashi(dataframe)
        dataframe['haopen'] = heikinashi['open']
        dataframe['haclose'] = heikinashi['close']
        dataframe['hapercent'] = (dataframe['haclose'] - dataframe['haopen']) / dataframe['haclose']
        dataframe['mid'] = dataframe['haopen'] + (dataframe['haclose'] - dataframe['haopen']) / 2

        dataframe["percent"] = dataframe['mid'].pct_change()
        dataframe["percent3"] = dataframe['mid'].pct_change(3).rolling(3).mean()
        dataframe["percent12"] = dataframe['mid'].pct_change(12).rolling(12).mean()
        dataframe["percent24"] = dataframe['mid'].pct_change(24).rolling(24).mean()

        # if self.dp.runmode.value in ('backtest'):
        #     dataframe['futur_percent'] = 100 * (dataframe['close'].shift(-1) - dataframe['close']) / dataframe['close']

        # The "smaN" columns are exponential moving averages of `mid`.
        dataframe['sma5'] = dataframe['mid'].ewm(span=5, adjust=False).mean()  # dataframe["mid"].rolling(window=5).mean()
        self.calculeDerivees(dataframe, 'sma5', timeframe=timeframe, ema_period=5)
        dataframe['sma12'] = dataframe['mid'].ewm(span=12, adjust=False).mean()  # dataframe["mid"].rolling(window=12).mean()
        self.calculeDerivees(dataframe, 'sma12', timeframe=timeframe, ema_period=12)
        dataframe['sma24'] = dataframe['mid'].ewm(span=24, adjust=False).mean()  # dataframe["mid"].rolling(window=24).mean()
        self.calculeDerivees(dataframe, 'sma24', timeframe=timeframe, ema_period=24)
        dataframe['sma48'] = dataframe['mid'].ewm(span=48, adjust=False).mean()  # dataframe["mid"].rolling(window=48).mean()
        self.calculeDerivees(dataframe, 'sma48', timeframe=timeframe, ema_period=48)
        dataframe['sma60'] = dataframe['mid'].ewm(span=60, adjust=False).mean()  # dataframe["mid"].rolling(window=60).mean()
        self.calculeDerivees(dataframe, 'sma60', timeframe=timeframe, ema_period=60)

        dataframe = self.calculateDerivation(dataframe, window=3, suffixe="_3", timeframe=timeframe)
        dataframe = self.calculateDerivation(dataframe, window=5, suffixe="_5", timeframe=timeframe)
        dataframe = self.calculateDerivation(dataframe, window=12, suffixe="_12", timeframe=timeframe)
        dataframe = self.calculateDerivation(dataframe, window=24, suffixe="_24", timeframe=timeframe)

        # print(metadata['pair'])
        dataframe['rsi'] = talib.RSI(dataframe['mid'], timeperiod=14)
        dataframe['max_rsi_12'] = talib.MAX(dataframe['rsi'], timeperiod=12)
        dataframe['max_rsi_24'] = talib.MAX(dataframe['rsi'], timeperiod=24)
        self.calculeDerivees(dataframe, 'rsi', timeframe=timeframe, ema_period=12)
        dataframe['max12'] = talib.MAX(dataframe['mid'], timeperiod=12)
        dataframe['min12'] = talib.MIN(dataframe['mid'], timeperiod=12)
        dataframe['max60'] = talib.MAX(dataframe['mid'], timeperiod=60)
        dataframe['min60'] = talib.MIN(dataframe['mid'], timeperiod=60)
        dataframe['min_max_60'] = ((dataframe['max60'] - dataframe['mid']) / dataframe['min60'])
        # dataframe['min36'] = talib.MIN(dataframe['close'], timeperiod=36)
        # dataframe['max36'] = talib.MAX(dataframe['close'], timeperiod=36)
        # dataframe['pct36'] = 100 * (dataframe['max36'] - dataframe['min36']) / dataframe['min36']
        # dataframe['maxpct36'] = talib.MAX(dataframe['pct36'], timeperiod=36)

        # Bollinger Bands
        bollinger = qtpylib.bollinger_bands(qtpylib.typical_price(dataframe), window=20, stds=2)
        dataframe['bb_lowerband'] = bollinger['lower']
        dataframe['bb_middleband'] = bollinger['mid']
        dataframe['bb_upperband'] = bollinger['upper']
        dataframe["bb_percent"] = (
                (dataframe["mid"] - dataframe["bb_lowerband"]) /
                (dataframe["bb_upperband"] - dataframe["bb_lowerband"])
        )
        dataframe["bb_width"] = (dataframe["bb_upperband"] - dataframe["bb_lowerband"]) / dataframe["sma5"]

        # dataframe["bb_width"] = (
        #     (dataframe["bb_upperband"] - dataframe["bb_lowerband"]) / dataframe["bb_middleband"]
        # )

        # MACD computation
        macd, macdsignal, macdhist = talib.MACD(
            dataframe['mid'],
            fastperiod=12,
            slowperiod=26,
            signalperiod=9
        )

        # | Name                       | Formula / definition                  | Meaning                                                    |
        # | -------------------------- | ------------------------------------- | ---------------------------------------------------------- |
        # | MACD (`macd`)              | `EMA_fast - EMA_slow` (e.g. 12-26)    | Gap between the short and the long average.                |
        # |                            |                                       | - Positive -> uptrend                                      |
        # |                            |                                       | - Negative -> downtrend                                    |
        # | Signal (`macdsignal`)      | `EMA_9(MACD)`                         | Trigger line.                                              |
        # |                            |                                       | - MACD crossing above -> buy signal                        |
        # |                            |                                       | - MACD crossing below -> sell signal                       |
        # | Histogram (`macdhist`)     | `MACD - Signal`                       | Strength and acceleration of the trend.                    |
        # |                            |                                       | - Positive and rising -> accelerating uptrend              |
        # |                            |                                       | - Positive but falling -> uptrend slowing down             |
        # |                            |                                       | - Negative and falling -> accelerating downtrend           |
        # |                            |                                       | - Negative but rising -> downtrend slowing down            |

        # Add to the dataframe
        dataframe['macd'] = macd
        dataframe['macdsignal'] = macdsignal
        dataframe['macdhist'] = macdhist

        # Disabled because it looks into the future:
        # # --- Make each series relative (-1 -> 1) ---
        # for col in ['macd', 'macdsignal', 'macdhist']:
        #     series = dataframe[col]
        #     valid = series[~np.isnan(series)]  # ignore NaN
        #     min_val = valid.min()
        #     max_val = valid.max()
        #     span = max_val - min_val if max_val != min_val else 1
        #     dataframe[f'{col}_rel'] = 2 * ((series - min_val) / span) - 1
        #
        # dataframe['tdc_macd'] = self.macd_tendance_int(
        #     dataframe,
        #     macd_col='macd_rel',
        #     signal_col='macdsignal_rel',
        #     hist_col='macdhist_rel'
        # )

        # --- raw slope ---
        dataframe['slope'] = dataframe['sma24'].diff()

        # --- EMA smoothing ---
        dataframe['slope_smooth'] = dataframe['slope'].ewm(span=10, adjust=False).mean()

        # --- Normalised volatility ---
        dataframe['atr'] = ta.volatility.AverageTrueRange(
            high=dataframe['high'], low=dataframe['low'], close=dataframe['close'], window=14
        ).average_true_range()
        dataframe['atr_norm'] = dataframe['atr'] / dataframe['close']

        # --- Trend strength ---
        dataframe['adx'] = ta.trend.ADXIndicator(
            high=dataframe['high'], low=dataframe['low'], close=dataframe['close'], window=14
        ).adx()

        # --- Directional volume (On Balance Volume) ---
        dataframe['obv'] = ta.volume.OnBalanceVolumeIndicator(close=dataframe['mid'],
                                                              volume=dataframe['volume']).on_balance_volume()

        # --- Recent volatility (standard deviation of returns) ---
        dataframe['vol_24'] = dataframe['percent'].rolling(24).std()

        # Count consecutive drops / rises
        self.calculateDownAndUp(dataframe, limit=0.0001)

        # --- Filtering of the initial NaN ---
        # dataframe = dataframe.dropna()

        dataframe['rsi_slope'] = dataframe['rsi'].diff(3) / 3  # average RSI speed
        dataframe['adx_change'] = dataframe['adx'] - dataframe['adx'].shift(12)  # trend evolution
        dataframe['volatility_ratio'] = dataframe['atr_norm'] / dataframe['bb_width']
dataframe["rsi_diff"] = dataframe["rsi"] - dataframe["rsi"].shift(3) dataframe["slope_ratio"] = dataframe["sma5_deriv1"] / (dataframe["sma60_deriv1"] + 1e-9) dataframe["divergence"] = (dataframe["rsi_deriv1"] * dataframe["sma5_deriv1"]) < 0 ########################### dataframe['volume_sma_deriv'] = dataframe['volume'] * dataframe['sma5_deriv1'] / (dataframe['volume'].rolling(5).mean()) self.calculeDerivees(dataframe, 'volume', timeframe=timeframe, ema_period=12) ############################# # NOUVEAUX """ Ajout des indicateurs avancés (fractals, stoch, mfi, entropy, hurst, donchian, keltner, vwap, wick features, etc.). Ces indicateurs sont concus pour enrichir les entrees du modele TensorFlow. """ dataframe = dataframe.copy() # ----------------------------------------------------------- # 1) Fractals (Bill Williams) # Fractal haut : point haut local centré (2-3-2) # Fractal bas : point bas local centré (2-3-2) # ----------------------------------------------------------- dataframe["fractals_up"] = ( (dataframe["high"].shift(2) < dataframe["high"].shift(1)) & (dataframe["high"].shift(0) < dataframe["high"].shift(1)) & (dataframe["high"].shift(3) < dataframe["high"].shift(1)) & (dataframe["high"].shift(4) < dataframe["high"].shift(1)) ).astype(int) dataframe["fractals_down"] = ( (dataframe["low"].shift(2) > dataframe["low"].shift(1)) & (dataframe["low"].shift(0) > dataframe["low"].shift(1)) & (dataframe["low"].shift(3) > dataframe["low"].shift(1)) & (dataframe["low"].shift(4) > dataframe["low"].shift(1)) ).astype(int) # ----------------------------------------------------------- # 2) Stochastic Oscillator (K, D) # Capture l'epuisement du mouvement et les extremums de momentum # ----------------------------------------------------------- stoch_k = talib.STOCH(dataframe["high"], dataframe["low"], dataframe["close"])[0] stoch_d = talib.STOCH(dataframe["high"], dataframe["low"], dataframe["close"])[1] dataframe["stoch_k"] = stoch_k dataframe["stoch_d"] = stoch_d 
dataframe["stoch_k_d_diff"] = stoch_k - stoch_d # ----------------------------------------------------------- # 3) MFI (Money Flow Index) # Combine prix + volume, excellent pour anticiper les retournements # ----------------------------------------------------------- dataframe["mfi"] = talib.MFI( dataframe["high"], dataframe["low"], dataframe["close"], dataframe["volume"], timeperiod=14 ) dataframe["mfi_deriv1"] = dataframe["mfi"].diff() # ----------------------------------------------------------- # 4) VWAP (Volume-Weighted Average Price) # Zone d'equilibre du prix ; tres utile pour le sizing et l'analyse structurelle # ----------------------------------------------------------- typical_price = (dataframe["high"] + dataframe["low"] + dataframe["close"]) / 3 dataframe["vwap"] = (typical_price * dataframe["volume"]).cumsum() / dataframe["volume"].replace(0, np.nan).cumsum() dataframe["close_vwap_dist"] = dataframe["close"] / dataframe["vwap"] - 1 # ----------------------------------------------------------- # 5) Donchian Channels # Basés sur les extremes haut/bas, utiles pour breakout et volatilite # ----------------------------------------------------------- dataframe["donchian_high"] = dataframe["high"].rolling(24).max() dataframe["donchian_low"] = dataframe["low"].rolling(24).min() dataframe["donchian_width"] = (dataframe["donchian_high"] - dataframe["donchian_low"]) / dataframe["close"] dataframe["donchian_percent"] = (dataframe["close"] - dataframe["donchian_low"]) / ( dataframe["donchian_high"] - dataframe["donchian_low"]) # ----------------------------------------------------------- # 6) Keltner Channels # Combine volatilite (ATR) + moyenne mobile ; tres stable et utile pour ML # ----------------------------------------------------------- atr = talib.ATR(dataframe["high"], dataframe["low"], dataframe["close"], timeperiod=20) ema20 = talib.EMA(dataframe["close"], timeperiod=20) dataframe["kc_upper"] = ema20 + 2 * atr dataframe["kc_lower"] = ema20 - 2 * atr 
dataframe["kc_width"] = (dataframe["kc_upper"] - dataframe["kc_lower"]) / dataframe["close"] # ----------------------------------------------------------- # 7) Wick Features # Encode la forme de la bougie (haut, bas, corps) — tres utile en DL # ----------------------------------------------------------- dataframe["body"] = abs(dataframe["close"] - dataframe["open"]) dataframe["range"] = dataframe["high"] - dataframe["low"] dataframe["body_pct"] = dataframe["body"] / dataframe["range"].replace(0, np.nan) dataframe["upper_wick_pct"] = (dataframe["high"] - dataframe[["close", "open"]].max(axis=1)) / dataframe[ "range"].replace(0, np.nan) dataframe["lower_wick_pct"] = (dataframe[["close", "open"]].min(axis=1) - dataframe["low"]) / dataframe[ "range"].replace(0, np.nan) # ----------------------------------------------------------- # 8) Shannon Entropy (sur les variations de prix) # Mesure le degre d'ordre / chaos ; excellent pour sizing adaptatif # ----------------------------------------------------------- def rolling_entropy(series, window): eps = 1e-12 roll = series.rolling(window) return - (roll.apply(lambda x: np.sum((x / (np.sum(abs(x)) + eps)) * np.log((x / (np.sum(abs(x)) + eps)) + eps)), raw=False)) dataframe["entropy_24"] = rolling_entropy(dataframe["close"].pct_change(), 24) # ----------------------------------------------------------- # 9) Hurst Exponent (tendance vs mean reversion) # Indique si le marche est trending (>0.5) ou mean-reverting (<0.5) # ----------------------------------------------------------- def hurst_exponent(ts): if len(ts) < 40: return np.nan lags = range(2, 20) tau = [np.sqrt(np.std(np.subtract(ts[lag:], ts[:-lag]))) for lag in lags] poly = np.polyfit(np.log(lags), np.log(tau), 1) return poly[0] * 2.0 dataframe["hurst_48"] = dataframe["close"].rolling(48).apply(hurst_exponent, raw=False) # Nettoyage final dataframe.replace([np.inf, -np.inf], np.nan, inplace=True) dataframe.fillna(method="ffill", inplace=True) 
dataframe.fillna(method="bfill", inplace=True) # FIN NOUVEAUX ############################ self.setTrends(dataframe) return dataframe def feature_auc_scores(self, X, y): aucs = {} for col in X.columns: try: aucs[col] = roc_auc_score(y, X[col].ffill().fillna(0)) except Exception: aucs[col] = np.nan return pd.Series(aucs).sort_values(ascending=False) def macd_tendance_int(self, dataframe: pd.DataFrame, macd_col='macd', signal_col='macdsignal', hist_col='macdhist', eps=0.0) -> pd.Series: """ Renvoie la tendance MACD sous forme d'entiers. 2 : Haussier 1 : Ralentissement hausse 0 : Neutre -1 : Ralentissement baisse -2 : Baissier """ # | Nom | Formule / définition | Signification | # | ---------------------------- | ------------------------------------------- | ----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- | # | **MACD** (`macd`) | `EMA_fast - EMA_slow` (ex : 12-26 périodes) | Montre l’écart entre la moyenne courte et la moyenne longue.
- Positive → tendance haussière
- Négative → tendance baissière | # | **Signal** (`macdsignal`) | `EMA_9(MACD)` | Sert de ligne de **signal de déclenchement**.
- Croisement du MACD au-dessus → signal d’achat
- Croisement du MACD en dessous → signal de vente | # | **Histogramme** (`macdhist`) | `MACD - Signal` | Montre la **force et l’accélération** de la tendance.
- Positif et croissant → tendance haussière qui s’accélère
- Positif mais décroissant → ralentissement de la hausse
- Négatif et décroissant → baisse qui s’accélère
- Négatif mais croissant → ralentissement de la baisse | # | Situation | MACD | Signal | Hist | Interprétation | # | -------------------------- | ---------- | --------- | -------- | ------------------------------------------ | # | MACD > 0, Hist croissant | au-dessus | croissant | Haussier | Momentum fort → tendance haussière | # | MACD > 0, Hist décroissant | au-dessus | en baisse | Momentum | La hausse ralentit, prudence | # | MACD < 0, Hist décroissant | en dessous | en baisse | Baissier | Momentum fort → tendance baissière | # | MACD < 0, Hist croissant | en dessous | en hausse | Rebond ? | La baisse ralentit → possible retournement | # Créer une série de 0 par défaut tendance = pd.Series(0, index=dataframe.index) # Cas MACD > signal mask_up = dataframe[macd_col] > dataframe[signal_col] + eps mask_up_hist_pos = mask_up & (dataframe[hist_col] > 0) mask_up_hist_neg = mask_up & (dataframe[hist_col] <= 0) tendance[mask_up_hist_pos] = 2 # Haussier tendance[mask_up_hist_neg] = 1 # Ralentissement hausse # Cas MACD < signal mask_down = dataframe[macd_col] < dataframe[signal_col] - eps mask_down_hist_neg = mask_down & (dataframe[hist_col] < 0) mask_down_hist_pos = mask_down & (dataframe[hist_col] >= 0) tendance[mask_down_hist_neg] = -2 # Baissier tendance[mask_down_hist_pos] = -1 # Ralentissement baisse # Les NaN deviennent neutre tendance[dataframe[[macd_col, signal_col, hist_col]].isna().any(axis=1)] = 0 return tendance def calculateDownAndUp(self, dataframe, limit=0.0001): dataframe['down'] = dataframe['hapercent'] <= limit dataframe['up'] = dataframe['hapercent'] >= limit dataframe['down_count'] = - dataframe['down'].astype(int) * ( dataframe['down'].groupby((dataframe['down'] != dataframe['down'].shift()).cumsum()).cumcount() + 1) dataframe['up_count'] = dataframe['up'].astype(int) * ( dataframe['up'].groupby((dataframe['up'] != dataframe['up'].shift()).cumsum()).cumcount() + 1) # Créer une colonne vide dataframe['down_pct'] = self.calculateUpDownPct(dataframe, 
'down_count') dataframe['up_pct'] = self.calculateUpDownPct(dataframe, 'up_count') def calculateDerivation(self, dataframe, window=12, suffixe='', timeframe='5m'): dataframe[f"mid_smooth{suffixe}"] = dataframe['mid'].rolling(window).mean() dataframe = self.calculeDerivees(dataframe, f"mid_smooth{suffixe}", timeframe=timeframe, ema_period=window) return dataframe def calculeDerivees( self, dataframe: pd.DataFrame, name: str, suffixe: str = '', window: int = 100, coef: float = 0.15, ema_period: int = 10, verbose: bool = True, timeframe: str = '5m' ) -> pd.DataFrame: """ Calcule deriv1/deriv2 (relative simple), applique EMA, calcule tendency avec epsilon adaptatif basé sur rolling percentiles. """ d1_col = f"{name}{suffixe}_deriv1" d2_col = f"{name}{suffixe}_deriv2" factor1 = 100 * (ema_period / 5) factor2 = 10 * (ema_period / 5) dataframe[f"{name}{suffixe}_inv"] = (dataframe[f"{name}{suffixe}"].shift(2) >= dataframe[f"{name}{suffixe}"].shift(1)) \ & (dataframe[f"{name}{suffixe}"].shift(1) <= dataframe[f"{name}{suffixe}"]) # --- Distance à la moyenne mobile --- dataframe[f"{name}{suffixe}_dist"] = (dataframe['close'] - dataframe[f"{name}{suffixe}"]) / dataframe[f"{name}{suffixe}"] # dérivée relative simple dataframe[d1_col] = 1000 * (dataframe[name] - dataframe[name].shift(1)) / dataframe[name].shift(1) dataframe[d2_col] = dataframe[d1_col] - dataframe[d1_col].shift(1) return dataframe def getOpenTrades(self): # if len(self.trades) == 0: self.trades = Trade.get_open_trades() return self.trades def populate_buy_trend(self, dataframe: DataFrame, metadata: dict) -> DataFrame: # dataframe.loc[ # ( # qtpylib.crossed_above(dataframe['lstm_pred'], dataframe['mid']) # ), ['enter_long', 'enter_tag']] = (1, f"future") # # dataframe['test'] = np.where(dataframe['enter_long'] == 1, dataframe['close'] * 1.01, np.nan) # # if self.dp.runmode.value in ('backtest'): # dataframe.to_feather(f"user_data/backtest_results/{metadata['pair'].replace('/', '_')}_df.feather") dataframe.loc[:, 
"enter_long"] = False dataframe.loc[:, "enter_short"] = False # thresholds pct_thr = self.minimal_pct_for_trade conf_thr = self.min_hit_ratio # you may want separate param # simple directional rule with deadzone and uncertainty filter mask_up = (dataframe["predicted_pct"] > pct_thr) & (dataframe["pred_confidence"] > 0) # further filters below mask_down = (dataframe["predicted_pct"] < -pct_thr) & (dataframe["pred_confidence"] > 0) # filter: ensure uncertainty isn't huge relative to predicted move # if std > |predicted_move| * max_uncertainty_pct => skip safe_up = mask_up & (dataframe["lstm_pred_std"] <= ( dataframe["predicted_pct"].abs() * dataframe[self.indicator_target] * self.max_uncertainty_pct)) safe_down = mask_down & (dataframe["lstm_pred_std"] <= ( dataframe["predicted_pct"].abs() * dataframe[self.indicator_target] * self.max_uncertainty_pct)) dataframe.loc[safe_up, ['enter_long', 'enter_tag']] = (True, f"future") # dataframe.loc[safe_down, "enter_short"] = True return dataframe def populate_sell_trend(self, dataframe: DataFrame, metadata: dict) -> DataFrame: # dataframe.loc[ # ( # qtpylib.crossed_below(dataframe['lstm_pred'], dataframe['mid']) # ), ['exit_long', 'exit_tag']] = (1, f"sma60_future") # dataframe.loc[ # ( # ( # ( # (dataframe['mid_future_pred_cons'].shift(2) < dataframe['mid_future_pred_cons'].shift(1)) # & (dataframe['mid_future_pred_cons'].shift(1) > dataframe['mid_future_pred_cons']) # ) # # | (dataframe['mid_smooth_12_deriv1'] < 0) # ) # & (dataframe['sma60_future_pred_cons'] < dataframe['sma60_future_pred_cons'].shift(1)) # & (dataframe['hapercent'] < 0) # ), ['exit_long', 'exit_tag']] = (1, f"sma60_future") # # dataframe.loc[ # ( # ( # (dataframe['mid_future_pred_cons'].shift(2) < dataframe['mid_future_pred_cons'].shift(1)) # & (dataframe['mid_future_pred_cons'].shift(1) > dataframe['mid_future_pred_cons']) # # ) # # & (dataframe['mid_future_pred_cons'] > dataframe['max12']) # & (dataframe['hapercent'] < 0) # # ), ['exit_long', 
'exit_tag']] = (1, f"max12") return dataframe # # Position sizing using simplified Kelly-like fraction # def adjust_trade_positionNew(self, trade: Trade, current_time: datetime, # current_rate: float, current_profit: float, min_stake: float, # max_stake: float, **kwargs): # """ # Return fraction in (0..1] of available position to allocate. # Uses predicted confidence and historical hit ratio. # """ # # idx = trade.open_dt_index if hasattr(trade, "open_dt_index") else trade.open_index # # fallback: use latest row # dataframe, _ = self.dp.get_analyzed_dataframe(trade.pair, self.timeframe) # # last_candle = self.dp.get_pair_dataframe(pair).iloc[-1] # last_candle = dataframe.iloc[-1].squeeze() # hit = getattr(self, "historical_hit_ratio", 0.6) # you can compute this offline # pred_conf = last_candle.get("pred_confidence", 0.5) # predicted_pct = last_candle.get("predicted_pct", 0.0) # # # base fraction from hit ratio (simple linear mapping) # if hit <= 0.5: # base_frac = 0.01 # else: # base_frac = min((hit - 0.5) * 2.0, 1.0) # hit 0.6 -> 0.2 ; 0.75 -> 0.5 # # # scale by confidence and predicted move magnitude # scale = pred_conf * min(abs(predicted_pct) / max(self.minimal_pct_for_trade, 1e-6), 1.0) # # fraction = base_frac * scale # # # clamp # fraction = float(np.clip(fraction, 0.001, 1.0)) # return fraction def adjust_trade_positionOld(self, trade: Trade, current_time: datetime, current_rate: float, current_profit: float, min_stake: float, max_stake: float, **kwargs): # ne rien faire si ordre deja en cours if trade.has_open_orders: # print("skip open orders") return None if (self.wallets.get_available_stake_amount() < 10): # or trade.stake_amount >= max_stake: return 0 dataframe, _ = self.dp.get_analyzed_dataframe(trade.pair, self.timeframe) last_candle = dataframe.iloc[-1].squeeze() before_last_candle = dataframe.iloc[-2].squeeze() # prépare les données current_time = current_time.astimezone(timezone.utc) open_date = trade.open_date.astimezone(timezone.utc) dispo = 
round(self.wallets.get_available_stake_amount()) hours_since_first_buy = (current_time - trade.open_date_utc).seconds / 3600.0 days_since_first_buy = (current_time - trade.open_date_utc).days hours = (current_time - trade.date_last_filled_utc).total_seconds() / 3600.0 count_of_buys = trade.nr_of_successful_entries current_time_utc = current_time.astimezone(timezone.utc) open_date = trade.open_date.astimezone(timezone.utc) days_since_open = (current_time_utc - open_date).days pair = trade.pair profit = trade.calc_profit(current_rate) #round(current_profit * trade.stake_amount, 1) last_lost = self.getLastLost(last_candle, pair) pct_first = 0 total_counts = sum( pair_data['count_of_buys'] for pair_data in self.pairs.values() if not self.getShortName(pair) == 'BTC') if self.pairs[pair]['first_buy']: pct_first = self.getPctFirstBuy(pair, last_candle) pct = self.pct.value if count_of_buys == 1: pct_max = current_profit else: if self.pairs[trade.pair]['last_buy']: pct_max = self.getPctLastBuy(pair, last_candle) else: pct_max = - pct if (self.getShortName(pair) == 'BTC') or count_of_buys <= 2: lim = - pct - (count_of_buys * self.pct_inc.value) else: pct = 0.05 lim = - pct - (count_of_buys * 0.0025) if (len(dataframe) < 1): # print("skip dataframe") return None if not self.should_enter_trade(pair, last_candle, current_time): return None condition = (last_candle['enter_long'] and last_candle['hapercent'] > 0) #and last_candle['stop_buying'] == False # and last_candle['sma60_deriv1'] > 0 # or last_candle['enter_tag'] == 'pct3' \ # or last_candle['enter_tag'] == 'pct3' # if (self.getShortName(pair) != 'BTC' and count_of_buys > 3): # condition = before_last_candle_24['mid_smooth_3'] > before_last_candle_12['mid_smooth_3'] and before_last_candle_12['mid_smooth_3'] < last_candle['mid_smooth_3'] #and last_candle['mid_smooth_3_deriv1'] < -1.5 limit_buy = 40 if (count_of_buys < limit_buy) and condition and (pct_max < lim): try: if self.pairs[pair]['has_gain'] and profit > 0: 
self.pairs[pair]['force_sell'] = True return None max_amount = self.config.get('stake_amount') * 2.5 stake_amount = min(min(max_amount, self.wallets.get_available_stake_amount()), self.adjust_stake_amount(pair, last_candle) * abs(last_lost / self.mise_factor_buy.value)) if stake_amount > 0: trade_type = "Loss " + (last_candle['enter_tag'] if last_candle['enter_long'] == 1 else '') self.pairs[trade.pair]['count_of_buys'] += 1 self.pairs[pair]['total_amount'] += stake_amount self.log_trade( last_candle=last_candle, date=current_time, action="🟧 Loss -", dispo=dispo, pair=trade.pair, rate=current_rate, trade_type=trade_type, profit=round(profit, 1), buys=trade.nr_of_successful_entries + 1, stake=round(stake_amount, 2) ) self.pairs[trade.pair]['last_buy'] = current_rate self.pairs[trade.pair]['max_touch'] = last_candle['close'] self.pairs[trade.pair]['last_candle'] = last_candle # df = pd.DataFrame.from_dict(self.pairs, orient='index') # colonnes_a_exclure = ['last_candle', 'stop', # 'trade_info', 'last_date', 'expected_profit', 'last_count_of_buys', 'base_stake_amount', 'stop_buy'] # df_filtered = df[df['count_of_buys'] > 0].drop(columns=colonnes_a_exclure) # # df_filtered = df_filtered["first_buy", "last_max", "max_touch", "last_sell","last_buy", 'count_of_buys', 'current_profit'] # # print(df_filtered) return stake_amount return None except Exception as exception: print(exception) return None if (profit > self.pairs[pair]['previous_profit'] and profit > self.pairs[pair]['expected_profit'] and hours > 6 # and last_candle['sma60_deriv1'] > 0 and last_candle['max_rsi_12'] < 75 # and last_candle['rsi_1d'] < 58 # and last_candle['stop_buying'] == False # and last_candle['mid_smooth_5_deriv1_1d'] > 0 and self.wallets.get_available_stake_amount() > 0 ): try: self.pairs[pair]['previous_profit'] = profit stake_amount = min(self.wallets.get_available_stake_amount(), self.pairs[pair]['first_amount']) if stake_amount > 0: self.pairs[pair]['has_gain'] += 1 trade_type = 'Gain +' + 
(last_candle['enter_tag'] if last_candle['enter_long'] == 1 else '') self.pairs[trade.pair]['count_of_buys'] += 1 self.pairs[pair]['total_amount'] += stake_amount self.log_trade( last_candle=last_candle, date=current_time, action="🟡 Gain +", dispo=dispo, pair=trade.pair, rate=current_rate, trade_type=str(round(pct_max, 4)), profit=round(profit, 1), buys=trade.nr_of_successful_entries + 1, stake=round(stake_amount, 2) ) self.pairs[trade.pair]['last_buy'] = current_rate self.pairs[trade.pair]['max_touch'] = last_candle['close'] self.pairs[trade.pair]['last_candle'] = last_candle return stake_amount return None except Exception as exception: print(exception) return None return None def getPctFirstBuy(self, pair, last_candle): return round((last_candle['close'] - self.pairs[pair]['first_buy']) / self.pairs[pair]['first_buy'], 3) def getPctLastBuy(self, pair, last_candle): return round((last_candle['close'] - self.pairs[pair]['last_buy']) / self.pairs[pair]['last_buy'], 4) # def adjust_stake_amount(self, pair: str, last_candle: DataFrame): # base_stake_amount = self.config.get('stake_amount') # # # Récupérer le dataframe de la paire # try: # df = self.dp.get_pair_dataframe(pair) # except Exception: # return 0.1 # fallback safe size 10% # # # Doit exister car rempli dans populate_indicators # if "pos_frac" not in df.columns: # return 0.1 # # # On prend la dernière valeur non-nan # last = df["pos_frac"].dropna() # if last.empty: # return 0.1 # # raw_frac = float(last.iloc[-1]) # dans [0..1] # # # --- Sécurisation --- # # Clamp dans des limites # raw_frac = max(0.0, min(raw_frac, 1.0)) # # # Conversion vers fraction réelle autorisée # # min=0.1%, max=10% du portefeuille (change si tu veux) # min_frac = 0.05 # max_frac = 0.25 # final_frac = min_frac + raw_frac * (max_frac - min_frac) # # return base_stake_amount * final_frac def adjust_stake_amount(self, pair: str, last_candle: DataFrame): # Calculer le minimum des 14 derniers jours nb_pairs = 
len(self.dp.current_whitelist()) base_stake_amount = self.config.get('stake_amount') / (self.mises.value) # * nb_pairs) # Montant de base configuré # factors = [1, 1.2, 1.3, 1.4] if self.pairs[pair]['count_of_buys'] == 0: factor = 1 #65 / min(65, last_candle['rsi_1d']) # if last_candle['min_max_60'] > 0.04: # factor = 2 adjusted_stake_amount = max(base_stake_amount / 5, base_stake_amount * factor) else: adjusted_stake_amount = self.pairs[pair]['first_amount'] if self.pairs[pair]['count_of_buys'] == 0: self.pairs[pair]['first_amount'] = adjusted_stake_amount return adjusted_stake_amount def expectedProfit(self, pair: str, last_candle: DataFrame): lim = 0.01 pct = 0.002 if (self.getShortName(pair) == 'BTC'): lim = 0.005 pct = 0.001 pct_to_max = lim + pct * self.pairs[pair]['count_of_buys'] expected_profit = lim * self.pairs[pair]['total_amount'] # min(3 * lim, max(lim, pct_to_max)) # 0.004 + 0.002 * self.pairs[pair]['count_of_buys'] #min(0.01, first_max) self.pairs[pair]['expected_profit'] = expected_profit return expected_profit def calculateUpDownPct(self, dataframe, key): down_pct_values = np.full(len(dataframe), np.nan) # Remplir la colonne avec les bons calculs for i in range(len(dataframe)): shift_value = abs(int(dataframe[key].iloc[i])) # Récupérer le shift actuel if i - shift_value > 1: # Vérifier que le shift ne dépasse pas l'index down_pct_values[i] = 100 * (dataframe['close'].iloc[i] - dataframe['close'].iloc[i - shift_value]) / \ dataframe['close'].iloc[i - shift_value] return down_pct_values @property def protections(self): return [ { "method": "CooldownPeriod", "stop_duration_candles": 12 } # { # "method": "MaxDrawdown", # "lookback_period_candles": self.lookback.value, # "trade_limit": self.trade_limit.value, # "stop_duration_candles": self.protection_stop.value, # "max_allowed_drawdown": self.protection_max_allowed_dd.value, # "only_per_pair": False # }, # { # "method": "StoplossGuard", # "lookback_period_candles": 24, # "trade_limit": 4, # 
"stop_duration_candles": self.protection_stoploss_stop.value, # "only_per_pair": False # }, # { # "method": "StoplossGuard", # "lookback_period_candles": 24, # "trade_limit": 4, # "stop_duration_candles": 2, # "only_per_pair": False # }, # { # "method": "LowProfitPairs", # "lookback_period_candles": 6, # "trade_limit": 2, # "stop_duration_candles": 60, # "required_profit": 0.02 # }, # { # "method": "LowProfitPairs", # "lookback_period_candles": 24, # "trade_limit": 4, # "stop_duration_candles": 2, # "required_profit": 0.01 # } ] def get_stake_from_drawdown(self, pct: float, base_stake: float = 100.0, step: float = 0.04, growth: float = 1.15, max_stake: float = 1000.0) -> float: """ Calcule la mise à allouer en fonction du drawdown. :param pct: Drawdown en pourcentage (ex: -0.12 pour -12%) :param base_stake: Mise de base (niveau 0) :param step: Espacement entre paliers (ex: tous les -4%) :param growth: Facteur de croissance par palier (ex: 1.15 pour +15%) :param max_stake: Mise maximale à ne pas dépasser :return: Montant à miser """ if pct >= 0: return base_stake level = int(abs(pct) / step) stake = base_stake * (growth ** level) return min(stake, max_stake) def polynomial_forecast(self, series: pd.Series, window: int = 20, degree: int = 2, steps=[12, 24, 36]): """ Calcule une régression polynomiale sur les `window` dernières valeurs de la série, puis prédit les `n_future` prochaines valeurs. 
:param series: Série pandas (ex: dataframe['close']) :param window: Nombre de valeurs récentes utilisées pour ajuster le polynôme :param degree: Degré du polynôme (ex: 2 pour quadratique) :param n_future: Nombre de valeurs futures à prédire :return: tuple (poly_function, x_vals, y_pred), où y_pred contient les prédictions futures """ if len(series) < window: raise ValueError("La série est trop courte pour la fenêtre spécifiée.") recent_y = series.iloc[-window:].values x = np.arange(window) coeffs = np.polyfit(x, recent_y, degree) poly = np.poly1d(coeffs) x_future = np.arange(window, window + len(steps)) y_future = poly(x_future) # Affichage de la fonction # print("Fonction polynomiale trouvée :") # print(poly) current = series.iloc[-1] count = 0 for future_step in steps: # range(1, n_future + 1) future_x = window - 1 + future_step prediction = poly(future_x) # series.loc[series.index[future_x], f'poly_pred_t+{future_step}'] = prediction # ➕ Afficher les prédictions # print(f"{current} → t+{future_step}: x={future_x}, y={prediction:.2f}") if prediction > 0: # current: count += 1 return poly, x_future, y_future, count def should_enter_trade(self, pair: str, last_candle, current_time) -> bool: limit = 3 # if self.pairs[pair]['stop'] and last_candle['max_rsi_12'] <= 60 and last_candle['trend_class'] == -1: # dispo = round(self.wallets.get_available_stake_amount()) # self.pairs[pair]['stop'] = False # self.log_trade( # last_candle=last_candle, # date=current_time, # action="🟢RESTART", # dispo=dispo, # pair=pair, # rate=last_candle['close'], # trade_type='', # profit=0, # buys=self.pairs[pair]['count_of_buys'], # stake=0 # ) # 🟢 Dérivée 1 > 0 et dérivée 2 > 0: tendance haussière qui s’accélère. # 🟡 Dérivée 1 > 0 et dérivée 2 < 0: tendance haussière qui ralentit → essoufflement potentiel. # 🔴 Dérivée 1 < 0 et dérivée 2 < 0: tendance baissière qui s’accélère. # 🟠 Dérivée 1 < 0 et dérivée 2 > 0: tendance baissière qui ralentit → possible bottom. 
# if not pair.startswith('BTC'): dispo = round(self.wallets.get_available_stake_amount()) # if self.pairs[pair]['stop'] \ # and last_candle[f"{self.indic_1d_p.value}_deriv1"] >= self.indic_deriv1_1d_p_start.value \ # and last_candle[f"{self.indic_1d_p.value}_deriv2"] >= self.indic_deriv2_1d_p_start.value: # self.pairs[pair]['stop'] = False # self.log_trade( # last_candle=last_candle, # date=current_time, # action="🟢RESTART", # dispo=dispo, # pair=pair, # rate=last_candle['close'], # trade_type='', # profit=0, # buys=self.pairs[pair]['count_of_buys'], # stake=0 # ) # else: # if self.pairs[pair]['stop'] == False \ # and last_candle[f"{self.indic_1d_p.value}_deriv1"] <= self.indic_deriv1_1d_p_stop.value \ # and last_candle[f"{self.indic_1d_p.value}_deriv2"] <= self.indic_deriv2_1d_p_stop.value: # self.pairs[pair]['stop'] = True # # if self.pairs[pair]['current_profit'] > 0: # # self.pairs[pair]['force_sell'] = True # self.log_trade( # last_candle=last_candle, # date=current_time, # action="🔴STOP", # dispo=dispo, # pair=pair, # rate=last_candle['close'], # trade_type='', # profit=self.pairs[pair]['current_profit'], # buys=self.pairs[pair]['count_of_buys'], # stake=0 # ) # return False # if self.pairs[pair]['stop']: # return False return True # Filtrer les paires non-BTC non_btc_pairs = [p for p in self.pairs if not p.startswith('BTC')] # Compter les positions actives sur les paires non-BTC max_nb_trades = 0 total_non_btc = 0 max_pair = '' limit_amount = 250 max_amount = 0 for p in non_btc_pairs: max_nb_trades = max(max_nb_trades, self.pairs[p]['count_of_buys']) max_amount = max(max_amount, self.pairs[p]['total_amount']) for p in non_btc_pairs: if (max_nb_trades == self.pairs[p]['count_of_buys'] and max_nb_trades > limit): # if (max_amount == self.pairs[p]['total_amount'] and max_amount > limit_amount): max_pair = p total_non_btc += self.pairs[p]['count_of_buys'] pct_max = self.getPctFirstBuy(pair, last_candle) # self.getPctLastBuy(pair, last_candle) if 
last_candle['mid_smooth_deriv1'] < -0.02: # and last_candle['mid_smooth_deriv2'] > 0): return False self.should_enter_trade_count = 0 # if max_pair != pair and self.pairs[pair]['total_amount'] > 300: # return False if (max_pair != '') & (self.pairs[pair]['count_of_buys'] >= limit): trade = self.pairs[max_pair]['current_trade'] current_time = current_time.astimezone(timezone.utc) open_date = trade.open_date.astimezone(timezone.utc) current_time_utc = current_time.astimezone(timezone.utc) days_since_open = (current_time_utc - open_date).days pct_max_max = self.getPctFirstBuy(max_pair, last_candle) # print(f"days_since_open {days_since_open} max_pair={max_pair} pair={pair}") return max_pair == pair or pct_max < - 0.25 or ( pct_max_max < - 0.15 and max_pair != pair and days_since_open > 30) else: return True def setTrends(self, dataframe: DataFrame): SMOOTH_WIN=10 df = dataframe.copy() # # --- charger les données --- # df['timestamp'] = pd.to_datetime(df['timestamp'], errors='coerce') # --- calcul SMA14 --- # df['sma'] = talib.SMA(df, timeperiod=20) # ta.trend.sma_indicator(df['close'], 14) # --- pente brute --- df['slope'] = df['sma12'].diff() # --- lissage EMA --- df['slope_smooth'] = df['slope'].ewm(span=SMOOTH_WIN, adjust=False).mean() # df["slope_smooth"] = savgol_filter(df["slope_smooth"], window_length=21, polyorder=3) # --- normalisation relative --- df['slope_norm'] = 10000 * df['slope_smooth'] / df['close'] # df['slope_norm'].fillna(0, inplace=True) df['slope_norm'] = df['slope_norm'].fillna(0) dataframe['slope_norm'] = df['slope_norm'] def make_model(self, model_type="linear", degree=2, random_state=0): model_type = model_type.lower() if model_type == "linear": return LinearRegression() if model_type == "poly": return make_pipeline(StandardScaler(), PolynomialFeatures(degree=degree, include_bias=False), LinearRegression()) if model_type == "svr": return make_pipeline(StandardScaler(), SVR(kernel="rbf", C=1.0, epsilon=0.1)) if model_type == "rf": return 
def calculateRegressionNew(self, df, indic, lookback=20, future_steps=5, model_type="linear"):
    """Extrapolate a rolling straight-line fit of ``indic``.

    For every row ``i`` in ``[lookback, len(df) - future_steps)`` a least-squares
    line is fitted to the ``lookback`` values before ``i`` (x = 0..lookback-1),
    evaluated at ``x = lookback + future_steps - 1`` and stored at row
    ``i + future_steps`` of ``<indic>_future_pred_cons``. A row is left NaN when
    its window, or the value at the destination row, is NaN.

    The fit is computed in closed form for all windows at once instead of one
    estimator fit per row; the result is the same ordinary least-squares line.

    Args:
        df: source frame; it is not modified.
        indic: name of the column to extrapolate.
        lookback: window length, at least 1.
        future_steps: extrapolation horizon, at least 0.
        model_type: accepted for signature compatibility; this method has
            always fitted a straight line regardless of its value.

    Returns:
        A copy of ``df`` with the prediction column added.

    Raises:
        ValueError: ``lookback < 1`` or ``future_steps < 0``.
    """
    if lookback < 1:
        raise ValueError("lookback must be >= 1")
    if future_steps < 0:
        raise ValueError("future_steps must be >= 0")

    df = df.copy()
    pred_col = f"{indic}_future_pred_cons"
    values = df[indic].to_numpy(dtype=float)
    n = len(values)
    preds = np.full(n, np.nan)

    n_fits = n - future_steps - lookback
    if n_fits > 0:
        x = np.arange(lookback, dtype=float)
        x_dev = x - x.mean()
        x_var = float(x_dev @ x_dev)
        # windows[k] == values[k:k + lookback], i.e. the window of row k + lookback.
        windows = np.lib.stride_tricks.sliding_window_view(values, lookback)[:n_fits]
        level = windows.mean(axis=1)
        # A single-point window has no slope; the fit is then the point itself.
        slope = (windows @ x_dev) / x_var if x_var > 0 else np.zeros(n_fits)
        x_future = lookback + future_steps - 1
        fitted = level + slope * (x_future - x.mean())

        rows = np.arange(lookback + future_steps, n)
        usable = ~np.isnan(fitted) & ~np.isnan(values[rows])
        preds[rows[usable]] = fitted[usable]

    df[pred_col] = preds
    return df


# ==========================================================
# Variant that ALSO fills the last `future_steps` rows.
# ==========================================================
def calculateRegression(
    self,
    df,
    indic,
    lookback=30,
    future_steps=5,
    model_type="linear",
    degree=2,
    weight_mode="exp",
    weight_strength=2,
    clip_k=2.0,
    blend_alpha=0.7,
):
    """Rolling regression forecast with volatility clipping and blending.

    For each row ``i`` the model from ``make_model`` is refitted on the
    ``lookback`` values before ``i`` (x = window position) and evaluated at
    ``x = lookback + future_steps - 1``. The predicted change relative to
    ``values[i]`` is clipped to ``±clip_k * std(window)`` and blended with the
    current value using ``blend_alpha``. The last ``future_steps`` rows use a
    one-step continuation (``x = lookback``) anchored on ``values[i - 1]``,
    without clipping.

    Args:
        df: frame to annotate; modified in place and also returned.
        indic: column to forecast.
        lookback: window length.
        future_steps: forecast horizon of the main loop.
        model_type, degree: forwarded to ``make_model``.
        weight_mode: ``"exp"`` gives recent points more weight; any other
            value disables weighting.
        weight_strength: exponent applied to the linear weight ramp.
        clip_k: clipping width in units of the window standard deviation.
        blend_alpha: share of the prediction in the final value.

    Returns:
        ``df`` with ``<indic>_future_pred_cons`` filled (NaN where no window
        was available).
    """
    values = df[indic].values.astype(float)
    n = len(values)
    colname = f"{indic}_future_pred_cons"
    preds = np.full(n, np.nan)

    # NOTE: the former unused sliding_window_view(values, lookback) call was
    # removed; it raised ValueError whenever lookback > len(df).
    trainable_end = n - future_steps
    model = self.make_model(model_type=model_type, degree=degree)

    # Loop invariants, hoisted.
    x_window = np.arange(lookback).reshape(-1, 1)
    x_ahead = np.array([[lookback + future_steps - 1]])
    x_next = np.array([[lookback]])
    weights = np.linspace(0.1, 1, lookback) ** weight_strength if weight_mode == "exp" else None

    # Main loop.
    for i in range(lookback, trainable_end):
        window = values[i - lookback:i]
        if np.isnan(window).any():
            continue

        if weights is None:
            model.fit(x_window, window)
        else:
            try:
                model.fit(x_window, window, sample_weight=weights)
            except Exception:
                # Pipelines and some estimators reject a bare `sample_weight`;
                # fall back to an unweighted fit as before.
                model.fit(x_window, window)

        y_pred_value = model.predict(x_ahead)[0]
        pred_delta = y_pred_value - values[i]

        # Clip the move to the local volatility.
        local_std = np.std(window)
        max_change = clip_k * (local_std if local_std > 0 else 1e-9)
        pred_delta = np.clip(pred_delta, -max_change, max_change)

        preds[i] = blend_alpha * (values[i] + pred_delta) + (1 - blend_alpha) * values[i]

    # Remaining rows [n - future_steps, n - 1]: one-step local continuation.
    for i in range(max(trainable_end, lookback), n):
        window = values[i - lookback:i]
        if np.isnan(window).any():
            continue
        try:
            model.fit(x_window, window)
        except Exception:
            # Best effort for the tail rows, but no longer silent and no longer
            # a bare `except:` that would also trap KeyboardInterrupt.
            logger.debug("calculateRegression: tail fit failed at row %s", i, exc_info=True)
            continue

        y_pred_value = model.predict(x_next)[0]
        pred_delta = y_pred_value - values[i - 1]
        preds[i] = blend_alpha * (values[i - 1] + pred_delta) + (1 - blend_alpha) * values[i - 1]

    # One column assignment instead of one `iloc` write per row.
    df[colname] = preds
    return df
def kerasGenerateGraphs(self, dataframe):
    """Write all diagnostic artefacts for ``self.model`` under ``self.path``."""
    model = self.model
    self.kerasGenerateGraphModel(model)
    self.kerasGenerateGraphPredictions(model, dataframe, self.lookback)
    self.kerasGenerateGraphPoids(model)


def kerasGenerateGraphModel(self, model):
    """Render the layer diagram of ``model`` to ``lstm_model.png``."""
    plot_model(
        model,
        to_file=f"{self.path}/lstm_model.png",
        show_shapes=True,
        show_layer_names=True,
    )


def kerasGenerateGraphPredictions(self, model, dataframe, lookback):
    """Plot the target indicator against the model predictions."""
    preds = self.tensorFlowGeneratePredictions(dataframe, lookback, model)

    plt.figure(figsize=(36, 8))
    plt.plot(dataframe[self.indicator_target].values, label=self.indicator_target)
    plt.plot(preds, label="lstm_pred")
    plt.legend()
    plt.savefig(f"{self.path}/lstm_predictions.png")
    plt.close()


def kerasGenerateGraphPoids(self, model):
    """Save each layer's weights and a heatmap of its first 2-D weight matrix.

    For every layer ``i`` the weights go to ``layer_{i}_weights.npy`` (object
    array, one entry per weight tensor). Layers that own a 2-D matrix also get
    ``layer{i}_weights.png``.
    """
    for i, layer in enumerate(model.layers):
        weights = layer.get_weights()  # list of numpy arrays

        # Fill the object array slot by slot: np.array(list, dtype=object)
        # tries to broadcast and can fail when tensors share a leading dim.
        packed = np.empty(len(weights), dtype=object)
        for slot, tensor in enumerate(weights):
            packed[slot] = tensor
        np.save(f"{self.path}/layer_{i}_weights.npy", packed)

        # Use the in-memory weights; reloading the file just written added
        # nothing but a pickle round trip.
        matrix = next(
            (w for w in weights if isinstance(w, np.ndarray) and w.ndim == 2),
            None,
        )
        if matrix is None:
            print(f"Aucune matrice 2D dans layer {i} (rien à afficher).")
            # Was `return`, which silently skipped every following layer.
            continue

        plt.figure(figsize=(8, 6))
        sns.heatmap(matrix, cmap="viridis")
        plt.title(f"Poids 2D du layer {i}")
        plt.savefig(f"{self.path}/layer{i}_weights.png")
        plt.close()


# -------------------
# Training
# -------------------
def tensorFlowTrain(self, dataframe, future_steps=1, lookback=50, batch_size=32):
    """Train the LSTM regressor and persist the model and both scalers.

    Args:
        dataframe: frame with ``self.model_indicators`` and
            ``self.indicator_target``.
        future_steps: how many rows after the window the target lies.
        lookback: window length fed to the LSTM.
        batch_size: training batch size.

    Raises:
        ValueError: the frame is too short to build a single training window.
    """
    X_seq, y_seq = self.tensorFlowPrepareDataFrame(dataframe, future_steps, lookback)
    if X_seq.ndim != 3 or X_seq.shape[0] == 0:
        # Previously surfaced as an opaque IndexError on X_seq.shape[2].
        raise ValueError(
            "Not enough clean rows to build a training window "
            f"(lookback={lookback}, future_steps={future_steps})."
        )

    self.model = Sequential([
        LSTM(64, return_sequences=False, input_shape=(lookback, X_seq.shape[2])),
        Dense(32, activation="relu"),
        Dense(1),
    ])
    self.model.compile(loss='mse', optimizer=Adam(learning_rate=1e-4))
    self.model.fit(X_seq, y_seq, epochs=self.epochs, batch_size=batch_size, verbose=1)

    # Persist; make sure the target folder exists first.
    os.makedirs(self.path, exist_ok=True)
    self.model.save(f"{self.path}/lstm_model.keras")
    joblib.dump(self.scaler_X, f"{self.path}/lstm_scaler_X.pkl")
    joblib.dump(self.scaler_y, f"{self.path}/lstm_scaler_y.pkl")
def tensorFlowPrepareDataFrame(self, dataframe, future_steps, lookback, fit_scalers=True):
    """Turn a candle frame into scaled LSTM windows and their targets.

    Rows with NaN/Inf in any feature or in the target are dropped, features
    and target are scaled, and sample ``i`` pairs rows ``i .. i+lookback-1``
    with the target ``future_steps`` rows after the end of that window.

    Args:
        dataframe: source frame; it is not modified.
        future_steps: distance, in rows, between window end and target.
        lookback: window length.
        fit_scalers: when True (default, the historical behaviour) the scalers
            are (re)fitted on this data. Pass False to reuse already fitted
            scalers, e.g. at inference time.

    Returns:
        ``(X_seq, y_seq)`` with shapes ``(n, lookback, n_features)`` and
        ``(n, 1)``; ``n`` is 0 when there are too few clean rows.

    Raises:
        ValueError: no clean row remains, a scaler is required but missing
            while ``fit_scalers`` is False, or NaN/Inf survive the scaling.
    """
    target = self.indicator_target
    feature_columns = list(self.model_indicators)

    # 1) Clean only the columns that are used (the target may also be a
    #    feature, hence the de-duplication) instead of copying the whole frame.
    used_columns = list(dict.fromkeys(feature_columns + [target]))
    clean = dataframe[used_columns].replace([np.inf, -np.inf], np.nan).dropna()
    if clean.empty:
        raise ValueError("No row left after removing NaN/Inf from features and target")
    # NOTE(review): dropping rows can leave time gaps inside a window.

    # 2) Features / target
    X_values = clean[feature_columns].to_numpy(dtype=float)
    y_values = clean[target].to_numpy(dtype=float).reshape(-1, 1)

    # 3) Constant columns are intentionally left as they are: MinMaxScaler
    #    already handles a zero range. Zeroing them before fitting made the
    #    scaler learn min=0, so the same constant mapped to 0 in training but
    #    to its raw value when transformed at inference time.

    # 4) Scaling
    if self.scaler_X is None:
        if not fit_scalers:
            raise ValueError("scaler_X is not fitted")
        self.scaler_X = MinMaxScaler()
    X_scaled = (
        self.scaler_X.fit_transform(X_values) if fit_scalers
        else self.scaler_X.transform(X_values)
    )

    if self.y_no_scale:
        y_scaled = y_values
    else:
        if self.scaler_y is None:
            if not fit_scalers:
                raise ValueError("scaler_y is not fitted")
            self.scaler_y = MinMaxScaler()
        y_scaled = (
            self.scaler_y.fit_transform(y_values) if fit_scalers
            else self.scaler_y.transform(y_values)
        )

    # 5) Sliding windows. The last usable target is the last row, hence "+ 1"
    #    (the previous bound dropped the most recent sample).
    n_samples = len(X_scaled) - (lookback + future_steps) + 1
    if n_samples <= 0:
        return np.empty((0, lookback, X_scaled.shape[1])), np.empty((0, 1))

    X_seq = np.stack([X_scaled[i:i + lookback] for i in range(n_samples)])
    first_target = lookback + future_steps - 1
    y_seq = np.array(y_scaled[first_target:first_target + n_samples])

    # Final sanity check
    if np.isnan(X_seq).any() or np.isnan(y_seq).any():
        raise ValueError("X_seq ou y_seq contient encore des NaN")
    if np.isinf(X_seq).any() or np.isinf(y_seq).any():
        raise ValueError("X_seq ou y_seq contient encore des Inf")

    return X_seq, y_seq
# -------------------
# Prediction
# -------------------
def tensorFlowPredict(self, dataframe, future_steps=1, lookback=50):
    """Add ``lstm_pred`` and ``lstm_pred_deriv1`` columns to ``dataframe``.

    The model and both scalers are loaded from ``self.path`` on first use.

    Args:
        dataframe: frame holding ``self.model_indicators`` and
            ``self.indicator_target``; modified in place and returned.
        future_steps: kept for signature compatibility; not used here.
        lookback: window length the model was trained with.

    Returns:
        The same ``dataframe`` with the two prediction columns.
    """
    if self.model is None:
        self.model = load_model(f"{self.path}/lstm_model.keras", compile=False)
        self.scaler_X = joblib.load(f"{self.path}/lstm_scaler_X.pkl")
        self.scaler_y = joblib.load(f"{self.path}/lstm_scaler_y.pkl")

    # The former call to tensorFlowPrepareDataFrame() was removed: its result
    # was discarded, and its only effect was to refit the just-loaded scalers
    # on the inference data, so inputs were scaled differently than in training.
    preds = self.tensorFlowGeneratePredictions(dataframe, lookback, self.model)

    dataframe["lstm_pred"] = preds
    dataframe["lstm_pred_deriv1"] = dataframe["lstm_pred"].diff()
    return dataframe


def tensorFlowGeneratePredictions(self, dataframe, lookback, model):
    """Predict one value per window and align it with ``dataframe``.

    Window ``i`` covers rows ``i .. i+lookback-1`` and its prediction is
    written at row ``i + lookback``. As a side effect the method writes the
    prediction plot and the error histogram and prints the reliability report.

    Args:
        dataframe: frame holding ``self.model_indicators`` and the target.
        lookback: window length.
        model: object exposing ``predict(X, verbose=0)``.

    Returns:
        A list of ``len(dataframe)`` values; the first ``lookback`` entries
        are NaN. All NaN when the frame holds no complete window.
    """
    n_rows = len(dataframe)
    preds = [np.nan] * n_rows
    n_windows = n_rows - lookback
    if n_windows <= 0:
        # Nothing to predict or report on; previously this crashed in predict().
        return preds

    # Scale with the scaler fitted at training time.
    X_values = dataframe[self.model_indicators].values
    X_scaled = self.scaler_X.transform(X_values)

    X_seq = np.stack([X_scaled[i:i + lookback] for i in range(n_windows)])

    y_pred_scaled = model.predict(X_seq, verbose=0).flatten()
    if self.y_no_scale:
        y_pred = y_pred_scaled
    else:
        y_pred = self.scaler_y.inverse_transform(y_pred_scaled.reshape(-1, 1)).flatten()

    # Align with the frame.
    start = lookback
    preds[start:start + len(y_pred)] = y_pred

    # Rows that own a prediction.
    # NOTE(review): this compares each prediction with the value on its own
    # row, which matches the training target only for future_steps == 1 —
    # confirm for longer horizons.
    y_true = dataframe[self.indicator_target][start:]

    mae, rmse, mape, hit_ratio = self.reliability_report(y_true, y_pred)

    self.plot_lstm_predictions(dataframe, preds)
    self.plot_error_histogram(y_true, y_pred)

    rapport = self.generate_text_report(mae, rmse, mape, hit_ratio, self.future_steps)
    print(rapport)

    return preds
def tensorFlowPermutationImportance(self, X, y, metric=None, n_rounds=3):
    """Permutation importance of each input feature for ``self.model``.

    Each feature (last axis of ``X``) is shuffled across samples ``n_rounds``
    times; its importance is the mean score after shuffling minus the
    baseline score.

    Args:
        X: ``(n_samples, n_features)`` or ``(n_samples, lookback, n_features)``.
        y: true targets, one per sample.
        metric: ``metric(y_true, y_pred) -> float``; defaults to
            ``mean_absolute_error``.
        n_rounds: number of shuffles per feature.

    Returns:
        ``np.ndarray`` with one importance per feature.
    """
    if metric is None:
        metric = mean_absolute_error

    baseline_score = metric(y, self.model.predict(X, verbose=0))

    importances = []
    # Features live on the LAST axis. Iterating X.shape[1] scored time steps
    # instead of features for 3-D LSTM input.
    for col in range(X.shape[-1]):
        scores = []
        for _ in range(n_rounds):
            X_permuted = X.copy()
            # In-place shuffle of this feature across samples (first axis).
            np.random.shuffle(X_permuted[..., col])
            scores.append(metric(y, self.model.predict(X_permuted, verbose=0)))
        importances.append(np.mean(scores) - baseline_score)

    return np.array(importances)


def generate_text_report(self, mae, rmse, mape, hit_ratio, n):
    """Format the reliability metrics as a short human-readable report.

    Args:
        mae, rmse: absolute error metrics.
        mape: mean absolute percentage error, already in percent.
        hit_ratio: share of correct direction calls, in [0, 1].
        n: forecast horizon in candles.

    Returns:
        The report as a multi-line string.
    """
    lines = [
        "",
        f"Fiabilité du modèle à horizon {n} bougies",
        "-----------------------------------------",
        f"MAE: {mae:.4f}",
        f"RMSE: {rmse:.4f}",
        f"MAPE: {mape:.2f} %",
        f"Hit-ratio (direction): {hit_ratio*100:.2f} %",
        "",
        "Interprétation :",
        "- MAE faible = bonne précision absolue.",
        "- MAPE faible = bonne précision relative au prix.",
        "- Hit-ratio > 55% = exploitable pour un système de trading directionnel.",
        "- 50% ≈ hasard.",
        "",
    ]
    return "\n".join(lines)


def plot_lstm_predictions(self, dataframe, preds):
    """Plot LSTM predictions against the true indicator value.

    Args:
        dataframe: frame holding the target indicator.
        preds: predictions aligned with ``dataframe``; entries without a
            prediction are NaN (or None) and are left out of the plot.
    """
    # dtype=float also turns None placeholders into NaN.
    preds_array = np.array(preds, dtype=float)
    y_true = dataframe[self.indicator_target].values

    # Keep only the positions that own a prediction.
    mask_valid = ~np.isnan(preds_array)
    y_true_valid = y_true[mask_valid]
    y_pred_valid = preds_array[mask_valid]

    plt.figure(figsize=(45, 5))
    plt.plot(y_true_valid, label="Vraie valeur", color="blue")
    plt.plot(y_pred_valid, label="Prédiction LSTM", color="orange")
    plt.title(f"Prédictions LSTM vs vrai {self.indicator_target}")
    plt.xlabel("Index")
    plt.ylabel(self.indicator_target)
    plt.legend()
    plt.grid(True)
    plt.savefig(f"{self.path}/Prédictions LSTM vs vrai {self.indicator_target}.png")
    plt.close()


def plot_error_histogram(self, y_true, y_pred):
    """Save a histogram of the prediction errors (``y_pred - y_true``)."""
    errors = np.asarray(y_pred, dtype=float) - np.asarray(y_true, dtype=float)
    # A NaN makes the automatic bin range non-finite; drop those entries.
    errors = errors[np.isfinite(errors)]

    plt.figure(figsize=(8, 5))
    plt.hist(errors, bins=30)
    plt.title("Distribution des erreurs de prédiction")
    plt.savefig(f"{self.path}/Distribution des erreurs de prédiction.png")
    plt.close()


def reliability_report(self, y_true, y_pred):
    """Compute MAE, RMSE, MAPE (percent) and directional hit ratio.

    | Metric    | What it measures           | Sensitivity               |
    | --------- | -------------------------- | ------------------------- |
    | MAE       | mean absolute error        | average error             |
    | RMSE      | root mean squared error    | penalises large errors    |
    | MAPE      | mean percentage error      | easy to interpret         |
    | Hit ratio | correct direction of move  | trading / signals         |

    Positions where either series is NaN/Inf are ignored. MAPE is averaged
    over non-zero targets only, since the percentage error is undefined at
    zero. A metric with no usable data is returned as NaN.

    Args:
        y_true: true values.
        y_pred: predicted values, same length, positionally aligned.

    Returns:
        ``(mae, rmse, mape, hit_ratio)`` as floats.

    Raises:
        ValueError: the two inputs have different lengths.
    """
    actual = np.asarray(y_true, dtype=float).ravel()
    predicted = np.asarray(y_pred, dtype=float).ravel()
    if actual.shape != predicted.shape:
        raise ValueError(
            f"y_true ({actual.shape[0]}) and y_pred ({predicted.shape[0]}) differ in length"
        )

    nan = float("nan")
    valid = np.isfinite(actual) & np.isfinite(predicted)
    if not valid.any():
        return nan, nan, nan, nan

    errors = predicted[valid] - actual[valid]
    mae = float(np.mean(np.abs(errors)))
    rmse = float(np.sqrt(np.mean(errors ** 2)))

    reference = actual[valid]
    nonzero = reference != 0
    mape = float(np.mean(np.abs(errors[nonzero] / reference[nonzero])) * 100) if nonzero.any() else nan

    # Directional hit ratio over consecutive pairs that are both valid.
    real_dir = np.sign(np.diff(actual))
    pred_dir = np.sign(np.diff(predicted))
    pair_ok = valid[1:] & valid[:-1]
    hit_ratio = float((real_dir[pair_ok] == pred_dir[pair_ok]).mean()) if pair_ok.any() else nan

    return mae, rmse, mape, hit_ratio
# Utility methods to:
# - load a Keras (Sequential) model
# - load the pre-saved scalers (scaler_X, scaler_y; joblib / numpy)
# - build X aligned on `lookback` from a DataFrame
# - predict mean + std through MC Dropout (or a plain predict without dropout)
# - return the inverse-transformed prediction and a confidence score
def load_model_and_scalers(self):
    """Load the LSTM model and both scalers from ``self.path`` once."""
    # `_tf_model` may not exist yet on the instance; treat that as "not loaded".
    if getattr(self, "_tf_model", None) is None:
        self._tf_model = load_model(f"{self.path}/lstm_model.keras", compile=False)
        self._scaler_X = joblib.load(f"{self.path}/lstm_scaler_X.pkl")
        self._scaler_y = joblib.load(f"{self.path}/lstm_scaler_y.pkl")


def build_X_from_dataframe(self, dataframe, feature_columns):
    """Build one window per row that has ``self.lookback`` rows of history.

    Args:
        dataframe: pandas DataFrame.
        feature_columns: columns to use, in model order.

    Returns:
        Array of shape ``(len(dataframe) - lookback + 1, lookback, n_features)``;
        window ``k`` holds rows ``k .. k+lookback-1``. Shape
        ``(0, lookback, n_features)`` when the frame is shorter than ``lookback``.
    """
    values = dataframe[feature_columns].values
    window = self.lookback
    if len(values) < window:
        return np.empty((0, window, len(feature_columns)), dtype=float)

    # sliding_window_view appends the window axis last: (n, n_features, window).
    view = np.lib.stride_tricks.sliding_window_view(values, window, axis=0)
    # Reorder to (n, window, n_features) and copy so the result is writable.
    return np.ascontiguousarray(view.transpose(0, 2, 1))


def mc_dropout_predict(self, model, X, n_samples=40):
    """Run ``n_samples`` forward passes with ``training=True``.

    NOTE(review): the spread is only non-zero if the model contains
    stochastic layers such as Dropout; the Sequential models built in the
    visible part of this file have none — confirm before relying on std.

    Returns:
        ``(mean, std)``, both flattened over samples and outputs; two empty
        arrays when ``X`` holds no sample.
    """
    if X.shape[0] == 0:
        return np.array([]), np.array([])

    draws = np.array([model(X, training=True).numpy() for _ in range(n_samples)])
    # draws: (n_samples, batch, output)
    return draws.mean(axis=0).flatten(), draws.std(axis=0).flatten()


def predict_on_dataframe(self, dataframe, feature_columns):
    """Build X, scale, predict, inverse-transform and align with the frame.

    The prediction of the window ending at row ``r`` is stored at row
    ``r + self.future_steps``; rows without a prediction are NaN.

    Args:
        dataframe: frame holding ``feature_columns``.
        feature_columns: columns to use, in model order.

    Returns:
        ``(preds, preds_std)``: two float arrays of length ``len(dataframe)``.
    """
    self.load_model_and_scalers()

    X_seq = self.build_X_from_dataframe(dataframe, feature_columns)  # (N, L, f)

    scaler_X = getattr(self, "_scaler_X", None)
    if scaler_X is not None and X_seq.size:
        # The scaler expects 2-D input: flatten the windows, then restore.
        n_windows, window, n_features = X_seq.shape
        X_seq_scaled = scaler_X.transform(
            X_seq.reshape(-1, n_features)
        ).reshape(n_windows, window, n_features)
    else:
        X_seq_scaled = X_seq

    if self.use_mc_dropout:
        mean_scaled, std_scaled = self.mc_dropout_predict(
            self._tf_model, X_seq_scaled, n_samples=self.mc_samples
        )
    elif X_seq_scaled.shape[0] == 0:
        mean_scaled = np.array([])
        std_scaled = np.array([])
    else:
        mean_scaled = self._tf_model.predict(X_seq_scaled, verbose=0).flatten()
        std_scaled = np.zeros_like(mean_scaled)

    scaler_y = getattr(self, "_scaler_y", None)
    if scaler_y is not None and mean_scaled.size:
        mean_real = scaler_y.inverse_transform(mean_scaled.reshape(-1, 1)).flatten()
        # Spread in real units = distance between the un-scaled (mean + std)
        # and the un-scaled mean.
        upper_real = scaler_y.inverse_transform((mean_scaled + std_scaled).reshape(-1, 1)).flatten()
        std_real = np.abs(upper_real - mean_real)
    else:
        mean_real = mean_scaled
        std_real = std_scaled

    n_rows = len(dataframe)
    preds = np.full(n_rows, np.nan)
    preds_std = np.full(n_rows, np.nan)

    start = self.lookback - 1 + self.future_steps
    # Clamp at zero: with fewer windows than future_steps the former negative
    # slice bound selected leftover items and could raise a broadcast error.
    count = max(0, min(len(mean_real) - self.future_steps, n_rows - start))
    if count > 0:
        preds[start:start + count] = mean_real[:count]
        preds_std[start:start + count] = std_real[:count]

    # The permutation-importance block that sat here was guarded by `if False`
    # and never ran; call permutation_importance_lstm() explicitly if needed.
    return preds, preds_std
def permutation_importance_lstm(self, X_seq, y_true, feature_names, n_rounds=3):
    """Permutation importance of each feature for the LSTM.

    For every feature the time order inside each window is shuffled
    ``n_rounds`` times; the importance is the mean MAE after shuffling minus
    the baseline MAE. Results are printed, kept in
    ``self.last_feature_importances``, written to CSV and plotted.

    Args:
        X_seq: array of shape ``(N, lookback, n_features)``.
        y_true: array of shape ``(N,)``.
        feature_names: one name per feature, in model order.
        n_rounds: number of shuffles per feature.

    Returns:
        Dict mapping feature name to importance.

    Raises:
        ValueError: no model is loaded on the instance.
    """
    # Works with either loading path used in this class.
    model = getattr(self, "model", None)
    if model is None:
        model = getattr(self, "_tf_model", None)
    if model is None:
        raise ValueError("No model loaded: train or load the LSTM first")

    baseline_pred = model.predict(X_seq, verbose=0).flatten()
    baseline_score = mean_absolute_error(y_true, baseline_pred)

    rng = np.random.default_rng()
    # One working copy reused for all features instead of one full copy per
    # round; only the feature under test is overwritten, then restored.
    work = X_seq.copy()

    importances = []
    for f in range(X_seq.shape[2]):
        print(feature_names[f])

        scores = []
        for _ in range(n_rounds):
            # Shuffle feature f independently inside every window (axis 1),
            # vectorised instead of a Python loop over the N windows.
            work[:, :, f] = rng.permuted(X_seq[:, :, f], axis=1)
            pred = model.predict(work, verbose=0).flatten()
            scores.append(mean_absolute_error(y_true, pred))
        work[:, :, f] = X_seq[:, :, f]

        importance = np.mean(scores) - baseline_score
        print(f"{f} importance indicator {feature_names[f]} = {100 * importance:.5f}%")
        importances.append(importance)

    for name, imp in sorted(zip(feature_names, importances), key=lambda x: -x[1]):
        print(f"{name}: importance = {100 * imp:.5f}%")

    self.last_feature_importances = importances

    # Persist the results.
    self.save_feature_importance_csv(self.last_feature_importances)
    self.plot_feature_importances(self.last_feature_importances)

    print("✔ Feature importance calculée")

    return dict(zip(feature_names, importances))


def save_feature_importance_csv(self, importances_list):
    """Write ``feature,importance`` rows to ``feature_importances.csv``.

    Args:
        importances_list: one importance per entry of
            ``self.model_indicators``, in the same order.
    """
    target = f"{self.path}/feature_importances.csv"
    # The csv module quotes names that contain a comma or a quote; the
    # explicit encoding and line terminator keep the file identical across
    # platforms.
    with open(target, "w", newline="", encoding="utf-8") as handle:
        writer = csv.writer(handle, lineterminator="\n")
        writer.writerow(["feature", "importance"])
        writer.writerows(zip(self.model_indicators, importances_list))
def plot_feature_importances(self, importances):
    """Save a bar chart of the feature importances, largest first.

    Args:
        importances: one value per entry of ``self.model_indicators``, in the
            same order.
    """
    scores = np.array(importances)
    names = self.model_indicators

    # Descending order of importance.
    order = np.argsort(scores)[::-1]
    labels = [names[idx] for idx in order]
    heights = scores[order]
    positions = range(len(labels))

    plt.figure(figsize=(24, 8))
    plt.bar(positions, heights)
    plt.xticks(positions, labels, rotation=90)
    plt.title("Feature Importance (permutation)")
    plt.tight_layout()
    plt.savefig(f"{self.path}/Feature Importance.png")
    plt.close()


# ############################################################################################################
# position_sizer_lstm.py
# Usage: add these methods to your Strategy class (or use them as a mixin)
"""
Mixin pour entraîner / prédire une fraction de position (0..1) avec un LSTM.
- lookback : nombre de bougies en entrée
- feature_columns : liste des colonnes du dataframe utilisées comme features
- model, scalers saved under self.path (ou chemins fournis)
"""

# CONFIG (adapt in your strategy)
# lookback = 50
# future_steps = 1  # predict the next candle
# feature_columns = None  # ['open','high','low','close','volume', ...]
# model_path = "position_sizer_lstm.keras" # scaler_X_path = "position_sizer_scaler_X.pkl" # scaler_y_path = "position_sizer_scaler_y.pkl" # training params # epochs = 50 # batch_size = 64 # ------------------------ # Data preparation # ------------------------ def _build_sequences_for_position_sizer(self, df): # features (N, f) values = df[self.model_indicators].values.astype(float) # target = realized return after next candle (or profit), here: simple return # you can replace by realized profit if you have it (price change minus fees etc.) prices = df[self.indicator_target].values.astype(float) returns = (np.roll(prices, -self.future_steps) - prices) / prices # next-return returns = returns.reshape(-1, 1) L = self.lookback X_seq = [] y_seq = [] max_i = len(values) - (L + self.future_steps) + 1 if max_i <= 0: return np.empty((0, L, values.shape[1])), np.empty((0, 1)) for i in range(max_i): X_seq.append(values[i : i + L]) # y is the *desired* fraction proxy: we use scaled positive return (could be improved) # Here we use returns[i + L - 1 + future_steps] which is the return after the window y_seq.append(returns[i + L - 1 + self.future_steps - 1]) # equals returns[i+L-1] X_seq = np.array(X_seq) # (ns, L, f) y_seq = np.array(y_seq) # (ns, 1) return X_seq, y_seq # ------------------------ # Scalers save/load # ------------------------ def save_scalers(self, scaler_X, scaler_y, folder=None): import joblib folder = folder or self.path os.makedirs(folder, exist_ok=True) joblib.dump(scaler_X, os.path.join(folder, self.scaler_X_path)) joblib.dump(scaler_y, os.path.join(folder, self.scaler_y_path)) def load_scalers(self, folder=None): import joblib folder = folder or self.path try: self._ps_scaler_X = joblib.load(os.path.join(folder, self.scaler_X_path)) self._ps_scaler_y = joblib.load(os.path.join(folder, self.scaler_y_path)) except Exception: self._ps_scaler_X = None self._ps_scaler_y = None # ------------------------ # Model definition # ------------------------ def 
# ------------------------
# Training
# ------------------------
def train_position_sizer(self, dataframe, feature_columns=None, model_path=None, scaler_folder=None, epochs=None, batch_size=None):
    """Train the LSTM that predicts a position fraction in [0, 1].

    Args:
        dataframe: frame holding the features and ``self.indicator_target``.
        feature_columns: expected feature list. The windows themselves are
            built by ``_build_sequences_for_position_sizer`` from
            ``self.model_indicators``; a different list is reported.
        model_path: where to save the model. Defaults to
            ``self.path/self.model_path``.
        scaler_folder: where to save the scalers. Defaults to ``self.path``.
        epochs, batch_size: override ``self.epochs`` / ``self.batch_size``.

    Returns:
        The trained Keras model.

    Raises:
        ValueError: no feature columns, or no usable training sequence.
    """
    feature_columns = feature_columns or self.model_indicators
    # `or` never yields None for an empty list, so the former `is None` test
    # could not fire.
    if not feature_columns:
        raise ValueError("feature_columns must be provided")
    if list(feature_columns) != list(self.model_indicators):
        logger.warning(
            "train_position_sizer: feature_columns differ from model_indicators; "
            "sequences are built from model_indicators"
        )

    X_seq, y_seq = self._build_sequences_for_position_sizer(dataframe)
    if X_seq.shape[0] == 0:
        raise ValueError("Pas assez de données pour former des séquences (lookback trop grand).")

    # Indicator warm-up NaNs and divisions by a zero price must not reach the
    # scalers or the loss.
    finite = np.isfinite(X_seq).all(axis=(1, 2)) & np.isfinite(y_seq).all(axis=1)
    X_seq, y_seq = X_seq[finite], y_seq[finite]
    if X_seq.shape[0] == 0:
        raise ValueError("Pas assez de données pour former des séquences (lookback trop grand).")

    # Scalers
    scaler_X = MinMaxScaler()
    ns, L, f = X_seq.shape
    X_seq_scaled = scaler_X.fit_transform(X_seq.reshape(-1, f)).reshape(ns, L, f)

    scaler_y = MinMaxScaler(feature_range=(0, 1))
    y_scaled = scaler_y.fit_transform(y_seq)  # maps returns -> [0,1]

    model = self.build_position_sizer_model(n_features=f)

    # Output locations, created BEFORE training: ModelCheckpoint writes into
    # them while fit() is still running.
    model_path = model_path or os.path.join(
        self.path, getattr(self, "model_path", "position_sizer_lstm.keras")
    )
    scaler_folder = scaler_folder or self.path
    os.makedirs(os.path.dirname(model_path) or ".", exist_ok=True)
    os.makedirs(scaler_folder, exist_ok=True)

    callbacks = [
        EarlyStopping(monitor="val_loss", patience=5, restore_best_weights=True, verbose=1),
        ReduceLROnPlateau(monitor="val_loss", factor=0.5, patience=3, verbose=1),
        ModelCheckpoint(model_path, save_best_only=True, monitor="val_loss", verbose=0),
    ]

    epochs = epochs or self.epochs
    batch_size = batch_size or self.batch_size
    model.fit(
        X_seq_scaled, y_scaled,
        validation_split=0.1,
        epochs=epochs,
        batch_size=batch_size,
        callbacks=callbacks,
        verbose=2,
    )

    # Save the model and scalers; `scaler_folder` is now honoured.
    model.save(model_path)
    self.save_scalers(scaler_X, scaler_y, folder=scaler_folder)

    # Keep references for prediction.
    self._ps_model = model
    self._ps_scaler_X = scaler_X
    self._ps_scaler_y = scaler_y

    return model
# ------------------------
# Load model
# ------------------------
def load_position_sizer(self, model_path=None, scaler_folder=None):
    """Load the position-sizer model and scalers.

    Returns:
        The loaded model, or None when the model file does not exist.
    """
    model_path = model_path or os.path.join(
        self.path, getattr(self, "model_path", "position_sizer_lstm.keras")
    )
    scaler_folder = scaler_folder or self.path
    if os.path.exists(model_path):
        self._ps_model = load_model(model_path, compile=False)
    else:
        self._ps_model = None
    self.load_scalers(scaler_folder)
    return self._ps_model


# ------------------------
# Predict fraction on dataframe
# ------------------------
def predict_position_fraction_on_dataframe(self, dataframe, feature_columns=None):
    """Predict a position fraction for every row with enough history.

    Args:
        dataframe: frame holding the feature columns.
        feature_columns: columns to use; defaults to ``self.model_indicators``.

    Returns:
        Float array of length ``len(dataframe)``; rows before index
        ``lookback - 1`` are NaN.

    Raises:
        ValueError: no feature columns, or the model / input scaler is
            missing (train first or load them).
    """
    feature_columns = feature_columns or self.model_indicators
    if not feature_columns:
        raise ValueError("feature_columns must be set")

    if getattr(self, "_ps_model", None) is None:
        self.load_position_sizer()

    result = np.full(len(dataframe), np.nan)
    window = self.lookback
    if len(dataframe) < window:
        # Not enough data.
        return result

    model = getattr(self, "_ps_model", None)
    if model is None:
        raise ValueError("position sizer model missing (train first or load it).")
    scaler_X = getattr(self, "_ps_scaler_X", None)
    if scaler_X is None:
        raise ValueError("scaler_X missing (train first or load scalers).")

    # Inference needs no labels, so build one window per row instead of
    # reusing the training builder, which dropped the newest windows and
    # left the current candle without a prediction.
    values = dataframe[feature_columns].values.astype(float)
    scaled = scaler_X.transform(values)
    view = np.lib.stride_tricks.sliding_window_view(scaled, window, axis=0)
    X_seq_scaled = np.ascontiguousarray(view.transpose(0, 2, 1))  # (ns, L, f)

    preds = model.predict(X_seq_scaled, verbose=0).flatten()  # in [0,1]

    # The window ending at row r gives the prediction of row r.
    result[window - 1:window - 1 + len(preds)] = preds
    return result


# ------------------------
# Adjust trade position (Freqtrade hook)
# ------------------------
def position_fraction_to_trade_size(self, fraction, wallet_balance, current_price, min_fraction=0.001, max_fraction=0.5):
    """Map a model fraction in [0, 1] to a safe fraction of the wallet.

    Args:
        fraction: model output; None, NaN, ±Inf or a non-numeric value is
            treated as "no signal".
        wallet_balance, current_price: accepted for the hook signature; not
            used in the computation.
        min_fraction: result for fraction 0 and for "no signal".
        max_fraction: result for fraction 1.

    Returns:
        A float in ``[min_fraction, max_fraction]``.
    """
    try:
        value = float(fraction)
    except (TypeError, ValueError):
        return min_fraction
    # An infinite prediction is garbage, not a strong signal: without this
    # guard +Inf was clipped to 1.0 and produced the largest position.
    if not np.isfinite(value):
        return min_fraction

    frac = float(np.clip(value, 0.0, 1.0))
    # Scale to the allowed range.
    # To convert to quantity units: quantity = (scaled * wallet_balance) / price
    return min_fraction + frac * (max_fraction - min_fraction)

# End of mixin