Files
Freqtrade/Zeus_TensorFlow_1h.py
Jérôme Delacotte c7530208a3 TensorFlow
2025-11-23 22:32:10 +01:00

2660 lines
115 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
# Zeus Strategy: First Generation of GodStra Strategy with maximum
# AVG/MID profit in USDT
# Author: @Mablue (Masoud Azizi)
# github: https://github.com/mablue/
# IMPORTANT: install TA before running (pip install ta)
# freqtrade hyperopt --hyperopt-loss SharpeHyperOptLoss --spaces buy sell roi --strategy Zeus
# --- Do not remove these libs ---
from datetime import timedelta, datetime
from freqtrade.persistence import Trade
from freqtrade.strategy import (BooleanParameter, CategoricalParameter, DecimalParameter, stoploss_from_open,
IntParameter, IStrategy, merge_informative_pair, informative, stoploss_from_absolute)
import pandas as pd
import numpy as np
import os
import json
import csv
from pandas import DataFrame
from typing import Optional, Union, Tuple
import math
import logging
from pathlib import Path
# --------------------------------
# Add your lib to import here test git
import ta
import talib.abstract as talib
import freqtrade.vendor.qtpylib.indicators as qtpylib
from datetime import timezone, timedelta
logger = logging.getLogger(__name__)
# Machine Learning
from sklearn.model_selection import train_test_split
import joblib
import matplotlib.pyplot as plt
from sklearn.metrics import (
classification_report,
confusion_matrix,
accuracy_score,
roc_auc_score,
roc_curve,
precision_score, recall_score, precision_recall_curve,
f1_score, mean_squared_error, r2_score
)
from sklearn.tree import export_text
import inspect
from sklearn.feature_selection import SelectFromModel
from tabulate import tabulate
from sklearn.feature_selection import VarianceThreshold
import seaborn as sns
import lightgbm as lgb
from sklearn.model_selection import cross_val_score
import optuna.visualization as vis
import optuna
from lightgbm import LGBMRegressor
from sklearn.metrics import mean_squared_error
from sklearn.model_selection import train_test_split
from sklearn.linear_model import LinearRegression, Ridge, HuberRegressor
from sklearn.preprocessing import StandardScaler, PolynomialFeatures
from sklearn.pipeline import make_pipeline
from sklearn.svm import SVR
from sklearn.ensemble import RandomForestRegressor
from sklearn.ensemble import GradientBoostingRegressor
from sklearn.preprocessing import StandardScaler
from sklearn.ensemble import HistGradientBoostingRegressor
from sklearn.impute import SimpleImputer
from sklearn.pipeline import Pipeline
# Tensorflow
import pandas as pd
import numpy as np
import tensorflow as tf
from tensorflow.keras import layers, models
from tensorflow.keras.models import load_model
from keras.utils import plot_model
from keras.models import Sequential
from keras.layers import LSTM, Dense
from sklearn.preprocessing import MinMaxScaler
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import LSTM, Dense
from tensorflow.keras.optimizers import Adam
from sklearn.metrics import mean_absolute_error, mean_squared_error
from sklearn.preprocessing import MinMaxScaler
import tensorflow as tf
from tensorflow.keras.callbacks import EarlyStopping, ReduceLROnPlateau, ModelCheckpoint
# Process-wide TensorFlow settings, ANSI colour codes used by the trade log, and a warnings filter.
# NOTE(review): these environment variables are assigned after `import tensorflow` has already run
# earlier in this file — confirm they still take effect at that point.
os.environ["CUDA_VISIBLE_DEVICES"] = "-1" # hide every CUDA device from this process (GPU fully disabled)
os.environ["TF_XLA_FLAGS"] = "--tf_xla_enable_xla_devices=false"
# Basic ANSI colour escape sequences
RED = "\033[31m"
GREEN = "\033[32m"
YELLOW = "\033[33m"
BLUE = "\033[34m"
MAGENTA = "\033[35m"
CYAN = "\033[36m"
RESET = "\033[0m"
import warnings
# Silence warnings whose message contains "No further splits with positive gain"
# (presumably emitted by LightGBM during training — TODO confirm).
warnings.filterwarnings(
"ignore",
message=r".*No further splits with positive gain.*"
)
def pprint_df(dframe):
    """Print *dframe* to stdout as a psql-style table, headers taken from the keys, index hidden."""
    rendered = tabulate(dframe, headers='keys', tablefmt='psql', showindex=False)
    print(rendered)
def normalize(df):
    """Return a min-max scaled copy of *df*: ``(df - min) / (max - min)``.

    The input object is not modified; ``min``/``max`` are whatever
    ``df.min()`` / ``df.max()`` return for the given object.
    """
    lowest = df.min()
    value_range = df.max() - lowest
    return (df - lowest) / value_range
class Zeus_TensorFlow_1h(IStrategy):
# Freqtrade strategy class; the attributes below are its class-level configuration.
# 60 * 24 = 1440 candles requested as warm-up history.
startup_candle_count = 60 * 24
# Machine Learning
# training_enabled gates the tensorFlowTrain(...) call in populate_indicators (backtest mode only).
training_enabled = False
model = None
# Feature column names; overwritten by listUsableColumns() and by the metadata JSON read in populate_indicators.
model_indicators = []
indicator_target = 'sma5'
# Tensorflow
# lookback / future_steps are persisted to and re-read from model_metadata.json.
lookback = 72
future_steps = 6
y_no_scale = False
epochs = 200
batch_size = 64
scaler_X = None
scaler_y = None
use_mc_dropout = True
mc_samples = 40
minimal_pct_for_trade = 0.003 # 0.3% threshold (MAPE-based deadzone) — NOTE(review): the earlier comment said 1.5%, which does not match the value
min_hit_ratio = 0.55 # minimal historical hit ratio required to enable trading
max_uncertainty_pct = 0.7 # if uncertainty > 70% of the predicted move => skip
base_risk_per_trade = 0.1 # 10% of capital (for sizing) — NOTE(review): the earlier comment said 1%, which does not match the value
_tf_model = None
_scaler_X = None
_scaler_y = None
# internal
_ps_model = None
_ps_scaler_X = None
_ps_scaler_y = None
# Default output directory; populate_indicators replaces it with a per-pair directory.
path = f"user_data/plots/"
model_path = "position_sizer_lstm.keras"
scaler_X_path = "position_sizer_scaler_X.pkl"
scaler_y_path = "position_sizer_scaler_y.pkl"
# ROI table:
minimal_roi = {
"0": 0.564,
"567": 0.273,
"2814": 0.12,
"7675": 0
}
# Stoploss:
stoploss = -1 # 0.256
# Custom stoploss
use_custom_stoploss = False
trailing_stop = False
trailing_stop_positive = 0.15
trailing_stop_positive_offset = 0.20
trailing_only_offset_is_reached = True
# Buy hypers
# NOTE(review): the class name ends in "_1h" but the base timeframe is '1m' with a '5m' informative
# timeframe — confirm this is intended.
timeframe = '1m'
# Informative (higher) timeframe requested in informative_pairs() and merged in populate_indicators().
timeframe_sup = '5m'
max_open_trades = 5
max_amount = 40
parameters = {}
# DCA config
position_adjustment_enable = True
# Plot layout: series drawn on the main price chart and on the named sub-charts.
# The original literal repeated some keys inside the same dict ("max_rsi_24",
# "sma24_deriv1", "sma24_deriv2"); Python keeps only the last value for a
# repeated key, so the overwritten entries were dead and have been removed.
# The resulting dictionary (keys, order and colours) is unchanged.
# NOTE(review): the repeated keys were probably meant to name other columns —
# confirm which series should share those sub-plots.
plot_config = {
    "main_plot": {
        "sma24": {"color": "pink"},
        "sma5_1d": {"color": "blue"},
        "sma60": {"color": "green"},
        "bb_lowerband": {"color": "#da59a6"},
        "bb_upperband": {"color": "#da59a6"},
        "mid_smooth_3": {"color": "blue"},
    },
    "subplots": {
        "Rsi": {
            "max_rsi_24": {"color": "pink"},
        },
        "Rsi_deriv1": {
            "sma24_deriv1": {"color": "yellow"},
            "sma5_deriv1_1d": {"color": "blue"},
            "sma60_deriv1": {"color": "green"},
        },
        "Rsi_deriv2": {
            "sma24_deriv2": {"color": "yellow"},
            "sma5_deriv2_1d": {"color": "blue"},
            "sma60_deriv2": {"color": "green"},
        },
        'Macd': {
            "macd_rel_1d": {"color": "cyan"},
            "macdsignal_rel_1d": {"color": "pink"},
            "macdhist_rel_1d": {"color": "yellow"},
        },
    },
}
# Used by log_trade() as a call counter (`% 10` / `+= 1`) even though it starts as a bool.
columns_logged = False
# Per-pair bookkeeping dictionary, keyed by pair string. The callbacks index it
# directly with self.pairs[pair], so a pair missing from the list below raises KeyError.
# NOTE(review): being assigned in the class body, this dict (and `trades` below) is
# presumably shared by every instance of the class — confirm that is intended.
pairs = {
pair: {
"first_buy": 0,
"last_buy": 0.0,
# Very large initial value so the first min(close, last_min) takes the candle close.
"last_min": 999999999999999.5,
"last_max": 0,
"trade_info": {},
"max_touch": 0.0,
"last_sell": 0.0,
'count_of_buys': 0,
'current_profit': 0,
'expected_profit': 0,
'previous_profit': 0,
"last_candle": {},
"last_count_of_buys": 0,
'base_stake_amount': 0,
'stop_buy': False,
# 0 means "no exit recorded yet"; confirm_trade_exit stores the exit time here.
'last_date': 0,
'stop': False,
'max_profit': 0,
'first_amount': 0,
'total_amount': 0,
'has_gain': 0,
'force_sell': False,
'force_buy': False
}
for pair in ["BTC/USDC", "ETH/USDC", "DOGE/USDC", "XRP/USDC", "SOL/USDC",
"BTC/USDT", "ETH/USDT", "DOGE/USDT", "XRP/USDT", "SOL/USDT"]
}
# Open-trade cache; reset to an empty list in confirm_trade_entry / confirm_trade_exit.
trades = list()
max_profit_pairs = {}
# Hyperoptable parameters in the 'buy' space.
pct = DecimalParameter(0.005, 0.05, default=0.012, decimals=3, space='buy', optimize=True, load=True)
pct_inc = DecimalParameter(0.0001, 0.003, default=0.0022, decimals=4, space='buy', optimize=True, load=True)
# =========================================================================
should_enter_trade_count = 0
def confirm_trade_entry(self, pair: str, order_type: str, amount: float, rate: float, time_in_force: str,
                        current_time: datetime, entry_tag: Optional[str], **kwargs) -> bool:
    """Initialise the per-pair bookkeeping for a new trade and log the buy.

    Every entry is currently accepted (the filter is hard-wired to ``True``).

    :param pair: Pair about to be bought; must be a key of ``self.pairs``.
    :param rate: Entry rate, stored as both first and last buy price.
    :param current_time: Time of the entry, used for the log line and to
        compute the minutes elapsed since the last recorded exit.
    :param entry_tag: Entry tag, forwarded to the trade log as trade type.
    :return: ``True`` when the entry is allowed.

    Only the last analysed candle is read. The previous implementation also
    fetched ``iloc[-2]`` / ``iloc[-3]`` without using them, which raised
    ``IndexError`` on frames shorter than three rows.
    """
    state = self.pairs[pair]

    # Minutes since the last recorded exit of this pair (0 when none was recorded yet).
    minutes = 0
    if state['last_date'] != 0:
        minutes = round(int((current_time - state['last_date']).total_seconds() / 60))

    dataframe, _ = self.dp.get_analyzed_dataframe(pair, self.timeframe)
    last_candle = dataframe.iloc[-1].squeeze()

    # Entry filtering is disabled for now; the flag is kept so a condition can be plugged back in.
    allow_to_buy = True

    if allow_to_buy:
        self.trades = list()
        state['first_buy'] = rate
        state['last_buy'] = rate
        state['max_touch'] = last_candle['close']
        state['last_candle'] = last_candle
        state['count_of_buys'] = 1
        state['current_profit'] = 0
        state['last_max'] = max(last_candle['close'], state['last_max'])
        state['last_min'] = min(last_candle['close'], state['last_min'])

        dispo = round(self.wallets.get_available_stake_amount())
        self.printLineLog()

        stake_amount = self.adjust_stake_amount(pair, last_candle)
        state['total_amount'] = stake_amount

        self.log_trade(
            last_candle=last_candle,
            date=current_time,
            action=("🟩Buy" if allow_to_buy else "Canceled") + " " + str(minutes),
            pair=pair,
            rate=rate,
            dispo=dispo,
            profit=0,
            trade_type=entry_tag,
            buys=1,
            stake=round(stake_amount, 2)
        )

    return allow_to_buy
def confirm_trade_exit(self, pair: str, trade: Trade, order_type: str, amount: float, rate: float,
                       time_in_force: str,
                       exit_reason: str, current_time, **kwargs, ) -> bool:
    """Decide whether an exit order may be placed, and reset the pair state when it is.

    The exit is allowed when any of the following holds:

    * the pair's ``force_sell`` flag is set,
    * ``exit_reason`` is ``'force_exit'`` or ``'stop_loss'``,
    * the trade has been open for more than 30 minutes and the last candle's
      ``hapercent`` is negative.

    :param pair: Pair being sold; must be a key of ``self.pairs``.
    :param trade: The trade being closed.
    :param rate: Exit rate, used for the profit shown in the log.
    :param exit_reason: Exit reason, logged as trade type.
    :param current_time: Time of the exit; stored as the pair's ``last_date``.
    :return: ``True`` when the exit is allowed.
    """
    dataframe, _ = self.dp.get_analyzed_dataframe(pair, self.timeframe)
    last_candle = dataframe.iloc[-1].squeeze()

    # total_seconds() covers the whole duration; timedelta.seconds drops whole
    # days, which made a trade open for 1 day + 5 min look 5 minutes old and
    # blocked its exit on the "minutes > 30" gate.
    minutes = int(round((current_time - trade.open_date_utc).total_seconds() / 60, 0))
    profit = trade.calc_profit(rate)

    state = self.pairs[pair]
    forced = state['force_sell'] or exit_reason in ('force_exit', 'stop_loss')
    allow_to_sell = bool(forced or (minutes > 30 and last_candle['hapercent'] < 0))

    if allow_to_sell:
        self.trades = list()
        state['last_count_of_buys'] = trade.nr_of_successful_entries
        state['last_sell'] = rate
        state['last_candle'] = last_candle
        state['max_profit'] = 0
        state['previous_profit'] = 0

        dispo = round(self.wallets.get_available_stake_amount())
        self.log_trade(
            last_candle=last_candle,
            date=current_time,
            action="🟥Sell " + str(minutes),
            pair=pair,
            trade_type=exit_reason,
            rate=last_candle['close'],
            dispo=dispo,
            profit=round(profit, 2)
        )

        # Reset the bookkeeping only after logging, because the log line reads max_touch.
        state['force_sell'] = False
        state['has_gain'] = 0
        state['current_profit'] = 0
        state['total_amount'] = 0
        state['count_of_buys'] = 0
        state['max_touch'] = 0
        state['last_buy'] = 0
        state['last_date'] = current_time
        state['current_trade'] = None

    return allow_to_sell
def custom_exit(self, pair: str, trade: Trade, current_time, current_rate, current_profit, **kwargs):
    """Refresh the per-pair statistics and return an exit tag when a sell rule fires.

    Rules are evaluated in this order; the first match wins:

    1. ``Rsi85``: ``max_rsi_24 > 85``, profit above ``max(5, expected_profit)``,
       negative ``hapercent`` and ``sma60_deriv1 < 0.05``.
    2. ``Frc``: the pair's ``force_sell`` flag is set.
    3. ``B30``: positive profit and ``baisse > 0.30``.
    4. ``Rsi75``: ``max_rsi_24 > 75``, profit above the expected profit,
       negative ``hapercent`` and ``sma60_deriv1 < 0``.

    :param pair: Pair of the open trade; must be a key of ``self.pairs``.
    :param trade: The open trade.
    :param current_time: Current time, used to throttle the status log line.
    :param current_rate: Rate used to compute the trade's absolute profit.
    :return: ``"<buys>_<rule>_<pair>_<has_gain>"`` when a rule fires, else ``None``.

    Locals the previous implementation computed but never used were removed,
    among them ``iloc[-13]`` candle lookups that raised ``IndexError`` on short
    frames and ``.seconds``-based durations that ignored whole days.
    """
    dataframe, _ = self.dp.get_analyzed_dataframe(pair, self.timeframe)
    last_candle = dataframe.iloc[-1].squeeze()

    expected_profit = self.expectedProfit(pair, last_candle)

    state = self.pairs[pair]
    state['last_max'] = max(last_candle['close'], state['last_max'])
    state['last_min'] = min(last_candle['close'], state['last_min'])
    state['current_trade'] = trade

    count_of_buys = trade.nr_of_successful_entries
    profit = trade.calc_profit(current_rate)
    state['max_profit'] = max(state['max_profit'], profit)

    # NOTE(review): the reference below is 'max5' (the 5-candle maximum of the
    # 'mid' price, see populate_indicators), not the tracked maximum profit in
    # state['max_profit']. 'baisse' therefore compares a profit with a price —
    # confirm this is intended. Behaviour is kept unchanged here.
    max_profit = last_candle['max5']
    baisse = 0
    if profit > 0:
        baisse = 1 - (profit / max_profit)

    state['count_of_buys'] = count_of_buys
    state['current_profit'] = profit

    dispo = round(self.wallets.get_available_stake_amount())

    # Status line only when the minutes since the last fill are an exact multiple of 4.
    minutes = (current_time - trade.date_last_filled_utc).total_seconds() / 60.0
    if minutes % 4 == 0:
        self.log_trade(
            last_candle=last_candle,
            date=current_time,
            action="🟢 CURRENT",
            dispo=dispo,
            pair=pair,
            rate=last_candle['close'],
            trade_type='',
            profit=round(profit, 2),
            buys=count_of_buys,
            stake=0
        )

    pair_name = self.getShortName(pair)

    def exit_tag(rule):
        # Same tag layout for every rule: buys, rule name, short pair name, has_gain.
        return str(count_of_buys) + '_' + rule + '_' + pair_name + '_' + str(state['has_gain'])

    # True when more than 3 buys have not been covered by a gain yet.
    def needs_rebuy():
        return state['count_of_buys'] - state['has_gain'] > 3

    if last_candle['max_rsi_24'] > 85 and profit > max(5, expected_profit) \
            and (last_candle['hapercent'] < 0) and last_candle['sma60_deriv1'] < 0.05:
        state['force_sell'] = False
        state['force_buy'] = False
        return exit_tag('Rsi85')

    if state['force_sell']:
        state['force_sell'] = False
        state['force_buy'] = needs_rebuy()
        return exit_tag('Frc')

    if profit > 0 and baisse > 0.30:
        state['force_sell'] = False
        state['force_buy'] = needs_rebuy()
        return exit_tag('B30')

    if last_candle['max_rsi_24'] > 75 and profit > expected_profit \
            and (last_candle['hapercent'] < 0) and last_candle['sma60_deriv1'] < 0:
        state['force_sell'] = False
        return exit_tag('Rsi75')

    # No rule fired: keep tracking the highest close seen during the trade.
    state['max_touch'] = max(last_candle['close'], state['max_touch'])
    return None
def getShortName(self, pair):
    """Return *pair* with its USDT/USDC quote suffix (``/`` or ``_`` separated) removed."""
    short_name = pair
    # Same removal order as before: "/USDT", "/USDC", "_USDC", "_USDT".
    for quote_suffix in ("/USDT", "/USDC", "_USDC", "_USDT"):
        short_name = short_name.replace(quote_suffix, '')
    return short_name
def informative_pairs(self):
    """Return one ``(pair, self.timeframe_sup)`` tuple for each pair of the current whitelist."""
    requested = []
    for symbol in self.dp.current_whitelist():
        requested.append((symbol, self.timeframe_sup))
    return requested
def log_trade(self, action, pair, date, trade_type=None, rate=None, dispo=None, profit=None, buys=None, stake=None,
              last_candle=None):
    """Print one row of the trade log table (nothing is printed in hyperopt mode).

    Every 10th call (starting with the first) the column header, a separator
    line and a snapshot of the pairs that currently have buys are printed first.

    :param action: Label of the event (buy, sell, status ...).
    :param pair: Pair concerned; must be a key of ``self.pairs``.
    :param date: Event time; only the first 16 characters of ``str(date)`` are shown.
    :param trade_type: Entry tag / exit reason shown in the "Trade Type" column.
    :param rate: Price shown in the "Rate" column.
    :param dispo: Available stake amount.
    :param profit: Current profit; ``None`` is shown as ``-``.
    :param buys: Number of buys of the trade.
    :param stake: Stake amount of the order.
    :param last_candle: Last analysed candle; ``close`` and ``max_rsi_24`` are read here.
    """
    # Equality test: the previous "in ('hyperopt')" was a substring test on a plain string.
    if self.config.get('runmode') == 'hyperopt' or self.dp.runmode.value == 'hyperopt':
        return

    # Show the column names once every 10 rows.
    if self.columns_logged % 10 == 0:
        self.printLog(
            f"| {'Date':<16} | {'Action':<10} |{'Pair':<5}| {'Trade Type':<18} |{'Rate':>8} | {'Dispo':>6} | {'Profit':>8} "
            f"| {'Pct':>6} | {'max_touch':>11} | {'last_lost':>12} | {'last_max':>7}| {'last_min':>7}|{'Buys':>5}| {'Stake':>5} |"
            f"{'rsi':>6}"
        )
        self.printLineLog()
        df = pd.DataFrame.from_dict(self.pairs, orient='index')
        colonnes_a_exclure = ['last_candle',
                              'trade_info', 'last_date', 'last_count_of_buys', 'base_stake_amount', 'stop_buy']
        df_filtered = df[df['count_of_buys'] > 0].drop(columns=colonnes_a_exclure)
        self.printLog(df_filtered)
    self.columns_logged += 1

    date = str(date)[:16] if date else "-"
    last_lost = self.getLastLost(last_candle, pair)
    if buys is None:
        buys = ''
    pct_max = self.getPctFirstBuy(pair, last_candle)
    total_counts = str(buys) + '/' + str(sum(pair_data['count_of_buys'] for pair_data in self.pairs.values()))

    # profit defaults to None; treat it as "not positive" instead of raising on the comparison.
    color = GREEN if (profit is not None and profit > 0) else RED

    state = self.pairs[pair]
    # Values above 1 are shown as integers, smaller ones with three decimals.
    last_max = int(state['last_max']) if state['last_max'] > 1 else round(state['last_max'], 3)
    last_min = int(state['last_min']) if state['last_min'] > 1 else round(state['last_min'], 3)

    # "<current>/<max so far>"
    profit = ('-' if profit is None else str(profit)) + '/' + str(round(state['max_profit'], 2))

    self.printLog(
        f"| {date:<16} |{action:<10} | {pair[0:3]:<3} | {trade_type or '-':<18} |{rate or '-':>9}| {dispo or '-':>6} "
        f"|{color}{profit or '-':>10}{RESET}| {pct_max or '-':>6} | {round(self.pairs[pair]['max_touch'], 2) or '-':>11} | {last_lost or '-':>12} "
        f"| {last_max or '-':>7} | {last_min or '-':>7} |{total_counts or '-':>5}|{stake or '-':>7}"
        f"{round(last_candle['max_rsi_24'], 1) or '-' :>6}|"
    )
def getLastLost(self, last_candle, pair):
    """Return the relative distance of the last close from the pair's ``max_touch``.

    The result is ``(close - max_touch) / max_touch`` rounded to 3 decimals,
    so it is negative when the price is below the recorded high.

    ``max_touch`` starts at 0 and is reset to 0 after each exit; in that case
    there is no reference price and ``0.0`` is returned instead of dividing by zero.

    :param last_candle: Mapping/Series providing ``close``.
    :param pair: Key into ``self.pairs``.
    """
    max_touch = self.pairs[pair]['max_touch']
    if not max_touch:
        return 0.0
    return round((last_candle['close'] - max_touch) / max_touch, 3)
def printLineLog(self):
    """Send the horizontal separator line of the trade log table to ``printLog``."""

    def rule(widths):
        # "+----+---+ ... +" with one dash run per width
        return "+" + "+".join("-" * width for width in widths) + "+"

    # Dash-run widths of the two segments; the 18 in the first tuple is the
    # pair of 9-dash runs that have no "+" between them.
    table_widths = (18, 12, 5, 20, 9, 8, 12, 8, 13, 14, 18, 5, 7)
    extra_widths = (6, 7, 5, 5, 5, 5, 5, 5)
    self.printLog(rule(table_widths) + rule(extra_widths))
def printLog(self, str):
    """Emit *str* through the channel that suits the current run mode.

    * hyperopt: nothing is emitted;
    * backtest / lookahead-analysis: ``print`` to stdout;
    * any other mode: ``logger.info``.

    :param str: Object to emit. The name shadows the ``str`` builtin inside
        this method; it is kept unchanged for callers.
    """
    if self.config.get('runmode') == 'hyperopt':
        return
    mode = self.dp.runmode.value
    # Equality test: the previous "in ('hyperopt')" was a substring test on a plain string.
    if mode == 'hyperopt':
        return
    if mode in ('backtest', 'lookahead-analysis'):
        print(str)
    else:
        logger.info(str)
def populate_indicators(self, dataframe: DataFrame, metadata: dict) -> DataFrame:
# Build the analysed dataframe for metadata['pair']: base-timeframe indicators, the
# merged self.timeframe_sup informative indicators, first/last buy prices of open
# trades (live / dry-run only), smoothed 'mid' with derivatives, two regressions and
# 'max5'; optionally runs TensorFlow training, then reloads the model metadata JSON.
# NOTE(review): the original indentation of this method is not visible in this source,
# so the nesting of the loops / ifs below must be confirmed against the real file.
# Add all ta features
pair = metadata['pair']
short_pair = self.getShortName(pair)
# Per-pair output directory; a "valide/" sub-directory is appended when not backtesting.
# NOTE(review): ('backtest') is a plain string, not a tuple, so `in` is a substring test.
self.path = f"user_data/plots/{short_pair}/" + ("valide/" if not self.dp.runmode.value in ('backtest') else '')
dataframe = self.populateDataframe(dataframe, timeframe=self.timeframe)
# ################### INFORMATIVE self.timeframe_sup
informative = self.dp.get_pair_dataframe(pair=metadata['pair'], timeframe=self.timeframe_sup)
informative = self.populateDataframe(informative, timeframe=self.timeframe_sup)
# informative = self.calculateRegression(informative, 'mid', lookback=15)
dataframe = merge_informative_pair(dataframe, informative, self.timeframe, self.timeframe_sup, ffill=True)
# Default both price columns to the close; overwritten below from filled buy orders.
dataframe['last_price'] = dataframe['close']
dataframe['first_price'] = dataframe['close']
# NOTE(review): self.dp has already been dereferenced above, so this guard cannot protect those uses.
if self.dp:
if self.dp.runmode.value in ('live', 'dry_run'):
self.getOpenTrades()
for trade in self.trades:
if trade.pair != pair:
continue
filled_buys = trade.select_filled_orders('buy')
count = 0
amount = 0
for buy in filled_buys:
# The first filled buy sets the first price / first amount of the pair.
if count == 0:
dataframe['first_price'] = buy.price
self.pairs[pair]['first_buy'] = buy.price
self.pairs[pair]['first_amount'] = buy.price * buy.filled
# dataframe['close01'] = buy.price * 1.01
# Order(id=2396, trade=1019, order_id=29870026652, side=buy, filled=0.00078, price=63921.01,
# status=closed, date=2024-08-26 02:20:11)
dataframe['last_price'] = buy.price
self.pairs[pair]['last_buy'] = buy.price
count = count + 1
# Running total of price * filled quantity over the filled buys.
amount += buy.price * buy.filled
count_buys = count
self.pairs[pair]['total_amount'] = amount
# dataframe['mid_smooth_tag'] = qtpylib.crossed_below(dataframe['mid_smooth_24_deriv1'], dataframe['mid_smooth_deriv2_24'])
# Select the feature columns; this call also writes model_metadata.json and replaces
# inf / NaN with 0 in the selected columns. Columns created below this line are
# therefore not part of the list computed here.
self.model_indicators = self.listUsableColumns(dataframe)
# ===============================
# smoothing of the hourly values
dataframe['mid_smooth'] = dataframe['mid'].rolling(window=6).mean()
dataframe["mid_smooth_deriv1"] = 100 * dataframe["mid_smooth"].diff().rolling(window=6).mean() / \
dataframe['mid_smooth']
dataframe["mid_smooth_deriv2"] = 100 * dataframe["mid_smooth_deriv1"].diff().rolling(window=6).mean()
# dataframe['mid_smooth_5h'] = talib.EMA(dataframe, timeperiod=60) # dataframe['mid'].rolling(window=60).mean()
# dataframe["mid_smooth_5h_deriv1"] = 100 * dataframe["mid_smooth_5h"].diff().rolling(window=60).mean() / \
# dataframe['mid_smooth_5h']
# dataframe["mid_smooth_5h_deriv2"] = 100 * dataframe["mid_smooth_5h_deriv1"].diff().rolling(window=60).mean()
# dataframe['stop_buying_deb'] = ((dataframe['max_rsi_24'] > self.rsi_deb_protect.value)
# & (dataframe['sma24_deriv1'] < self.sma24_deriv1_deb_protect.value)
# )
# dataframe['stop_buying_end'] = ((dataframe['max_rsi_24'] < self.rsi_end_protect.value)
# & (dataframe['sma24_deriv1'] > self.sma24_deriv1_end_protect.value)
# )
#
# latched = np.zeros(len(dataframe), dtype=bool)
#
# for i in range(1, len(dataframe)):
# if dataframe['stop_buying_deb'].iloc[i]:
# latched[i] = True
# elif dataframe['stop_buying_end'].iloc[i]:
# latched[i] = False
# else:
# latched[i] = latched[i - 1]
#
# dataframe['stop_buying'] = latched
dataframe = self.calculateRegression(dataframe, 'mid', lookback=10, future_steps=10, model_type="poly")
dataframe = self.calculateRegression(dataframe, 'sma24', lookback=12, future_steps=12)
# Rolling maximum of 'mid' over the last 5 candles (read by custom_exit).
dataframe['max5'] = talib.MAX(dataframe['mid'], timeperiod=5)
# TENSOR FLOW
# Training only runs when training_enabled is set and the run mode is backtest.
if self.training_enabled and self.dp.runmode.value in ('backtest'):
self.tensorFlowTrain(dataframe, future_steps = self.future_steps)
self.tensorFlowPredict(dataframe)
self.kerasGenerateGraphs(dataframe)
# Read the columns
# NOTE(review): `metadata` is rebound to the JSON content here, replacing the
# `metadata` argument for the rest of the method.
with open(f"{self.path}/model_metadata.json", "r") as f:
metadata = json.load(f)
self.model_indicators = metadata["feature_columns"]
self.lookback = metadata["lookback"]
self.future_steps = metadata["future_steps"]
# e.g. feature_columns are the columns that were used for training
# feature_columns = [c for c in dataframe.columns if c not in [self.indicator_target, 'lstm_pred']]
# Outside backtest only the last 2 * lookback rows are targeted; None otherwise.
# NOTE(review): last_n is only referenced by the commented-out prediction code below.
last_n = self.lookback * 2 if not self.dp.runmode.value in ('backtest') else None
# preds, preds_std = self.predict_on_dataframe(dataframe, self.model_indicators, last_n=last_n)
# dataframe["lstm_pred"] = preds
# dataframe["lstm_pred_std"] = preds_std
# # confidence score inversely related to std (optional)
# dataframe["pred_confidence"] = 1 / (1 + dataframe["lstm_pred_std"]) # crude; scale to [0..1] if needed
# self.tensorFlowPredict(dataframe)
# # self.kerasGenerateGraphs(dataframe)
#
# dataframe['lstm_pred_smooth'] = dataframe['lstm_pred'].ewm(span=5, adjust=False).mean()
# self.calculeDerivees(dataframe, 'lstm_pred_smooth', timeframe=self.timeframe)
#
# # predicted % change relative to current price
# dataframe["predicted_pct"] = (dataframe["lstm_pred"] - dataframe[self.indicator_target]) / dataframe[
# self.indicator_target]
# # ---- Load or predict ----
# try:
# if self.dp.runmode.value in ('backtest'):
# self.train_position_sizer(dataframe, feature_columns=self.model_indicators)
#
# preds_positions = self.predict_position_fraction_on_dataframe(dataframe, feature_columns=self.model_indicators)
# # ---- Add the fractions column ----
# dataframe["pos_frac"] = preds_positions # values between 0..1
# # Example: the value is the allocation advised by the LSTM
#
# except Exception as e:
# self.printLog(f"[LSTM Position] Erreur prediction: {e}")
# dataframe["pos_frac"] = np.full(len(dataframe), np.nan)
return dataframe
def listUsableColumns(self, dataframe):
    """Select the model feature columns of *dataframe* and persist them as metadata.

    A column is kept when it is ``int64``/``float64``, has more than one
    distinct value, does not end with ``_state``, ``_count``, ``_class`` or
    ``_price`` and does not start with ``stop_buying``.

    Side effects:

    * in the kept columns of *dataframe*, ``inf``/``-inf``/``NaN`` are replaced by 0 (in place);
    * ``self.model_indicators`` is set to the kept column names;
    * ``model_metadata.json`` (feature columns, lookback, future_steps) is
      written under ``self.path``; the directory is created when missing,
      which previously raised ``FileNotFoundError`` on a pair's first run.

    :param dataframe: Analysed dataframe; modified in place.
    :return: The list of kept column names.
    """
    # Step 1: numeric columns only
    numeric_cols = dataframe.select_dtypes(include=['int64', 'float64']).columns

    # Step 2: drop constants and bookkeeping columns
    excluded_suffixes = ("_state", "_count", "_class", "_price")
    usable_cols = [
        c for c in numeric_cols
        if dataframe[c].nunique() > 1
        and not c.endswith(excluded_suffixes)
        and not c.startswith('stop_buying')
    ]

    # Step 3: replace inf and NaN with 0
    dataframe[usable_cols] = dataframe[usable_cols].replace([np.inf, -np.inf], 0).fillna(0)

    self.printLog("Colonnes utilisables pour le modèle :")
    self.printLog(usable_cols)
    self.model_indicators = usable_cols

    model_metadata = {
        "feature_columns": self.model_indicators,
        "lookback": self.lookback,
        "future_steps": self.future_steps,
    }
    # Same path expression as the reader in populate_indicators.
    metadata_file = f"{self.path}/model_metadata.json"
    os.makedirs(os.path.dirname(metadata_file), exist_ok=True)
    with open(metadata_file, "w", encoding="utf-8") as f:
        json.dump(model_metadata, f)

    return self.model_indicators
def populateDataframe(self, dataframe, timeframe='5m'):
    """Compute every indicator/feature column on a copy of ``dataframe``.

    The input frame must provide ``open``, ``high``, ``low``, ``close`` and
    ``volume``. The column creation order is significant for callers that
    derive the model feature list from the dataframe column order, so it is
    kept exactly as before.

    Changes compared with the previous implementation:
      * ``entropy_24`` is computed on absolute-return weights; the old code
        took ``log`` of signed ratios, which is NaN for any window that
        contains a negative return;
      * ``hurst_48`` works on raw ndarrays; with ``raw=False`` the lagged
        Series were re-aligned on their labels by the NumPy ufunc, so the
        "lagged difference" was 0/NaN instead of ``x[t+lag] - x[t]``;
      * the stochastic oscillator is computed once instead of twice;
      * the deprecated ``fillna(method=...)`` is replaced by ``ffill``/``bfill``.

    :param dataframe: OHLCV dataframe; it is not modified.
    :param timeframe: forwarded to ``calculeDerivees`` / ``calculateDerivation``.
    :return: new dataframe holding the original columns plus all features.
    """
    dataframe = dataframe.copy()

    # --- Heikin-Ashi candles and the "mid" price (middle of the HA body) ---
    heikinashi = qtpylib.heikinashi(dataframe)
    dataframe['haopen'] = heikinashi['open']
    dataframe['haclose'] = heikinashi['close']
    dataframe['hapercent'] = (dataframe['haclose'] - dataframe['haopen']) / dataframe['haclose']
    dataframe['mid'] = dataframe['haopen'] + (dataframe['haclose'] - dataframe['haopen']) / 2

    # --- Returns of the mid price, raw and smoothed ---
    dataframe["percent"] = dataframe['mid'].pct_change()
    dataframe["percent3"] = dataframe['mid'].pct_change(3).rolling(3).mean()
    dataframe["percent12"] = dataframe['mid'].pct_change(12).rolling(12).mean()
    dataframe["percent24"] = dataframe['mid'].pct_change(24).rolling(24).mean()

    # --- "smaN" columns are exponential moving averages of mid (span=N) ---
    for span in (5, 12, 24, 48, 60):
        dataframe[f'sma{span}'] = dataframe['mid'].ewm(span=span, adjust=False).mean()
        self.calculeDerivees(dataframe, f'sma{span}', timeframe=timeframe, ema_period=span)

    # --- Rolling means of mid (mid_smooth_N) and their derivatives ---
    for window in (3, 5, 12, 24):
        dataframe = self.calculateDerivation(dataframe, window=window, suffixe=f"_{window}",
                                             timeframe=timeframe)

    # --- RSI and rolling extrema ---
    dataframe['rsi'] = talib.RSI(dataframe['mid'], timeperiod=14)
    dataframe['max_rsi_12'] = talib.MAX(dataframe['rsi'], timeperiod=12)
    dataframe['max_rsi_24'] = talib.MAX(dataframe['rsi'], timeperiod=24)
    self.calculeDerivees(dataframe, 'rsi', timeframe=timeframe, ema_period=12)
    dataframe['max12'] = talib.MAX(dataframe['mid'], timeperiod=12)
    dataframe['min12'] = talib.MIN(dataframe['mid'], timeperiod=12)
    dataframe['max60'] = talib.MAX(dataframe['mid'], timeperiod=60)
    dataframe['min60'] = talib.MIN(dataframe['mid'], timeperiod=60)
    # Distance of mid below its 60-period high, expressed relative to the 60-period low.
    dataframe['min_max_60'] = ((dataframe['max60'] - dataframe['mid']) / dataframe['min60'])

    # --- Bollinger Bands on the typical price ---
    bollinger = qtpylib.bollinger_bands(qtpylib.typical_price(dataframe), window=20, stds=2)
    dataframe['bb_lowerband'] = bollinger['lower']
    dataframe['bb_middleband'] = bollinger['mid']
    dataframe['bb_upperband'] = bollinger['upper']
    dataframe["bb_percent"] = (
        (dataframe["mid"] - dataframe["bb_lowerband"]) /
        (dataframe["bb_upperband"] - dataframe["bb_lowerband"])
    )
    # Band width is normalised by sma5 (not by the middle band).
    dataframe["bb_width"] = (dataframe["bb_upperband"] - dataframe["bb_lowerband"]) / dataframe["sma5"]

    # --- MACD (12/26/9) on mid ---
    #   macd       = EMA_fast - EMA_slow : positive -> bullish bias, negative -> bearish bias
    #   macdsignal = EMA_9(macd)         : trigger line (crossovers)
    #   macdhist   = macd - macdsignal   : strength / acceleration of the move
    macd, macdsignal, macdhist = talib.MACD(
        dataframe['mid'],
        fastperiod=12,
        slowperiod=26,
        signalperiod=9
    )
    dataframe['macd'] = macd
    dataframe['macdsignal'] = macdsignal
    dataframe['macdhist'] = macdhist
    # A min/max normalisation of the MACD columns over the whole series was
    # removed earlier because it looks into the future.

    # --- Raw slope of sma24 and its EMA smoothing ---
    dataframe['slope'] = dataframe['sma24'].diff()
    dataframe['slope_smooth'] = dataframe['slope'].ewm(span=10, adjust=False).mean()

    # --- Normalised volatility ---
    dataframe['atr'] = ta.volatility.AverageTrueRange(
        high=dataframe['high'], low=dataframe['low'], close=dataframe['close'], window=14
    ).average_true_range()
    dataframe['atr_norm'] = dataframe['atr'] / dataframe['close']

    # --- Trend strength ---
    dataframe['adx'] = ta.trend.ADXIndicator(
        high=dataframe['high'], low=dataframe['low'], close=dataframe['close'], window=14
    ).adx()

    # --- Directional volume (On Balance Volume) ---
    dataframe['obv'] = ta.volume.OnBalanceVolumeIndicator(
        close=dataframe['mid'], volume=dataframe['volume']
    ).on_balance_volume()

    # --- Recent volatility (standard deviation of returns) ---
    dataframe['vol_24'] = dataframe['percent'].rolling(24).std()

    # --- Consecutive down / up candle counters ---
    self.calculateDownAndUp(dataframe, limit=0.0001)

    # --- Derived ratios ---
    dataframe['rsi_slope'] = dataframe['rsi'].diff(3) / 3  # average RSI speed over 3 candles
    dataframe['adx_change'] = dataframe['adx'] - dataframe['adx'].shift(12)  # trend evolution
    dataframe['volatility_ratio'] = dataframe['atr_norm'] / dataframe['bb_width']
    dataframe["rsi_diff"] = dataframe["rsi"] - dataframe["rsi"].shift(3)
    dataframe["slope_ratio"] = dataframe["sma5_deriv1"] / (dataframe["sma60_deriv1"] + 1e-9)
    dataframe["divergence"] = (dataframe["rsi_deriv1"] * dataframe["sma5_deriv1"]) < 0

    dataframe['volume_sma_deriv'] = (
        dataframe['volume'] * dataframe['sma5_deriv1'] / (dataframe['volume'].rolling(5).mean())
    )
    self.calculeDerivees(dataframe, 'volume', timeframe=timeframe, ema_period=12)

    # ------------------------------------------------------------------
    # Advanced indicators (fractals, stochastic, MFI, VWAP, Donchian,
    # Keltner, wick features, entropy, Hurst) meant to enrich the inputs
    # of the TensorFlow model.
    # ------------------------------------------------------------------
    dataframe = dataframe.copy()

    # 1) Fractals: the previous candle's high (resp. low) is strictly above
    #    (resp. below) the current candle and the three candles before it.
    prev_high = dataframe["high"].shift(1)
    dataframe["fractals_up"] = (
        (dataframe["high"].shift(2) < prev_high) &
        (dataframe["high"].shift(0) < prev_high) &
        (dataframe["high"].shift(3) < prev_high) &
        (dataframe["high"].shift(4) < prev_high)
    ).astype(int)
    prev_low = dataframe["low"].shift(1)
    dataframe["fractals_down"] = (
        (dataframe["low"].shift(2) > prev_low) &
        (dataframe["low"].shift(0) > prev_low) &
        (dataframe["low"].shift(3) > prev_low) &
        (dataframe["low"].shift(4) > prev_low)
    ).astype(int)

    # 2) Stochastic oscillator (K, D) -- computed once, both outputs reused.
    stoch = talib.STOCH(dataframe["high"], dataframe["low"], dataframe["close"])
    stoch_k = stoch[0]
    stoch_d = stoch[1]
    dataframe["stoch_k"] = stoch_k
    dataframe["stoch_d"] = stoch_d
    dataframe["stoch_k_d_diff"] = stoch_k - stoch_d

    # 3) Money Flow Index (price + volume) and its first difference.
    dataframe["mfi"] = talib.MFI(
        dataframe["high"], dataframe["low"], dataframe["close"], dataframe["volume"], timeperiod=14
    )
    dataframe["mfi_deriv1"] = dataframe["mfi"].diff()

    # 4) VWAP, cumulative from the first row of the frame; zero volumes are
    #    ignored in the denominator.
    typical_price = (dataframe["high"] + dataframe["low"] + dataframe["close"]) / 3
    dataframe["vwap"] = (
        (typical_price * dataframe["volume"]).cumsum()
        / dataframe["volume"].replace(0, np.nan).cumsum()
    )
    dataframe["close_vwap_dist"] = dataframe["close"] / dataframe["vwap"] - 1

    # 5) Donchian channel over 24 candles.
    dataframe["donchian_high"] = dataframe["high"].rolling(24).max()
    dataframe["donchian_low"] = dataframe["low"].rolling(24).min()
    dataframe["donchian_width"] = (dataframe["donchian_high"] - dataframe["donchian_low"]) / dataframe["close"]
    dataframe["donchian_percent"] = (dataframe["close"] - dataframe["donchian_low"]) / (
        dataframe["donchian_high"] - dataframe["donchian_low"])

    # 6) Keltner channel: EMA20 of close +/- 2 * ATR20.
    atr = talib.ATR(dataframe["high"], dataframe["low"], dataframe["close"], timeperiod=20)
    ema20 = talib.EMA(dataframe["close"], timeperiod=20)
    dataframe["kc_upper"] = ema20 + 2 * atr
    dataframe["kc_lower"] = ema20 - 2 * atr
    dataframe["kc_width"] = (dataframe["kc_upper"] - dataframe["kc_lower"]) / dataframe["close"]

    # 7) Candle shape: body and wicks as a fraction of the candle range
    #    (a zero range yields NaN rather than a division by zero).
    candle_range = dataframe["high"] - dataframe["low"]
    dataframe["body"] = abs(dataframe["close"] - dataframe["open"])
    dataframe["range"] = candle_range
    safe_range = dataframe["range"].replace(0, np.nan)
    dataframe["body_pct"] = dataframe["body"] / safe_range
    dataframe["upper_wick_pct"] = (dataframe["high"] - dataframe[["close", "open"]].max(axis=1)) / safe_range
    dataframe["lower_wick_pct"] = (dataframe[["close", "open"]].min(axis=1) - dataframe["low"]) / safe_range

    # 8) Shannon entropy of the distribution of absolute returns in a window.
    def rolling_entropy(series, window):
        eps = 1e-12

        def window_entropy(returns):
            # Probabilities must be non-negative: weight each candle by |return|.
            weights = np.abs(returns)
            probs = weights / (weights.sum() + eps)
            return -np.sum(probs * np.log(probs + eps))

        return series.rolling(window).apply(window_entropy, raw=True)

    dataframe["entropy_24"] = rolling_entropy(dataframe["close"].pct_change(), 24)

    # 9) Hurst exponent estimate (> 0.5 trending, < 0.5 mean-reverting).
    def hurst_exponent(ts):
        if len(ts) < 40:
            return np.nan
        lags = np.arange(2, 20)
        # ts is a plain ndarray (raw=True), so the slices are positional lagged differences.
        tau = np.array([np.sqrt(np.std(ts[lag:] - ts[:-lag])) for lag in lags])
        if np.any(tau <= 0):
            # Perfectly flat window: log(0) would poison the regression.
            return np.nan
        poly = np.polyfit(np.log(lags), np.log(tau), 1)
        return poly[0] * 2.0

    dataframe["hurst_48"] = dataframe["close"].rolling(48).apply(hurst_exponent, raw=True)

    # Final clean-up: infinities become NaN, then gaps are forward- and back-filled.
    # NOTE(review): the back-fill copies later values into the leading warm-up
    # rows -- confirm this is acceptable for training data.
    dataframe.replace([np.inf, -np.inf], np.nan, inplace=True)
    dataframe = dataframe.ffill().bfill()

    self.setTrends(dataframe)
    return dataframe
def feature_auc_scores(self, X, y):
    """Score each column of ``X`` on its own with ROC AUC against ``y``.

    Missing values of a column are forward-filled, then set to 0, before
    scoring. A column whose scoring raises gets ``NaN`` instead of aborting
    the whole pass.

    :param X: dataframe of candidate features.
    :param y: target labels passed to ``roc_auc_score``.
    :return: ``pd.Series`` indexed by column name, sorted by descending AUC.
    """
    def auc_or_nan(column):
        try:
            return roc_auc_score(y, X[column].ffill().fillna(0))
        except Exception:
            # Best effort: an unscorable feature must not stop the ranking.
            return np.nan

    scores = {column: auc_or_nan(column) for column in X.columns}
    return pd.Series(scores).sort_values(ascending=False)
def macd_tendance_int(self, dataframe: pd.DataFrame,
                      macd_col='macd',
                      signal_col='macdsignal',
                      hist_col='macdhist',
                      eps=0.0) -> pd.Series:
    """Encode the MACD state of every row as an integer.

    Mapping implemented by the code:
       2 : MACD above signal (+eps) and histogram > 0   -> bullish
       1 : MACD above signal (+eps) and histogram <= 0  -> bullish, slowing
       0 : neither side, or any of the three inputs is NaN -> neutral
      -1 : MACD below signal (-eps) and histogram >= 0  -> bearish, slowing
      -2 : MACD below signal (-eps) and histogram < 0   -> bearish

    Reminder: macd = EMA_fast - EMA_slow, signal = EMA_9(macd),
    histogram = macd - signal.

    When a negative ``eps`` makes both sides match the same row, the
    "below" classification wins.

    :return: int64 Series aligned on ``dataframe.index``.
    """
    macd = dataframe[macd_col]
    signal = dataframe[signal_col]
    hist = dataframe[hist_col]

    above = macd > signal + eps
    below = macd < signal - eps
    incomplete = dataframe[[macd_col, signal_col, hist_col]].isna().any(axis=1)

    # np.select takes the first matching rule, so rules are listed from highest
    # to lowest priority: NaN rows, then "below", then "above".
    rules = [
        (incomplete, 0),
        (below & (hist < 0), -2),
        (below & (hist >= 0), -1),
        (above & (hist > 0), 2),
        (above & (hist <= 0), 1),
    ]
    codes = np.select([rule for rule, _ in rules], [code for _, code in rules], default=0)
    return pd.Series(codes, index=dataframe.index, dtype="int64")
def calculateDownAndUp(self, dataframe, limit=0.0001):
    """Add consecutive down/up candle counters to ``dataframe`` (in place).

    Columns written, in this order:
      * ``down`` / ``up``: ``hapercent <= limit`` / ``hapercent >= limit``;
      * ``down_count``: minus the length of the current run of ``down`` rows, 0 otherwise;
      * ``up_count``: length of the current run of ``up`` rows, 0 otherwise;
      * ``down_pct`` / ``up_pct``: result of ``calculateUpDownPct`` on each counter.

    NOTE(review): both flags compare against the same positive ``limit``, so a
    tiny rise counts as "down"; ``-limit`` may have been intended -- confirm.
    """
    def run_length(flags):
        # 1-based position of every row inside its run of identical values.
        run_id = flags.ne(flags.shift()).cumsum()
        return flags.groupby(run_id).cumcount() + 1

    ha_change = dataframe['hapercent']
    dataframe['down'] = ha_change <= limit
    dataframe['up'] = ha_change >= limit

    is_down = dataframe['down']
    is_up = dataframe['up']
    dataframe['down_count'] = -is_down.astype(int) * run_length(is_down)
    dataframe['up_count'] = is_up.astype(int) * run_length(is_up)

    dataframe['down_pct'] = self.calculateUpDownPct(dataframe, 'down_count')
    dataframe['up_pct'] = self.calculateUpDownPct(dataframe, 'up_count')
def calculateDerivation(self, dataframe, window=12, suffixe='', timeframe='5m'):
    """Add ``mid_smooth{suffixe}`` (rolling mean of ``mid``) and its derivative columns.

    :param window: rolling window length; also forwarded as ``ema_period``.
    :param suffixe: suffix appended to ``mid_smooth`` to name the new column.
    :return: whatever ``calculeDerivees`` returns for the smoothed column.
    """
    smooth_col = f"mid_smooth{suffixe}"
    smoothed_mid = dataframe['mid'].rolling(window).mean()
    dataframe[smooth_col] = smoothed_mid
    return self.calculeDerivees(dataframe, smooth_col, timeframe=timeframe, ema_period=window)
def calculeDerivees(
        self,
        dataframe: pd.DataFrame,
        name: str,
        suffixe: str = '',
        window: int = 100,
        coef: float = 0.15,
        ema_period: int = 10,
        verbose: bool = True,
        timeframe: str = '5m'
) -> pd.DataFrame:
    """Add inversion, distance and derivative columns for one series (in place).

    Columns written, in this order (``P`` = ``name + suffixe``):
      * ``P_inv``: True where the series stopped falling and turned up
        (value two rows back >= previous value <= current value);
      * ``P_dist``: ``(close - P) / P``;
      * ``P_deriv1``: 1000 * relative one-row change of ``dataframe[name]``;
      * ``P_deriv2``: one-row difference of ``P_deriv1``.

    ``window``, ``coef``, ``ema_period``, ``verbose`` and ``timeframe`` are
    accepted but not used by the current body.

    NOTE(review): ``_inv``/``_dist`` read column ``name + suffixe`` whereas the
    derivatives read column ``name``; identical while ``suffixe`` is empty
    (every visible caller) -- confirm which one is intended otherwise.

    :return: the same ``dataframe`` object.
    """
    prefix = f"{name}{suffixe}"
    d1_col = f"{prefix}_deriv1"
    d2_col = f"{prefix}_deriv2"

    series = dataframe[prefix]
    previous = series.shift(1)
    was_falling = series.shift(2) >= previous
    now_rising = previous <= series
    dataframe[f"{prefix}_inv"] = was_falling & now_rising

    # Distance of the close to the series, relative to the series.
    dataframe[f"{prefix}_dist"] = (dataframe['close'] - dataframe[prefix]) / dataframe[prefix]

    # Simple relative derivative, scaled by 1000.
    base = dataframe[name]
    base_previous = base.shift(1)
    dataframe[d1_col] = 1000 * (base - base_previous) / base_previous
    first_derivative = dataframe[d1_col]
    dataframe[d2_col] = first_derivative - first_derivative.shift(1)
    return dataframe
def getOpenTrades(self):
    """Reload the open trades into ``self.trades`` and return them.

    The list is re-queried on every call (the "only when empty" cache check
    is disabled).
    """
    refreshed = Trade.get_open_trades()
    self.trades = refreshed
    return self.trades
def populate_buy_trend(self, dataframe: DataFrame, metadata: dict) -> DataFrame:
    """Flag long entries and add a plotting helper column.

    A row gets ``enter_long = 1`` / ``enter_tag = "future"`` when ``sma5_inv``
    is set and at least one of ``percent3``, ``percent12``, ``percent24`` is
    <= -0.003. ``test`` holds ``close * 1.01`` on entry rows and NaN elsewhere.

    An earlier rule based on ``lstm_pred`` crossing below ``sma5`` is disabled.

    :param metadata: not used by the current body.
    :return: the same ``dataframe`` object, modified in place.
    """
    drop_threshold = -0.003

    sma5_turned_up = dataframe['sma5_inv'] == 1
    dropped_recently = (
        dataframe[['percent3', 'percent12', 'percent24']].le(drop_threshold).any(axis=1)
    )
    entry_mask = sma5_turned_up & dropped_recently
    dataframe.loc[entry_mask, ['enter_long', 'enter_tag']] = (1, "future")

    is_entry = dataframe['enter_long'] == 1
    dataframe['test'] = np.where(is_entry, dataframe['close'] * 1.01, np.nan)
    return dataframe
def populate_sell_trend(self, dataframe: DataFrame, metadata: dict) -> DataFrame:
    """Return ``dataframe`` unchanged: no exit signal is produced here.

    All exit rules below are disabled (kept as commented-out code); they
    relied on prediction columns such as ``lstm_pred``,
    ``mid_future_pred_cons`` and ``sma60_future_pred_cons``.
    """
    # dataframe.loc[
    # (
    # qtpylib.crossed_below(dataframe['lstm_pred'], dataframe['mid'])
    # ), ['exit_long', 'exit_tag']] = (1, f"sma60_future")
    # dataframe.loc[
    # (
    # (
    # (
    # (dataframe['mid_future_pred_cons'].shift(2) < dataframe['mid_future_pred_cons'].shift(1))
    # & (dataframe['mid_future_pred_cons'].shift(1) > dataframe['mid_future_pred_cons'])
    # )
    # # | (dataframe['mid_smooth_12_deriv1'] < 0)
    # )
    # & (dataframe['sma60_future_pred_cons'] < dataframe['sma60_future_pred_cons'].shift(1))
    # & (dataframe['hapercent'] < 0)
    # ), ['exit_long', 'exit_tag']] = (1, f"sma60_future")
    #
    # dataframe.loc[
    # (
    # (
    # (dataframe['mid_future_pred_cons'].shift(2) < dataframe['mid_future_pred_cons'].shift(1))
    # & (dataframe['mid_future_pred_cons'].shift(1) > dataframe['mid_future_pred_cons'])
    #
    # )
    # # & (dataframe['mid_future_pred_cons'] > dataframe['max12'])
    # & (dataframe['hapercent'] < 0)
    #
    # ), ['exit_long', 'exit_tag']] = (1, f"max12")
    return dataframe
# # Position sizing using simplified Kelly-like fraction
# def adjust_trade_positionNew(self, trade: Trade, current_time: datetime,
# current_rate: float, current_profit: float, min_stake: float,
# max_stake: float, **kwargs):
# """
# Return fraction in (0..1] of available position to allocate.
# Uses predicted confidence and historical hit ratio.
# """
# # idx = trade.open_dt_index if hasattr(trade, "open_dt_index") else trade.open_index
# # fallback: use latest row
# dataframe, _ = self.dp.get_analyzed_dataframe(trade.pair, self.timeframe)
# # last_candle = self.dp.get_pair_dataframe(pair).iloc[-1]
# last_candle = dataframe.iloc[-1].squeeze()
# hit = getattr(self, "historical_hit_ratio", 0.6) # you can compute this offline
# pred_conf = last_candle.get("pred_confidence", 0.5)
# predicted_pct = last_candle.get("predicted_pct", 0.0)
#
# # base fraction from hit ratio (simple linear mapping)
# if hit <= 0.5:
# base_frac = 0.01
# else:
# base_frac = min((hit - 0.5) * 2.0, 1.0) # hit 0.6 -> 0.2 ; 0.75 -> 0.5
#
# # scale by confidence and predicted move magnitude
# scale = pred_conf * min(abs(predicted_pct) / max(self.minimal_pct_for_trade, 1e-6), 1.0)
#
# fraction = base_frac * scale
#
# # clamp
# fraction = float(np.clip(fraction, 0.001, 1.0))
# return fraction
def adjust_trade_position(self, trade: Trade, current_time: datetime,
                          current_rate: float, current_profit: float, min_stake: float,
                          max_stake: float, **kwargs):
    """Decide whether to add to an open trade, either on a dip or on a gain.

    Decision flow:
      1. no action while the trade has open orders (``None``);
      2. no action when less than 10 of stake currency is available (``0``);
      3. no action when the analyzed dataframe is empty, when
         ``should_enter_trade`` refuses, or when no last buy price is recorded;
      4. "Loss" buy: price fell >= 0.25 % below the last buy price. If the
         pair already has gain-buys and the trade is in profit, the pair is
         flagged ``force_sell`` instead. Otherwise the stake is the smallest of
         2.5 x configured stake, the available balance and ``adjust_stake_amount``;
      5. "Gain" buy: trade profit rose by more than 0.3 since the last recorded
         ``previous_profit``; the stake is the smaller of the available balance
         and the pair's ``first_amount``.

    Per-pair state in ``self.pairs[pair]`` is updated on every buy.

    Fixes compared with the previous version:
      * the empty-dataframe guard now runs before ``iloc[-1]`` (it used to
        raise IndexError first) and ``iloc[-2]`` is no longer required;
      * a missing/zero ``last_buy`` returns ``None`` instead of dividing by it;
      * unused computations were removed (elapsed-time values, one of which
        used ``timedelta.seconds`` instead of ``total_seconds()``; a buy-count
        sum filtered on the wrong variable; an unused tag in the gain branch);
      * the debug ``print`` goes through ``self.printLog``.

    :param trade: trade being evaluated.
    :param current_time: current datetime; converted to UTC for logging.
    :param current_rate: current price.
    :param current_profit: current profit ratio of the trade.
    :param min_stake: not used by the current body.
    :param max_stake: not used by the current body.
    :return: stake amount to add, ``0`` (balance too low) or ``None`` (no action).
    """
    dca_threshold = 0.0025  # relative drop below the last buy that triggers a "Loss" buy
    gain_step = 0.3         # profit increase (absolute amount) that triggers a "Gain" buy

    # Do nothing while an order is already pending.
    if trade.has_open_orders:
        return None

    if self.wallets.get_available_stake_amount() < 10:
        return 0

    dataframe, _ = self.dp.get_analyzed_dataframe(trade.pair, self.timeframe)
    # Guard before any positional access: iloc[-1] on an empty frame raises.
    if dataframe is None or len(dataframe) < 1:
        return None
    last_candle = dataframe.iloc[-1].squeeze()

    pair = trade.pair
    current_time = current_time.astimezone(timezone.utc)
    dispo = round(self.wallets.get_available_stake_amount())
    profit = trade.calc_profit(current_rate)

    # The result is not used here; the call is kept in case the helper updates
    # state (its body is outside this block) -- TODO confirm and drop if pure.
    self.getLastLost(last_candle, pair)

    if not self.should_enter_trade(pair, last_candle, current_time):
        return None

    # Last real buy price (not the average entry price).
    last_fill_price = self.pairs[pair]['last_buy']
    if not last_fill_price:
        # No reference price recorded: the relative drop cannot be evaluated.
        return None

    def register_buy(action, trade_type, stake_amount):
        # Book the additional entry in the per-pair state and log it.
        self.pairs[pair]['count_of_buys'] += 1
        self.pairs[pair]['total_amount'] += stake_amount
        self.log_trade(
            last_candle=last_candle,
            date=current_time,
            action=action,
            dispo=dispo,
            pair=pair,
            rate=current_rate,
            trade_type=trade_type,
            profit=round(profit, 1),
            buys=trade.nr_of_successful_entries + 1,
            stake=round(stake_amount, 2)
        )
        self.pairs[pair]['last_buy'] = current_rate
        self.pairs[pair]['max_touch'] = last_candle['close']
        self.pairs[pair]['last_candle'] = last_candle

    # --- "Loss" buy: the price fell far enough below the last buy ---
    decline = (last_fill_price - current_rate) / last_fill_price
    if decline >= dca_threshold:
        # Best effort on purpose: a failure here must not break the bot loop.
        try:
            if self.pairs[pair]['has_gain'] and profit > 0:
                # Gains were already added and the trade is profitable: ask for
                # an exit rather than averaging down.
                self.pairs[pair]['force_sell'] = True
                self.pairs[pair]['previous_profit'] = profit
                return None

            max_amount = self.config.get('stake_amount') * 2.5
            stake_amount = min(max_amount,
                               self.wallets.get_available_stake_amount(),
                               self.adjust_stake_amount(pair, last_candle))
            self.printLog(
                f"profit={profit} previous={self.pairs[pair]['previous_profit']} count_of_buys={trade.nr_of_successful_entries}")
            if not stake_amount > 0:
                return None

            self.pairs[pair]['previous_profit'] = profit
            trade_type = "Loss " + (last_candle['enter_tag'] if last_candle['enter_long'] == 1 else '')
            register_buy("🟧 Loss -", trade_type, stake_amount)
            return stake_amount
        except Exception as exception:
            self.printLog(exception)
            return None

    # --- "Gain" buy: profit progressed by more than gain_step ---
    profit_progress = profit - self.pairs[pair]['previous_profit']
    if profit_progress > gain_step and self.wallets.get_available_stake_amount() > 0:
        try:
            self.pairs[pair]['previous_profit'] = profit
            stake_amount = min(self.wallets.get_available_stake_amount(), self.pairs[pair]['first_amount'])
            if not stake_amount > 0:
                return None

            self.pairs[pair]['has_gain'] += 1
            # Logged as the "trade type": move since the last buy, or the
            # trade's profit ratio while there is a single entry.
            if trade.nr_of_successful_entries == 1:
                pct_max = current_profit
            else:
                pct_max = self.getPctLastBuy(pair, last_candle)
            register_buy("🟡 Gain +", str(round(pct_max, 4)), stake_amount)
            return stake_amount
        except Exception as exception:
            self.printLog(exception)
            return None

    return None
def getPctFirstBuy(self, pair, last_candle):
    """Return the relative move of ``close`` versus the pair's first buy price, rounded to 3 decimals."""
    first_buy = self.pairs[pair]['first_buy']
    change = (last_candle['close'] - first_buy) / first_buy
    return round(change, 3)
def getPctLastBuy(self, pair, last_candle):
    """Return the relative move of ``close`` versus the pair's last buy price, rounded to 4 decimals."""
    last_buy = self.pairs[pair]['last_buy']
    change = (last_candle['close'] - last_buy) / last_buy
    return round(change, 4)
# def adjust_stake_amount(self, pair: str, last_candle: DataFrame):
# base_stake_amount = self.config.get('stake_amount')
#
# # Récupérer le dataframe de la paire
# try:
# df = self.dp.get_pair_dataframe(pair)
# except Exception:
# return 0.1 # fallback safe size 10%
#
# # Doit exister car rempli dans populate_indicators
# if "pos_frac" not in df.columns:
# return 0.1
#
# # On prend la dernière valeur non-nan
# last = df["pos_frac"].dropna()
# if last.empty:
# return 0.1
#
# raw_frac = float(last.iloc[-1]) # dans [0..1]
#
# # --- Sécurisation ---
# # Clamp dans des limites
# raw_frac = max(0.0, min(raw_frac, 1.0))
#
# # Conversion vers fraction réelle autorisée
# # min=0.1%, max=10% du portefeuille (change si tu veux)
# min_frac = 0.05
# max_frac = 0.25
# final_frac = min_frac + raw_frac * (max_frac - min_frac)
#
# return base_stake_amount * final_frac
def adjust_stake_amount(self, pair: str, last_candle: DataFrame):
    """Return the stake to use for the next order on ``pair``.

    The stake is currently always the configured ``stake_amount``. While the
    pair has no buy yet (``count_of_buys == 0``) it is also remembered as the
    pair's ``first_amount``.

    The previous body wrapped this in ``if True: ... else: ...`` (unreachable
    else branch) and computed an unused whitelist size and scaling factor;
    those were removed, which also drops a needless ``self.dp`` call.

    :param pair: pair name, key into ``self.pairs``.
    :param last_candle: not used by the current body.
    :return: the configured stake amount.
    """
    adjusted_stake_amount = self.config.get('stake_amount')
    if self.pairs[pair]['count_of_buys'] == 0:
        self.pairs[pair]['first_amount'] = adjusted_stake_amount
    return adjusted_stake_amount
def expectedProfit(self, pair: str, last_candle: DataFrame):
    """Compute and store the profit target for ``pair``.

    The target is a fixed ratio of the pair's ``total_amount``: 0.005 when the
    pair's short name is ``BTC``, 0.01 otherwise. The value is stored in
    ``self.pairs[pair]['expected_profit']`` and returned.

    The unused ``pct`` / ``pct_to_max`` locals were removed, so the pair's
    ``count_of_buys`` is no longer read here.

    :param last_candle: not used by the current body.
    """
    target_ratio = 0.005 if self.getShortName(pair) == 'BTC' else 0.01
    expected_profit = target_ratio * self.pairs[pair]['total_amount']
    self.pairs[pair]['expected_profit'] = expected_profit
    return expected_profit
def calculateUpDownPct(self, dataframe, key):
    """Percentage move of ``close`` over the current streak, row by row.

    For row ``i`` with streak length ``s = abs(int(dataframe[key][i]))`` the
    value is ``100 * (close[i] - close[i - s]) / close[i - s]``. Rows whose
    origin ``i - s`` is not greater than 1 stay NaN.

    Vectorised with NumPy instead of a per-row ``.iloc`` loop. A NaN streak
    value now yields NaN for that row instead of raising on ``int(NaN)``.

    NOTE(review): the ``> 1`` guard is kept as-is, but it also rejects the
    valid origins 0 and 1 (``>= 0`` would suffice to stay in bounds) --
    confirm whether that is intended.

    :param key: name of the streak-counter column (e.g. ``down_count``).
    :return: float ndarray of ``len(dataframe)``.
    """
    size = len(dataframe)
    pct_values = np.full(size, np.nan)
    if size == 0:
        return pct_values

    closes = dataframe['close'].to_numpy()
    raw_streaks = dataframe[key].to_numpy(dtype=float)
    known = ~np.isnan(raw_streaks)
    # Truncate toward zero like int() did, then take the magnitude.
    streaks = np.abs(np.trunc(np.where(known, raw_streaks, 0.0))).astype(int)

    origins = np.arange(size) - streaks
    usable = known & (origins > 1)

    reference = closes[origins[usable]]
    pct_values[usable] = 100 * (closes[usable] - reference) / reference
    return pct_values
@property
def protections(self):
    """Protection configuration: a single ``CooldownPeriod`` of 2 candles."""
    cooldown = {
        "method": "CooldownPeriod",
        "stop_duration_candles": 2
    }
    # Disabled protections, kept for reference:
    # {
    #     "method": "MaxDrawdown",
    #     "lookback_period_candles": self.lookback.value,
    #     "trade_limit": self.trade_limit.value,
    #     "stop_duration_candles": self.protection_stop.value,
    #     "max_allowed_drawdown": self.protection_max_allowed_dd.value,
    #     "only_per_pair": False
    # },
    # {
    #     "method": "StoplossGuard",
    #     "lookback_period_candles": 24,
    #     "trade_limit": 4,
    #     "stop_duration_candles": self.protection_stoploss_stop.value,
    #     "only_per_pair": False
    # },
    # {
    #     "method": "StoplossGuard",
    #     "lookback_period_candles": 24,
    #     "trade_limit": 4,
    #     "stop_duration_candles": 2,
    #     "only_per_pair": False
    # },
    # {
    #     "method": "LowProfitPairs",
    #     "lookback_period_candles": 6,
    #     "trade_limit": 2,
    #     "stop_duration_candles": 60,
    #     "required_profit": 0.02
    # },
    # {
    #     "method": "LowProfitPairs",
    #     "lookback_period_candles": 24,
    #     "trade_limit": 4,
    #     "stop_duration_candles": 2,
    #     "required_profit": 0.01
    # }
    return [cooldown]
def get_stake_from_drawdown(self, pct: float, base_stake: float = 100.0, step: float = 0.04, growth: float = 1.15,
                            max_stake: float = 1000.0) -> float:
    """Compute the stake to allocate for a given drawdown.

    The drawdown is split into tiers of ``step``; each completed tier
    multiplies the stake by ``growth``, capped at ``max_stake``.

    The tier is computed with a small tolerance because plain truncation is
    off by one on exact boundaries: ``0.12 / 0.04`` evaluates to
    ``2.9999999999999996``, so a -12 % drawdown used to land in tier 2.

    :param pct: drawdown as a ratio (e.g. -0.12 for -12 %).
    :param base_stake: stake at tier 0.
    :param step: spacing between tiers (e.g. one tier every -4 %).
    :param growth: growth factor per tier (e.g. 1.15 for +15 %).
    :param max_stake: upper bound on the returned stake.
    :return: ``base_stake`` when ``pct >= 0``, otherwise the capped tier stake.
    :raises ZeroDivisionError: if ``step`` is 0 and ``pct`` is negative.
    """
    if pct >= 0:
        return base_stake

    # Tolerance absorbs binary floating-point error on exact tier boundaries.
    level = math.floor(abs(pct) / step + 1e-9)
    stake = base_stake * (growth ** level)
    return min(stake, max_stake)
def polynomial_forecast(self, series: pd.Series, window: int = 20, degree: int = 2, steps=(12, 24, 36)):
    """
    Fit a polynomial on the last `window` values of the series and evaluate it ahead.

    :param series: pandas Series (e.g. dataframe['close'])
    :param window: Number of most recent values used to fit the polynomial
    :param degree: Degree of the polynomial (e.g. 2 for quadratic)
    :param steps: Forward offsets (in samples after the last window point) at which
        the fitted polynomial is evaluated for the `count` result
    :return: tuple (poly, x_future, y_future, count) where
        - poly is the fitted ``np.poly1d``,
        - x_future holds the ``len(steps)`` consecutive x positions right after the window,
        - y_future is ``poly(x_future)``,
        - count is the number of offsets in `steps` whose predicted value is > 0
    :raises ValueError: if the series is shorter than `window`
    """
    if len(series) < window:
        raise ValueError("La série est trop courte pour la fenêtre spécifiée.")

    recent_y = series.iloc[-window:].values
    x = np.arange(window)
    poly = np.poly1d(np.polyfit(x, recent_y, degree))

    # NOTE: x_future are consecutive positions after the window, whereas `count`
    # is evaluated at the (generally sparser) offsets listed in `steps`.
    x_future = np.arange(window, window + len(steps))
    y_future = poly(x_future)

    # x = window - 1 is the last fitted point, so window - 1 + s is s samples ahead.
    count = sum(1 for future_step in steps if poly(window - 1 + future_step) > 0)

    return poly, x_future, y_future, count
def should_enter_trade(self, pair: str, last_candle, current_time) -> bool:
    """Decide whether a new entry is allowed for ``pair``.

    As currently written the method queries the available stake amount and
    then unconditionally returns ``True``: the stop/restart logic is commented
    out, and every statement after the first ``return True`` is unreachable.

    :param pair: Pair name
    :param last_candle: Latest candle (only read by the disabled/unreachable code)
    :param current_time: Current time (only read by the disabled/unreachable code)
    :return: Always ``True`` in the current state of the code
    """
    limit = 3
    # if self.pairs[pair]['stop'] and last_candle['max_rsi_12'] <= 60 and last_candle['trend_class'] == -1:
    # dispo = round(self.wallets.get_available_stake_amount())
    # self.pairs[pair]['stop'] = False
    # self.log_trade(
    # last_candle=last_candle,
    # date=current_time,
    # action="🟢RESTART",
    # dispo=dispo,
    # pair=pair,
    # rate=last_candle['close'],
    # trade_type='',
    # profit=0,
    # buys=self.pairs[pair]['count_of_buys'],
    # stake=0
    # )
    # 🟢 1st derivative > 0 and 2nd derivative > 0: uptrend that is accelerating.
    # 🟡 1st derivative > 0 and 2nd derivative < 0: uptrend that is slowing down → potential exhaustion.
    # 🔴 1st derivative < 0 and 2nd derivative < 0: downtrend that is accelerating.
    # 🟠 1st derivative < 0 and 2nd derivative > 0: downtrend that is slowing down → possible bottom.
    # if not pair.startswith('BTC'):
    # Only consumed by the commented-out logging below; the call itself is still executed.
    dispo = round(self.wallets.get_available_stake_amount())
    # if self.pairs[pair]['stop'] \
    # and last_candle[f"{self.indic_1d_p.value}_deriv1"] >= self.indic_deriv1_1d_p_start.value \
    # and last_candle[f"{self.indic_1d_p.value}_deriv2"] >= self.indic_deriv2_1d_p_start.value:
    # self.pairs[pair]['stop'] = False
    # self.log_trade(
    # last_candle=last_candle,
    # date=current_time,
    # action="🟢RESTART",
    # dispo=dispo,
    # pair=pair,
    # rate=last_candle['close'],
    # trade_type='',
    # profit=0,
    # buys=self.pairs[pair]['count_of_buys'],
    # stake=0
    # )
    # else:
    # if self.pairs[pair]['stop'] == False \
    # and last_candle[f"{self.indic_1d_p.value}_deriv1"] <= self.indic_deriv1_1d_p_stop.value \
    # and last_candle[f"{self.indic_1d_p.value}_deriv2"] <= self.indic_deriv2_1d_p_stop.value:
    # self.pairs[pair]['stop'] = True
    # # if self.pairs[pair]['current_profit'] > 0:
    # # self.pairs[pair]['force_sell'] = True
    # self.log_trade(
    # last_candle=last_candle,
    # date=current_time,
    # action="🔴STOP",
    # dispo=dispo,
    # pair=pair,
    # rate=last_candle['close'],
    # trade_type='',
    # profit=self.pairs[pair]['current_profit'],
    # buys=self.pairs[pair]['count_of_buys'],
    # stake=0
    # )
    # return False
    # if self.pairs[pair]['stop']:
    # return False
    return True
    # NOTE(review): everything below is unreachable because of the unconditional
    # return above. The nesting shown here was reconstructed from the statement
    # order and should be confirmed before re-enabling this logic.
    # Filter out the BTC pairs
    non_btc_pairs = [p for p in self.pairs if not p.startswith('BTC')]
    # Count the active positions on the non-BTC pairs
    max_nb_trades = 0
    total_non_btc = 0
    max_pair = ''
    limit_amount = 250
    max_amount = 0
    for p in non_btc_pairs:
        max_nb_trades = max(max_nb_trades, self.pairs[p]['count_of_buys'])
        max_amount = max(max_amount, self.pairs[p]['total_amount'])
    for p in non_btc_pairs:
        if (max_nb_trades == self.pairs[p]['count_of_buys'] and max_nb_trades > limit):
            # if (max_amount == self.pairs[p]['total_amount'] and max_amount > limit_amount):
            max_pair = p
        total_non_btc += self.pairs[p]['count_of_buys']
    pct_max = self.getPctFirstBuy(pair, last_candle)  # self.getPctLastBuy(pair, last_candle)
    if last_candle['mid_smooth_deriv1'] < -0.02:  # and last_candle['mid_smooth_deriv2'] > 0):
        return False
    self.should_enter_trade_count = 0
    # if max_pair != pair and self.pairs[pair]['total_amount'] > 300:
    # return False
    if (max_pair != '') & (self.pairs[pair]['count_of_buys'] >= limit):
        trade = self.pairs[max_pair]['current_trade']
        current_time = current_time.astimezone(timezone.utc)
        open_date = trade.open_date.astimezone(timezone.utc)
        current_time_utc = current_time.astimezone(timezone.utc)
        days_since_open = (current_time_utc - open_date).days
        pct_max_max = self.getPctFirstBuy(max_pair, last_candle)
        # self.printLog(f"days_since_open {days_since_open} max_pair={max_pair} pair={pair}")
        return max_pair == pair or pct_max < - 0.25 or (
            pct_max_max < - 0.15 and max_pair != pair and days_since_open > 30)
    else:
        return True
def setTrends(self, dataframe: DataFrame):
    """Add a ``slope_norm`` column to ``dataframe`` (in place).

    ``slope_norm`` is the first difference of ``sma12``, smoothed with an EMA
    (span 10, ``adjust=False``), expressed relative to ``close`` and scaled by
    10000. Missing values are replaced by 0. Nothing is returned.
    """
    smoothing_span = 10
    # Raw slope of the moving average.
    raw_slope = dataframe['sma12'].diff()
    # EMA smoothing of the slope.
    smoothed_slope = raw_slope.ewm(span=smoothing_span, adjust=False).mean()
    # Normalisation relative to the price.
    relative_slope = 10000 * smoothed_slope / dataframe['close']
    dataframe['slope_norm'] = relative_slope.fillna(0)
def make_model(self, model_type="linear", degree=2, random_state=0):
    """Build an unfitted regressor selected by name (case-insensitive).

    Supported names: ``linear``, ``poly`` (scaled polynomial features of
    ``degree`` + linear regression), ``svr``, ``rf`` and ``lgbm``.

    :raises RuntimeError: for ``lgbm`` when ``_HAS_LGBM`` is falsy
    :raises ValueError: for any other, unknown name
    """
    kind = model_type.lower()

    def _build_lgbm():
        if not _HAS_LGBM:
            raise RuntimeError("lightgbm n'est pas installé")
        return LGBMRegressor(n_estimators=100, random_state=random_state)

    # Factories are only invoked for the requested kind.
    factories = {
        "linear": lambda: LinearRegression(),
        "poly": lambda: make_pipeline(StandardScaler(), PolynomialFeatures(degree=degree, include_bias=False),
                                      LinearRegression()),
        "svr": lambda: make_pipeline(StandardScaler(), SVR(kernel="rbf", C=1.0, epsilon=0.1)),
        "rf": lambda: RandomForestRegressor(n_estimators=100, random_state=random_state, n_jobs=1),
        "lgbm": _build_lgbm,
    }
    factory = factories.get(kind)
    if factory is None:
        raise ValueError(f"model_type inconnu: {kind}")
    return factory()
def calculateRegressionNew(self, df, indic, lookback=20, future_steps=5, model_type="linear"):
    """Rolling linear-regression forecast of ``df[indic]``.

    For each position ``i`` a ``LinearRegression`` is fitted on the ``lookback``
    values preceding ``i`` (x = 0..lookback-1) and evaluated at
    ``x = lookback + future_steps - 1``; the result is written to row
    ``i + future_steps`` of the new column ``<indic>_future_pred_cons``.
    Positions whose window or whose value at ``i + future_steps`` contains NaN
    are skipped. ``model_type`` is accepted but not used.

    NOTE(review): the evaluated x position appears to correspond to row
    ``i + future_steps - 1`` while the value is stored at ``i + future_steps``
    - confirm whether this one-row offset is intended.

    :return: a copy of ``df`` with the prediction column added
    """
    result = df.copy()
    pred_col = f"{indic}_future_pred_cons"
    result[pred_col] = np.nan
    pred_pos = result.columns.get_loc(pred_col)

    positions = np.arange(lookback).reshape(-1, 1)
    query_x = np.array([[lookback + future_steps - 1]])
    series = result[indic].values
    regressor = LinearRegression()

    for end in range(lookback, len(series) - future_steps):
        history = series[end - lookback:end]
        # Skip when the window or the actual future value is missing.
        if np.isnan(history).any() or np.isnan(series[end + future_steps]):
            continue
        regressor.fit(positions, history)
        result.iloc[end + future_steps, pred_pos] = regressor.predict(query_x)[0]

    return result
# ==========================================================
# NOUVELLE VERSION : calcule AUSSI les dernières valeurs !
# ==========================================================
def calculateRegression(
    self,
    df,
    indic,
    lookback=30,
    future_steps=5,
    model_type="linear",
    degree=2,
    weight_mode="exp",
    weight_strength=2,
    clip_k=2.0,
    blend_alpha=0.7,
):
    """Rolling regression forecast of ``df[indic]``, including the last rows.

    For each row ``i`` a model from ``self.make_model`` is fitted on the
    ``lookback`` values preceding ``i`` (x = 0..lookback-1):

    * rows ``lookback .. n - future_steps - 1``: the model is evaluated at
      ``x = lookback + future_steps - 1``; the predicted change versus
      ``values[i]`` is clipped to ``clip_k`` local standard deviations and
      blended as ``values[i] + blend_alpha * delta``;
    * the last ``future_steps`` rows: the model is evaluated one step after
      the window (``x = lookback``) and blended around ``values[i - 1]``,
      without clipping or sample weights.

    The result is stored in ``df`` (modified in place) in the column
    ``<indic>_future_pred_cons``; rows that cannot be computed stay NaN.
    A frame shorter than ``lookback`` yields an all-NaN column.

    :param weight_mode: ``"exp"`` weights recent samples more
        (``linspace(0.1, 1) ** weight_strength``); anything else disables weights
    :return: ``df`` (the same object that was passed in)
    """
    values = df[indic].values.astype(float)
    n = len(values)
    colname = f"{indic}_future_pred_cons"
    predictions = np.full(n, np.nan)

    # Rows from this index on have no realised value `future_steps` ahead.
    trainable_end = n - future_steps

    model = self.make_model(model_type=model_type, degree=degree)

    # Loop invariants: window positions, sample weights and query points.
    X_window = np.arange(lookback).reshape(-1, 1)
    if weight_mode == "exp":
        weights = np.linspace(0.1, 1, lookback) ** weight_strength
    else:
        weights = None
    horizon_x = np.array([[lookback + future_steps - 1]])
    next_x = np.array([[lookback]])

    # Main pass
    for i in range(lookback, trainable_end):
        window = values[i - lookback:i]
        if np.isnan(window).any():
            continue

        try:
            model.fit(X_window, window, sample_weight=weights)
        except Exception:
            # Best effort: some estimators/pipelines do not accept sample_weight.
            model.fit(X_window, window)

        y_pred_value = model.predict(horizon_x)[0]
        pred_delta = y_pred_value - values[i]

        # Clip the predicted move by the local volatility.
        local_std = np.std(window)
        max_change = clip_k * (local_std if local_std > 0 else 1e-9)
        pred_delta = np.clip(pred_delta, -max_change, max_change)

        predictions[i] = (
            blend_alpha * (values[i] + pred_delta)
            + (1 - blend_alpha) * values[i]
        )

    # Tail pass: indices [n - future_steps .. n - 1] that have a full window
    for i in range(max(trainable_end, lookback), n):
        window = values[i - lookback:i]
        if np.isnan(window).any():
            continue

        try:
            model.fit(X_window, window)
        except Exception:
            # Leave this row as NaN when the fit fails.
            continue

        # Local continuation: predict one step after the window.
        y_pred_value = model.predict(next_x)[0]
        pred_delta = y_pred_value - values[i - 1]
        predictions[i] = (
            blend_alpha * (values[i - 1] + pred_delta)
            + (1 - blend_alpha) * values[i - 1]
        )

    df[colname] = predictions
    return df
def kerasGenerateGraphs(self, dataframe):
    """Produce all diagnostic graphs for ``self.model``.

    Calls, in order, the architecture plot, the prediction plots (using
    ``self.lookback``) and the per-layer weight dumps/heatmaps.
    """
    network = self.model
    self.kerasGenerateGraphModel(network)
    self.kerasGenerateGraphPredictions(network, dataframe, self.lookback)
    self.kerasGenerateGraphPoids(network)
def kerasGenerateGraphModel(self, model):
    """Render the model architecture to ``<self.path>/lstm_model.png``."""
    output_file = f"{self.path}/lstm_model.png"
    plot_model(model, to_file=output_file, show_shapes=True, show_layer_names=True)
def kerasGenerateGraphPredictions(self, model, dataframe, lookback):
    """Plot the target indicator against the model predictions.

    Writes ``lstm_predictions.png`` (whole frame) and, when the frame has more
    than 240 rows, ``lstm_predictions_last.png`` (last 120 rows) under
    ``self.path``.
    """
    preds = self.tensorFlowGeneratePredictions(dataframe, lookback, model)
    target = self.indicator_target

    # Full-history plot
    plt.figure(figsize=(36, 8))
    plt.plot(dataframe[target].values, label=target)
    plt.plot(preds, label="lstm_pred")
    plt.legend()
    plt.savefig(f"{self.path}/lstm_predictions.png")
    plt.close()

    if len(dataframe) <= 240:
        return

    # Zoom on the last 120 rows
    tail_start = len(dataframe) - 120
    plt.figure(figsize=(16, 8))
    plt.plot(dataframe[target][tail_start:].values, label=target)
    plt.plot(preds[tail_start:], label="lstm_pred")
    plt.legend()
    plt.savefig(f"{self.path}/lstm_predictions_last.png")
    plt.close()
def kerasGenerateGraphPoids(self, model):
    """Dump the weights of every layer and plot a heatmap of one 2-D matrix.

    For each layer ``i`` of ``model``:
    * the list returned by ``layer.get_weights()`` is saved to
      ``<self.path>/layer_<i>_weights.npy`` as a 1-D object array;
    * the first 2-D weight array, if any, is drawn as a heatmap into
      ``<self.path>/layer<i>_weights.png``.

    A layer without any 2-D weight is logged and skipped; the remaining layers
    are still processed.
    """
    for i, layer in enumerate(model.layers):
        weights = layer.get_weights()  # list of numpy arrays

        # Pack element by element so arrays of different (or identical) shapes
        # are always stored as separate objects.
        packed = np.empty(len(weights), dtype=object)
        for k, w in enumerate(weights):
            packed[k] = w
        np.save(f"{self.path}/layer_{i}_weights.npy", packed)

        # Pick the first 2-D weight matrix of the layer.
        W = next((w for w in weights if isinstance(w, np.ndarray) and w.ndim == 2), None)
        if W is None:
            self.printLog(f"Aucune matrice 2D dans layer {i} (rien à afficher).")
            # Skip only this layer, not the rest of the model.
            continue

        plt.figure(figsize=(8, 6))
        sns.heatmap(W, cmap="viridis")
        plt.title(f"Poids 2D du layer {i}")
        plt.savefig(f"{self.path}/layer{i}_weights.png")
        plt.close()
# -------------------
# Entraînement
# -------------------
def tensorFlowTrain(self, dataframe, future_steps=1, lookback=50, batch_size=32):
    """Train the LSTM regressor on ``dataframe`` and persist it.

    Sequences come from ``tensorFlowPrepareDataFrame``. The network
    (LSTM(64) -> Dense(32, relu) -> Dense(1), MSE loss, Adam 1e-4) is trained
    for ``self.epochs`` epochs, stored in ``self.model`` and saved together
    with ``self.scaler_X`` / ``self.scaler_y`` under ``self.path``.
    """
    X_seq, y_seq = self.tensorFlowPrepareDataFrame(dataframe, future_steps, lookback)

    # LSTM model
    n_features = X_seq.shape[2]
    layers = [
        LSTM(64, return_sequences=False, input_shape=(lookback, n_features)),
        Dense(32, activation="relu"),
        Dense(1),
    ]
    self.model = network = Sequential(layers)
    network.compile(loss='mse', optimizer=Adam(learning_rate=1e-4))
    network.fit(X_seq, y_seq, epochs=self.epochs, batch_size=batch_size, verbose=1)

    # Persist the model and both scalers
    network.save(f"{self.path}/lstm_model.keras")
    for scaler, filename in ((self.scaler_X, "lstm_scaler_X.pkl"), (self.scaler_y, "lstm_scaler_y.pkl")):
        joblib.dump(scaler, f"{self.path}/{filename}")
def tensorFlowPrepareDataFrame(self, dataframe, future_steps, lookback):
    """Build the scaled (X, y) training sequences from ``dataframe``.

    Steps: replace +/-inf by NaN and drop rows with NaN in the feature or
    target columns; zero out constant columns; (re)fit ``self.scaler_X`` - and
    ``self.scaler_y`` unless ``self.y_no_scale`` - on the cleaned data; build
    sliding windows of ``lookback`` rows whose target is the value
    ``future_steps`` rows after the end of the window.

    NOTE: the scalers are refitted on every call (``fit_transform``).

    :return: ``(X_seq, y_seq)`` as numpy arrays
    :raises ValueError: if NaN or Inf values remain in the sequences
    """
    target = self.indicator_target
    feature_columns = self.model_indicators

    # 1) Clean NaN / Inf on a separate frame (the input is left untouched)
    cleaned = dataframe.replace([np.inf, -np.inf], np.nan).dropna(subset=feature_columns + [target])

    # 2) Split features and target
    X_values = cleaned[feature_columns].values
    y_values = cleaned[target].values.reshape(-1, 1)

    # 3) Constant columns are forced to 0
    for col in range(X_values.shape[1]):
        column = X_values[:, col]
        if column.max() == column.min():
            X_values[:, col] = 0.0
    if y_values.max() == y_values.min():
        y_values[:] = 0.0

    # 4) Scaling
    if self.scaler_X is None:
        self.scaler_X = MinMaxScaler()
    X_scaled = self.scaler_X.fit_transform(X_values)

    if self.y_no_scale:
        y_scaled = y_values
    else:
        if self.scaler_y is None:
            self.scaler_y = MinMaxScaler()
        y_scaled = self.scaler_y.fit_transform(y_values)

    # 5) Sliding windows: input rows [k, k + lookback), target at +future_steps
    sample_count = len(X_scaled) - (lookback + future_steps)
    X_seq = np.array([X_scaled[k: k + lookback] for k in range(sample_count)])
    y_seq = np.array([y_scaled[k + lookback + future_steps - 1] for k in range(sample_count)])

    # Final sanity checks
    if np.isnan(X_seq).any() or np.isnan(y_seq).any():
        raise ValueError("X_seq ou y_seq contient encore des NaN")
    if np.isinf(X_seq).any() or np.isinf(y_seq).any():
        raise ValueError("X_seq ou y_seq contient encore des Inf")

    return X_seq, y_seq
# -------------------
# Prédiction
# -------------------
def tensorFlowPredict(self, dataframe, future_steps=1, lookback=50):
    """Add LSTM predictions to ``dataframe``.

    Loads the saved model and scalers from ``self.path`` when no model is in
    memory, then writes two columns in place:

    * ``lstm_pred``: output of ``tensorFlowGeneratePredictions``;
    * ``lstm_pred_deriv1``: first difference of ``lstm_pred``.

    The scalers fitted at training time are used as-is: the sequence
    preparation step is intentionally not run here because it refits the
    scalers on whatever frame it receives.

    :param future_steps: kept for interface compatibility; not used here
    :return: the same ``dataframe`` object
    """
    # Load the model (and its scalers) if not already in memory.
    if self.model is None:
        self.model = load_model(f"{self.path}/lstm_model.keras", compile=False)
        self.scaler_X = joblib.load(f"{self.path}/lstm_scaler_X.pkl")
        self.scaler_y = joblib.load(f"{self.path}/lstm_scaler_y.pkl")

    preds = self.tensorFlowGeneratePredictions(dataframe, lookback, self.model)

    dataframe["lstm_pred"] = preds
    dataframe["lstm_pred_deriv1"] = dataframe["lstm_pred"].diff()

    return dataframe
def tensorFlowGeneratePredictions(self, dataframe, lookback, model):
    """Predict the target for every row that has ``lookback`` preceding rows.

    Features (``self.model_indicators``) are scaled with ``self.scaler_X``,
    cut into sliding windows and fed to ``model``. Predictions are
    inverse-scaled unless ``self.y_no_scale`` and aligned so that the window
    starting at row ``k`` fills row ``k + lookback``.

    Side effects: computes the reliability metrics, writes the prediction and
    error-histogram plots and logs the text report.

    :return: list of length ``len(dataframe)`` (NaN for the first ``lookback`` rows)
    """
    feature_columns = self.model_indicators

    # Scale with the scaler held in self.scaler_X (transform only)
    scaled = self.scaler_X.transform(dataframe[feature_columns].values)

    # Sliding windows
    windows = np.array([scaled[k:k + lookback] for k in range(len(scaled) - lookback)])

    # Predictions
    raw_pred = model.predict(windows, verbose=0).flatten()
    if self.y_no_scale:
        y_pred = raw_pred
    else:
        y_pred = self.scaler_y.inverse_transform(raw_pred.reshape(-1, 1)).flatten()

    # Align with the dataframe rows
    preds = [np.nan] * len(dataframe)
    preds[lookback:lookback + len(y_pred)] = y_pred

    # True values for the rows that received a prediction
    y_true = dataframe[self.indicator_target][lookback:]
    mae, rmse, mape, hit_ratio = self.reliability_report(y_true, y_pred)

    # Plots
    self.plot_lstm_predictions(dataframe, preds)
    self.plot_error_histogram(y_true, y_pred)

    # Text report
    self.printLog(self.generate_text_report(mae, rmse, mape, hit_ratio, self.future_steps))

    return preds
def tensorFlowPermutationImportance(self, X, y, metric=mean_absolute_error, n_rounds=3):
    """Permutation importance of ``self.model`` along axis 1 of ``X``.

    For each index ``col`` of ``X.shape[1]`` the slice ``X[:, col]`` is shuffled
    ``n_rounds`` times; the importance is the mean metric after shuffling
    minus the baseline metric.

    :return: numpy array with one importance value per index
    """
    baseline_score = metric(y, self.model.predict(X, verbose=0))

    deltas = []
    for col in range(X.shape[1]):
        round_scores = []
        for _ in range(n_rounds):
            shuffled = X.copy()
            np.random.shuffle(shuffled[:, col])
            round_scores.append(metric(y, self.model.predict(shuffled, verbose=0)))
        deltas.append(np.mean(round_scores) - baseline_score)

    return np.array(deltas)
def generate_text_report(self, mae, rmse, mape, hit_ratio, n):
    # Build the (French) plain-text reliability report for an horizon of `n`
    # candles; `hit_ratio` is a fraction and is printed as a percentage.
    return f"""
Fiabilité du modèle à horizon {n} bougies
-----------------------------------------
MAE: {mae:.4f}
RMSE: {rmse:.4f}
MAPE: {mape:.2f} %
Hit-ratio (direction): {hit_ratio*100:.2f} %
Interprétation :
- MAE faible = bonne précision absolue.
- MAPE faible = bonne précision relative au prix.
- Hit-ratio > 55% = exploitable pour un système de trading directionnel.
- 50% ≈ hasard.
"""
def plot_lstm_predictions(self, dataframe, preds):
    """
    Plot the LSTM predictions against the true value of the target indicator.

    Only the positions that have a (non-NaN) prediction are drawn. The figure
    is saved under ``self.path``.

    Args:
        dataframe: pd.DataFrame containing the target indicator.
        preds: list or np.array of predictions aligned on the dataframe,
            with leading NaN values caused by the lookback.
    """
    target = self.indicator_target
    predicted = np.array(preds)
    actual = dataframe[target].values

    # Keep only the positions that received a prediction
    has_prediction = ~np.isnan(predicted)

    plt.figure(figsize=(45, 5))
    plt.plot(actual[has_prediction], label="Vraie valeur", color="blue")
    plt.plot(predicted[has_prediction], label="Prédiction LSTM", color="orange")
    plt.title(f"Prédictions LSTM vs vrai {target}")
    plt.xlabel("Index")
    plt.ylabel(target)
    plt.legend()
    plt.grid(True)
    plt.savefig(f"{self.path}/Prédictions LSTM vs vrai {target}.png")
    plt.close()
def plot_error_histogram(self, y_true, y_pred):
    """Save a 30-bin histogram of the prediction errors (``y_pred - y_true``)."""
    residuals = y_pred - y_true
    output_file = f"{self.path}/Distribution des erreurs de prédiction.png"

    plt.figure(figsize=(8,5))
    plt.hist(residuals, bins=30)
    plt.title("Distribution des erreurs de prédiction")
    plt.savefig(output_file)
    plt.close()
def reliability_report(self, y_true, y_pred):
    """Compute accuracy metrics of ``y_pred`` against ``y_true``.

    | Metric    | What it measures            | Sensitivity               |
    | --------- | --------------------------- | ------------------------- |
    | MAE       | Mean absolute error         | Average of the errors     |
    | RMSE      | Root mean squared error     | Sensitive to large errors |
    | MAPE      | Mean error in percent       | Easy to interpret         |
    | Hit ratio | Share of correct directions | For trading / signals     |

    MAPE is computed only over the samples whose true value is non-zero
    (a zero true value would otherwise make the result infinite); it is NaN
    when no such sample exists. The hit ratio compares the sign of successive
    differences and is NaN when fewer than two samples are given.

    :return: tuple ``(mae, rmse, mape, hit_ratio)``
    """
    mae = mean_absolute_error(y_true, y_pred)
    rmse = np.sqrt(mean_squared_error(y_true, y_pred))

    actual = np.asarray(y_true, dtype=float)
    predicted = np.asarray(y_pred, dtype=float)

    # Relative error is undefined where the true value is exactly 0.
    nonzero = actual != 0
    if nonzero.any():
        mape = np.mean(np.abs((actual[nonzero] - predicted[nonzero]) / actual[nonzero])) * 100
    else:
        mape = np.nan

    # Directional hit ratio
    real_dir = np.sign(np.diff(actual))
    pred_dir = np.sign(np.diff(predicted))
    hit_ratio = (real_dir == pred_dir).mean() if real_dir.size else np.nan

    return mae, rmse, mape, hit_ratio
"""
Mixin utilitaire pour :
- charger un modèle Keras (Sequential)
- charger scalers (scaler_X, scaler_y) pré-sauvegardés (joblib / numpy)
- construire X aligned (lookback) depuis un DataFrame
- prédire mean+std via MC Dropout (ou simple predict if no dropout)
- retourner prédiction inverse-transformée et score de confiance
"""
# When True, predict_on_dataframe predicts through mc_dropout_predict
# (repeated forward passes with training=True) instead of a single predict().
use_mc_dropout = True
# Number of forward passes handed to mc_dropout_predict by predict_on_dataframe.
mc_samples = 40
def load_model_and_scalers(self):
    """Load the saved Keras model and both scalers from ``self.path`` once.

    Does nothing when ``self._tf_model`` is already set.
    """
    if self._tf_model is not None:
        return
    self._tf_model = load_model(f"{self.path}/lstm_model.keras", compile=False)
    self._scaler_X = joblib.load(f"{self.path}/lstm_scaler_X.pkl")
    self._scaler_y = joblib.load(f"{self.path}/lstm_scaler_y.pkl")
def build_X_from_dataframe(self, dataframe, feature_columns):
    """
    Return the sliding windows used as model input.

    dataframe: pandas.DataFrame
    feature_columns: list of columns to use (in this order)
    Result shape: (n_samples, lookback, n_features), with one window per
    available start position; empty when there are fewer rows than
    ``self.lookback``.
    """
    values = dataframe[feature_columns].values
    L = self.lookback
    window_count = len(values) - L + 1
    if window_count <= 0:
        return np.empty((0, L, len(feature_columns)), dtype=float)
    return np.array([values[start:start + L] for start in range(window_count)])
def mc_dropout_predict(self, model, X, n_samples=40):
    """
    Run ``n_samples`` forward passes with ``training=True`` and aggregate them.

    Returns the per-sample mean and standard deviation of the passes, both
    flattened; two empty arrays when ``X`` has no samples.
    """
    if X.shape[0] == 0:
        return np.array([]), np.array([])

    # Stacked passes: (n_samples, batch, output)
    passes = np.array([model(X, training=True).numpy() for _ in range(n_samples)])
    return passes.mean(axis=0).flatten(), passes.std(axis=0).flatten()
def predict_on_dataframe(self, dataframe, feature_columns, last_n=None):
    """
    Predict on the whole dataframe (``last_n=None``) or only on the last
    ``last_n`` rows.

    Windows are built with ``build_X_from_dataframe``, scaled with
    ``self._scaler_X`` when available, predicted (MC dropout when
    ``self.use_mc_dropout``) and inverse-scaled with ``self._scaler_y`` when
    available. The prediction of the window ending at row ``r`` is written at
    row ``r + self.future_steps``; predictions that would fall past the last
    row are dropped.

    When ``self.training_enabled`` and the run mode is ``backtest``, the
    permutation feature importance is also computed (side effect only).

    :return: ``(preds, preds_std)``, two arrays of length ``len(dataframe)``
        filled with NaN where no prediction is available
    :raises ValueError: if ``last_n`` is smaller than the lookback, or if the
        target slice used for the feature importance does not match the
        number of windows
    """
    self.load_model_and_scalers()

    # --- Row selection when a limit is requested ---
    if last_n is not None:
        if last_n < self.lookback:
            raise ValueError("last_n doit être >= lookback.")
        df_used = dataframe.iloc[-last_n:].copy()
        df_offset = len(dataframe) - last_n  # position inside the full frame
    else:
        df_used = dataframe
        df_offset = 0

    scaler_X = getattr(self, "_scaler_X", None)
    scaler_y = getattr(self, "_scaler_y", None)

    # --- Sequences ---
    X_seq = self.build_X_from_dataframe(df_used, feature_columns)

    # --- Scaling X ---
    if scaler_X is not None and X_seq.size:
        ns, L, f = X_seq.shape
        X_seq_scaled = scaler_X.transform(X_seq.reshape(-1, f)).reshape(ns, L, f)
    else:
        X_seq_scaled = X_seq

    # --- Prediction ---
    if self.use_mc_dropout:
        mean_scaled, std_scaled = self.mc_dropout_predict(
            self._tf_model, X_seq_scaled, n_samples=self.mc_samples
        )
    else:
        if X_seq_scaled.shape[0] == 0:
            mean_scaled = np.array([])
            std_scaled = np.array([])
        else:
            mean_scaled = self._tf_model.predict(X_seq_scaled, verbose=0).flatten()
            std_scaled = np.zeros_like(mean_scaled)

    # --- Inverse transform Y ---
    if scaler_y is not None and mean_scaled.size:
        mean_real = scaler_y.inverse_transform(mean_scaled.reshape(-1, 1)).flatten()
        # Map the std through the scaler by transforming mean + std.
        std_real = (
            scaler_y.inverse_transform((mean_scaled + std_scaled).reshape(-1, 1)).flatten()
            - mean_real
        )
        std_real = np.abs(std_real)
    else:
        mean_real = mean_scaled
        std_real = std_scaled

    # --- Alignment on the full frame ---
    n_rows = len(dataframe)
    preds = np.full(n_rows, np.nan)
    preds_std = np.full(n_rows, np.nan)
    start = df_offset + (self.lookback - 1 + self.future_steps)
    end = start + len(mean_real) - self.future_steps
    if len(mean_real) > 0:
        preds[start:end] = mean_real[: end - start]
        preds_std[start:end] = std_real[: end - start]

    # --- LSTM feature importance (backtest only) ---
    if self.training_enabled and self.dp.runmode.value == 'backtest':
        # True target, scaled with the same scaler as the predictions
        y_all = dataframe[self.indicator_target].values.reshape(-1, 1)
        if scaler_y is not None:
            y_scaled_all = scaler_y.transform(y_all).flatten()
        else:
            y_scaled_all = y_all.flatten()

        # The slice must line up 1:1 with the windows of X_seq_scaled
        offset = self.lookback + self.future_steps
        y_true = y_scaled_all[offset - 1 - self.future_steps: offset + X_seq_scaled.shape[0]]
        self.printLog(len(X_seq_scaled))
        self.printLog(len(y_true))

        if len(y_true) != X_seq_scaled.shape[0]:
            raise ValueError(f"y_true ({len(y_true)}) != X_seq_scaled ({X_seq_scaled.shape[0]})")
        self.permutation_importance_lstm(X_seq_scaled, y_true, feature_columns)

    return preds, preds_std
def permutation_importance_lstm(self, X_seq, y_true, feature_names, n_rounds=3):
    """
    Permutation feature importance for the sequence model ``self.model``.

    X_seq shape : (N, lookback, features)
    y_true : (N,)

    Each feature is shuffled along the time axis inside every window,
    ``n_rounds`` times; its importance is the mean MAE after shuffling minus
    the baseline MAE. Results are logged, stored in
    ``self.last_feature_importances``, written to CSV and plotted.

    :return: dict mapping feature name to importance
    """
    # Baseline
    baseline_score = mean_absolute_error(y_true, self.model.predict(X_seq, verbose=0).flatten())

    importances = []
    for f in range(X_seq.shape[2]):
        self.printLog(feature_names[f])
        round_scores = []
        for _ in range(n_rounds):
            shuffled = X_seq.copy()
            # Permute feature f inside every window
            for window_idx in range(shuffled.shape[0]):
                np.random.shuffle(shuffled[window_idx, :, f])
            shuffled_pred = self.model.predict(shuffled, verbose=0).flatten()
            round_scores.append(mean_absolute_error(y_true, shuffled_pred))
        importance = np.mean(round_scores) - baseline_score
        self.printLog(f"{f} importance indicator {feature_names[f]} = {100 * importance:.5f}%")
        importances.append(importance)

    ranked = sorted(zip(feature_names, importances), key=lambda pair: -pair[1])
    for name, imp in ranked:
        self.printLog(f"{name}: importance = {100 * imp:.5f}%")

    self.last_feature_importances = importances
    # Persist
    self.save_feature_importance_csv(self.last_feature_importances)
    self.plot_feature_importances(self.last_feature_importances)
    self.printLog("✔ Feature importance calculée")
    return dict(zip(feature_names, importances))
def save_feature_importance_csv(self, importances_list):
    """Write ``feature,importance`` rows to ``<self.path>/feature_importances.csv``.

    Importances are paired, in order, with ``self.model_indicators``. Once the
    file is written, ``select_features_from_importance`` is run with a
    threshold of 0.00015.
    """
    by_feature = dict(zip(self.model_indicators, importances_list))
    lines = ["feature,importance\n"]
    lines.extend(f"{name},{score}\n" for name, score in by_feature.items())
    with open(f"{self.path}/feature_importances.csv", "w") as f:
        f.writelines(lines)
    self.select_features_from_importance(threshold=0.00015)
def plot_feature_importances(self, importances):
    """Save a bar chart of the importances, sorted in decreasing order.

    Bars are labelled with the matching entries of ``self.model_indicators``;
    the figure goes to ``<self.path>/Feature Importance.png``.
    """
    scores = np.array(importances)
    names = self.model_indicators

    # Decreasing order
    order = np.argsort(scores)[::-1]
    ordered_names = [names[idx] for idx in order]
    ordered_scores = scores[order]
    positions = range(len(ordered_names))

    plt.figure(figsize=(24, 8))
    plt.bar(positions, ordered_scores)
    plt.xticks(positions, ordered_names, rotation=90)
    plt.title("Feature Importance (permutation)")
    plt.tight_layout()
    plt.savefig(f"{self.path}/Feature Importance.png")
    plt.close()
# ############################################################################################################
# position_sizer_lstm.py
# Usage: intégrer les méthodes dans ta classe Strategy (ou comme mixin)
"""
Mixin pour entraîner / prédire une fraction de position (0..1) avec un LSTM.
- lookback : nombre de bougies en entrée
- feature_columns : liste des colonnes du dataframe utilisées comme features
- model, scalers saved under self.path (ou chemins fournis)
"""
# CONFIG (à adapter dans ta stratégie)
# lookback = 50
# future_steps = 1 # on prédit la prochaine bougie
# feature_columns = None # ['open','high','low','close','volume', ...]
# model_path = "position_sizer_lstm.keras"
# scaler_X_path = "position_sizer_scaler_X.pkl"
# scaler_y_path = "position_sizer_scaler_y.pkl"
# training params
# epochs = 50
# batch_size = 64
# ------------------------
# Data preparation
# ------------------------
def _build_sequences_for_position_sizer(self, df):
# features (N, f)
values = df[self.model_indicators].values.astype(float)
# target = realized return after next candle (or profit), here: simple return
# you can replace by realized profit if you have it (price change minus fees etc.)
prices = df[self.indicator_target].values.astype(float)
returns = (np.roll(prices, -self.future_steps) - prices) / prices # next-return
returns = returns.reshape(-1, 1)
L = self.lookback
X_seq = []
y_seq = []
max_i = len(values) - (L + self.future_steps) + 1
if max_i <= 0:
return np.empty((0, L, values.shape[1])), np.empty((0, 1))
for i in range(max_i):
X_seq.append(values[i : i + L])
# y is the *desired* fraction proxy: we use scaled positive return (could be improved)
# Here we use returns[i + L - 1 + future_steps] which is the return after the window
y_seq.append(returns[i + L - 1 + self.future_steps - 1]) # equals returns[i+L-1]
X_seq = np.array(X_seq) # (ns, L, f)
y_seq = np.array(y_seq) # (ns, 1)
return X_seq, y_seq
# ------------------------
# Scalers save/load
# ------------------------
def save_scalers(self, scaler_X, scaler_y, folder=None):
    """Dump both scalers with joblib into ``folder`` (default ``self.path``).

    The folder is created when missing; file names come from
    ``self.scaler_X_path`` and ``self.scaler_y_path``.
    """
    import joblib
    target_dir = folder or self.path
    os.makedirs(target_dir, exist_ok=True)
    x_file = os.path.join(target_dir, self.scaler_X_path)
    joblib.dump(scaler_X, x_file)
    y_file = os.path.join(target_dir, self.scaler_y_path)
    joblib.dump(scaler_y, y_file)
def load_scalers(self, folder=None):
    """Load the position-sizer scalers from ``folder`` (default ``self.path``).

    Best effort: if either file cannot be loaded, both
    ``self._ps_scaler_X`` and ``self._ps_scaler_y`` are set to ``None``.
    """
    import joblib
    source_dir = folder or self.path
    try:
        loaded_X = joblib.load(os.path.join(source_dir, self.scaler_X_path))
        loaded_y = joblib.load(os.path.join(source_dir, self.scaler_y_path))
    except Exception:
        loaded_X = loaded_y = None
    self._ps_scaler_X = loaded_X
    self._ps_scaler_y = loaded_y
# ------------------------
# Model definition
# ------------------------
def build_position_sizer_model(self, n_features):
    """Build and compile the position-sizer network.

    Input ``(self.lookback, n_features)`` -> LSTM(64) -> Dense(32, relu) ->
    Dense(1, sigmoid), compiled with Adam (lr 1e-3) and MSE loss.
    """
    keras_layers = tf.keras.layers
    stack = [
        keras_layers.Input(shape=(self.lookback, n_features)),
        keras_layers.LSTM(64, return_sequences=False),
        keras_layers.Dense(32, activation="relu"),
        keras_layers.Dense(1, activation="sigmoid"),  # fraction in [0,1]
    ]
    network = tf.keras.Sequential(stack)
    optimizer = tf.keras.optimizers.Adam(learning_rate=1e-3)
    network.compile(optimizer=optimizer, loss="mse")
    return network
# ------------------------
# Training
# ------------------------
def train_position_sizer(self, dataframe, feature_columns=None,
                         model_path=None, scaler_folder=None,
                         epochs=None, batch_size=None):
    """
    Train the LSTM model that predicts the position fraction (0..1).

    dataframe : pandas DataFrame (must contain ``self.indicator_target``)

    The model is checkpointed and saved to ``model_path`` (default
    ``<self.path>/<self.model_path>``) and the scalers are saved into
    ``scaler_folder`` (default ``self.path``). All output folders are created
    before training starts so that the checkpoint callback can write.

    NOTE: the sequences are built by ``_build_sequences_for_position_sizer``,
    which reads ``self.model_indicators``; ``feature_columns`` is only
    validated here.

    :return: the trained model
    :raises ValueError: if no feature columns are available or if there is not
        enough data to build a single sequence
    """
    feature_columns = feature_columns or self.model_indicators
    if feature_columns is None:
        raise ValueError("feature_columns must be provided")

    X_seq, y_seq = self._build_sequences_for_position_sizer(dataframe)
    if X_seq.shape[0] == 0:
        raise ValueError("Pas assez de données pour former des séquences (lookback trop grand).")

    # scalers
    scaler_X = MinMaxScaler()
    ns, L, f = X_seq.shape
    X_2d = X_seq.reshape(-1, f)
    X_2d_scaled = scaler_X.fit_transform(X_2d)
    X_seq_scaled = X_2d_scaled.reshape(ns, L, f)
    scaler_y = MinMaxScaler(feature_range=(0, 1))
    y_scaled = scaler_y.fit_transform(y_seq)  # maps returns -> [0,1] (you may want custom transform)

    # build model
    model = self.build_position_sizer_model(n_features=f)

    # output locations, created up front (the checkpoint writes during fit)
    model_path = model_path or os.path.join(self.path, self.model_path)
    scaler_folder = scaler_folder or self.path
    os.makedirs(self.path, exist_ok=True)
    os.makedirs(os.path.dirname(model_path) or ".", exist_ok=True)

    # callbacks
    callbacks = [
        EarlyStopping(monitor="val_loss", patience=5, restore_best_weights=True, verbose=1),
        ReduceLROnPlateau(monitor="val_loss", factor=0.5, patience=3, verbose=1),
        ModelCheckpoint(model_path, save_best_only=True, monitor="val_loss", verbose=0)
    ]

    # train
    epochs = epochs or self.epochs
    batch_size = batch_size or self.batch_size
    model.fit(X_seq_scaled, y_scaled, validation_split=0.1,
              epochs=epochs, batch_size=batch_size, callbacks=callbacks, verbose=2)

    # save model and scalers
    model.save(model_path)
    self.save_scalers(scaler_X, scaler_y, folder=scaler_folder)

    # store references
    self._ps_model = model
    self._ps_scaler_X = scaler_X
    self._ps_scaler_y = scaler_y
    return model
# ------------------------
# Load model
# ------------------------
def load_position_sizer(self, model_path=None, scaler_folder=None):
    """Load the position-sizer model and its scalers.

    ``self._ps_model`` is set to the loaded model, or ``None`` when the model
    file does not exist; the scalers are then loaded through ``load_scalers``.

    :return: ``self._ps_model``
    """
    resolved_model = model_path or os.path.join(self.path, self.model_path)
    resolved_folder = scaler_folder or self.path
    self._ps_model = (
        load_model(resolved_model, compile=False) if os.path.exists(resolved_model) else None
    )
    self.load_scalers(resolved_folder)
    return self._ps_model
# ------------------------
# Predict fraction on dataframe
# ------------------------
def predict_position_fraction_on_dataframe(self, dataframe, feature_columns=None):
    """
    Predict a position fraction for each row of ``dataframe``.

    Sequences are built with ``self._build_sequences_for_position_sizer`` (the
    same helper used for training), scaled with ``self._ps_scaler_X`` and fed to
    ``self._ps_model``. The model is lazily loaded through
    ``self.load_position_sizer()`` when it is not in memory yet.

    :param dataframe: input data; only its length is used directly here, the
        rest is delegated to the sequence builder.
    :param feature_columns: feature names; falls back to ``self.model_indicators``.
        NOTE(review): only validated here, not forwarded to the sequence
        builder - confirm the builder reads the same columns.
    :return: float array of length ``len(dataframe)``; rows without a
        prediction hold ``np.nan``.
    :raises ValueError: if no feature columns are configured, or if the scaler
        or the model is unavailable once sequences have been built.
    """
    feature_columns = feature_columns or self.model_indicators
    if feature_columns is None:
        raise ValueError("feature_columns must be set")

    if self._ps_model is None:
        self.load_position_sizer()

    n_rows = len(dataframe)

    # build X sequence (same as training)
    X_seq, _ = self._build_sequences_for_position_sizer(dataframe)
    if X_seq.shape[0] == 0:
        # not enough data to form a single sequence
        return np.full(n_rows, np.nan)

    ns, L, f = X_seq.shape
    X_2d = X_seq.reshape(-1, f)

    if self._ps_scaler_X is None:
        raise ValueError("scaler_X missing (train first or load scalers).")
    if self._ps_model is None:
        # load_position_sizer() leaves the model at None when the file is absent;
        # fail with a clear message instead of an AttributeError on .predict.
        raise ValueError("position sizer model missing (train first or check model path).")

    # The scaler works on 2D data, so flatten the time axis and restore it after.
    X_2d_scaled = self._ps_scaler_X.transform(X_2d)
    X_seq_scaled = X_2d_scaled.reshape(ns, L, f)

    # presumably in [0, 1] given the model's output layer - confirm against the builder
    preds = self._ps_model.predict(X_seq_scaled, verbose=0).flatten()

    # Align with dataframe: first prediction corresponds to index lookback - 1.
    result = np.full(n_rows, np.nan)
    start = self.lookback - 1
    # Clamp to the dataframe length so surplus predictions are dropped instead of
    # triggering a shape mismatch on assignment.
    end = min(start + len(preds), n_rows)
    result[start:end] = preds[:max(end - start, 0)]
    return result
# ------------------------
# Map a predicted fraction to a wallet fraction
# ------------------------
def position_fraction_to_trade_size(self, fraction, wallet_balance, current_price,
                                    min_fraction=0.001, max_fraction=0.5):
    """
    Convert a model output into a bounded fraction of the wallet.

    The input is clipped to [0, 1] and then mapped linearly onto
    [``min_fraction``, ``max_fraction``]. A NaN input yields ``min_fraction``.

    :param fraction: raw fraction, nominally in [0, 1]; may be NaN.
    :param wallet_balance: not used by the current implementation.
    :param current_price: not used by the current implementation.
    :param min_fraction: value returned for a fraction of 0 or NaN.
    :param max_fraction: value returned for a fraction of 1 or more.
    :return: the scaled fraction (a fraction, not a quantity).
    """
    # No prediction available: fall back to the minimal size.
    if np.isnan(fraction):
        return min_fraction

    bounded = float(np.clip(fraction, 0.0, 1.0))
    span = max_fraction - min_fraction
    # A quantity could be derived as (result * wallet_balance) / current_price;
    # this method deliberately returns the fraction only.
    return min_fraction + bounded * span
# End of mixin
# Importances
# Default threshold: 0.00015, i.e. 0.015%
def select_features_from_importance(self, threshold=0.00015):
    """
    Keep the features whose importance reaches ``threshold``.

    Reads ``feature_importances.csv`` from ``self.path`` (columns ``feature``
    and ``importance``), converts the importance values from percentages to
    fractions, keeps the rows at or above ``threshold`` ordered by decreasing
    importance, writes the kept names one per line to ``selected_features.txt``
    in the same folder and prints a short summary.

    :param threshold: minimum importance as a fraction (default 0.00015 = 0.015%).
    :return: list of kept feature names, most important first.
    """
    importances = pd.read_csv(f"{self.path}/feature_importances.csv")

    # Values may be written as "0,0123%" rather than "0.0123%": strip the
    # percent sign and normalise the decimal separator before converting.
    as_text = importances['importance'].astype(str)
    as_text = as_text.str.replace('%', '').str.replace(',', '.')
    importances['importance'] = as_text.astype(float) / 100  # percent -> fraction

    # Rank first, then filter, so the output is ordered by importance.
    ranked = importances.sort_values(by="importance", ascending=False)
    kept = ranked.loc[ranked['importance'] >= threshold, 'feature']
    features = kept.tolist()

    with open(f"{self.path}/selected_features.txt", "w") as out:
        out.writelines(name + "\n" for name in features)

    print(f"{len(features)} features conservés (≥ {threshold * 100:.3f}%)")
    print("✔ Liste enregistrée")
    return features