TensorFlow

This commit is contained in:
Jérôme Delacotte
2025-11-23 22:32:10 +01:00
parent dec03afd8b
commit c7530208a3

View File

@@ -117,6 +117,7 @@ class Zeus_TensorFlow_1h(IStrategy):
startup_candle_count = 60 * 24
# Machine Learning
training_enabled = False
model = None
model_indicators = []
indicator_target = 'sma5'
@@ -124,7 +125,7 @@ class Zeus_TensorFlow_1h(IStrategy):
lookback = 72
future_steps = 6
y_no_scale = False
epochs = 40
epochs = 200
batch_size = 64
scaler_X = None
scaler_y = None
@@ -168,8 +169,8 @@ class Zeus_TensorFlow_1h(IStrategy):
trailing_only_offset_is_reached = True
# Buy hypers
timeframe = '1h'
timeframe = '1m'
timeframe_sup = '5m'
max_open_trades = 5
max_amount = 40
@@ -211,12 +212,6 @@ class Zeus_TensorFlow_1h(IStrategy):
"max_rsi_24": {
"color": "pink"
},
# "rsi": {
# "color": "red"
# },
# "rsi_1d": {
# "color": "blue"
# }
},
"Rsi_deriv1": {
"sma24_deriv1": {
@@ -280,6 +275,7 @@ class Zeus_TensorFlow_1h(IStrategy):
'last_date': 0,
'stop': False,
'max_profit': 0,
'first_amount': 0,
'total_amount': 0,
'has_gain': 0,
'force_sell': False,
@@ -288,46 +284,12 @@ class Zeus_TensorFlow_1h(IStrategy):
for pair in ["BTC/USDC", "ETH/USDC", "DOGE/USDC", "XRP/USDC", "SOL/USDC",
"BTC/USDT", "ETH/USDT", "DOGE/USDT", "XRP/USDT", "SOL/USDT"]
}
# 20 20 40 60 100 160 260 420
# 50 50 100 300 500
# fibo = [1, 1, 2, 3, 5, 8, 13, 21]
# my fibo
# 50 50 50 100 100 150 200 250 350 450 600 1050
fibo = [1, 1, 1, 2, 2, 3, 4, 5, 7, 9, 12, 16, 21]
baisse = [1, 2, 3, 5, 7, 10, 14, 19, 26, 35, 47, 63, 84]
# Ma suite 1 1 1 2 2 3 4 5 7 9 12 16 21
# Mise 50 50 50 100 100 150 200 250 350 450 600 800 1050
# Somme Mises 50 100 150 250 350 500 700 950 1300 1750 2350 3150 4200
# baisse 1 2 3 5 7 10 14 19 26 35 47 63 84
# factors = [1, 1.1, 1.25, 1.5, 2.0, 3]
# thresholds = [2, 5, 10, 20, 30, 50]
factors = [0.5, 0.75, 1, 1.25, 1.5, 2]
thresholds = [0, 2, 5, 10, 30, 45]
trades = list()
max_profit_pairs = {}
mise_factor_buy = DecimalParameter(0.01, 0.1, default=0.05, decimals=2, space='buy', optimize=True, load=True)
indicators = {'sma5', 'sma12', 'sma24', 'sma60'}
indicators_percent = {'percent', 'percent3', 'percent12', 'percent24', 'percent_1d', 'percent3_1h', 'percent12_1d', 'percent24_1d'}
mises = IntParameter(1, 50, default=5, space='buy', optimize=False, load=True)
ml_prob_buy = DecimalParameter(-0.5, 0.5, default=0.0, decimals=2, space='buy', optimize=True, load=True)
# ml_prob_sell = DecimalParameter(-0.5, 0.5, default=0.0, decimals=2, space='sell', optimize=True, load=True)
pct = DecimalParameter(0.005, 0.05, default=0.012, decimals=3, space='buy', optimize=True, load=True)
pct_inc = DecimalParameter(0.0001, 0.003, default=0.0022, decimals=4, space='buy', optimize=True, load=True)
# rsi_deb_protect = IntParameter(50, 90, default=70, space='protection', optimize=True, load=True)
# rsi_end_protect = IntParameter(20, 60, default=55, space='protection', optimize=True, load=True)
#
# sma24_deriv1_deb_protect = DecimalParameter(-4, 4, default=-2, decimals=1, space='protection', optimize=True, load=True)
# sma24_deriv1_end_protect = DecimalParameter(-4, 4, default=0, decimals=1, space='protection', optimize=True, load=True)
# =========================================================================
should_enter_trade_count = 0
@@ -426,19 +388,6 @@ class Zeus_TensorFlow_1h(IStrategy):
# self.printLog(f"{current_time} SELL triggered for {pair} ({exit_reason} profit={profit} minutes={minutes} percent={last_candle['hapercent']}) but condition blocked")
return (allow_to_sell) | (exit_reason == 'force_exit') | (exit_reason == 'stop_loss')
def custom_stake_amount(self, pair: str, current_time: datetime, current_rate: float,
proposed_stake: float, min_stake: float, max_stake: float,
**kwargs) -> float:
dataframe, _ = self.dp.get_analyzed_dataframe(pair=pair, timeframe=self.timeframe)
current_candle = dataframe.iloc[-1].squeeze()
adjusted_stake_amount = self.adjust_stake_amount(pair, current_candle)
# print(f"{pair} adjusted_stake_amount{adjusted_stake_amount}")
# Use default stake amount.
return adjusted_stake_amount
def custom_exit(self, pair: str, trade: Trade, current_time, current_rate, current_profit, **kwargs):
dataframe, _ = self.dp.get_analyzed_dataframe(pair, self.timeframe)
@@ -460,7 +409,7 @@ class Zeus_TensorFlow_1h(IStrategy):
profit = trade.calc_profit(current_rate) #round(current_profit * trade.stake_amount, 1)
self.pairs[pair]['max_profit'] = max(self.pairs[pair]['max_profit'], profit)
max_profit = self.pairs[pair]['max_profit']
max_profit = last_candle['max5'] #self.pairs[pair]['max_profit']
baisse = 0
if profit > 0:
baisse = 1 - (profit / max_profit)
@@ -472,8 +421,9 @@ class Zeus_TensorFlow_1h(IStrategy):
hours_since_first_buy = (current_time - trade.open_date_utc).seconds / 3600.0
days_since_first_buy = (current_time - trade.open_date_utc).days
hours = (current_time - trade.date_last_filled_utc).total_seconds() / 3600.0
minutes = (current_time - trade.date_last_filled_utc).total_seconds() / 60.0
if hours % 4 == 0:
if minutes % 4 == 0:
self.log_trade(
last_candle=last_candle,
date=current_time,
@@ -489,6 +439,8 @@ class Zeus_TensorFlow_1h(IStrategy):
# if (last_candle['predicted_pct'] > 0):
# return None
# if (last_candle['sma5'] - before_last_candle['sma5']) / last_candle['sma5'] > 0.0002:
# return None
pair_name = self.getShortName(pair)
if last_candle['max_rsi_24'] > 85 and profit > max(5, expected_profit) and (last_candle['hapercent'] < 0) and last_candle['sma60_deriv1'] < 0.05:
@@ -511,8 +463,7 @@ class Zeus_TensorFlow_1h(IStrategy):
# self.pairs[pair]['force_buy'] = (self.pairs[pair]['count_of_buys'] - self.pairs[pair]['has_gain'] > 3)
# return str(count_of_buys) + '_' + 'B15_' + pair_name + '_' + str(self.pairs[pair]['has_gain'])
if (last_candle['sma5'] - before_last_candle_12['sma5']) / last_candle['sma5'] > 0.0002:
return None
factor = 1
if (self.getShortName(pair) == 'BTC'):
@@ -536,7 +487,7 @@ class Zeus_TensorFlow_1h(IStrategy):
def informative_pairs(self):
# get access to all pairs available in whitelist.
pairs = self.dp.current_whitelist()
informative_pairs = [(pair, '1d') for pair in pairs]
informative_pairs = [(pair, self.timeframe_sup) for pair in pairs]
# informative_pairs += [(pair, '1h') for pair in pairs]
return informative_pairs
@@ -550,7 +501,7 @@ class Zeus_TensorFlow_1h(IStrategy):
self.printLog(
f"| {'Date':<16} | {'Action':<10} |{'Pair':<5}| {'Trade Type':<18} |{'Rate':>8} | {'Dispo':>6} | {'Profit':>8} "
f"| {'Pct':>6} | {'max_touch':>11} | {'last_lost':>12} | {'last_max':>7}| {'last_max':>7}|{'Buys':>5}| {'Stake':>5} |"
f"{'rsi':>6}|Distmax|s201d|s5_1d|s5_2d|s51h|s52h|smt1h|smt2h|tdc1d|tdc1h"
f"{'rsi':>6}" #|Distmax|s201d|s5_1d|s5_2d|s51h|s52h|smt1h|smt2h|tdc1d|tdc1h"
)
self.printLineLog()
df = pd.DataFrame.from_dict(self.pairs, orient='index')
@@ -559,14 +510,11 @@ class Zeus_TensorFlow_1h(IStrategy):
df_filtered = df[df['count_of_buys'] > 0].drop(columns=colonnes_a_exclure)
# df_filtered = df_filtered["first_buy", "last_max", "max_touch", "last_sell","last_buy", 'count_of_buys', 'current_profit']
print(df_filtered)
self.printLog(df_filtered)
self.columns_logged += 1
date = str(date)[:16] if date else "-"
limit = None
# if buys is not None:
# limit = round(last_rate * (1 - self.fibo[buys] / 100), 4)
rsi = ''
rsi_pct = ''
sma5_1d = ''
@@ -614,10 +562,10 @@ class Zeus_TensorFlow_1h(IStrategy):
# f"|{round(last_candle['mid_smooth_24_deriv1'],3) or '-':>6}|{round(last_candle['mid_smooth_deriv1'],3) or '-':>6}|{round(last_candle['mid_smooth_deriv1_1d'],3) or '-' :>6}|"
# f"{round(last_candle['mid_smooth_24_deriv2'],3) or '-' :>6}|{round(last_candle['mid_smooth_deriv2'],3) or '-':>6}|{round(last_candle['mid_smooth_deriv2_1d'],3) or '-':>6}|"
f"{round(last_candle['max_rsi_24'], 1) or '-' :>6}|"
f"{dist_max:>7}|{color_sma24}{round(last_candle['sma24_deriv1'], 2):>5}{RESET}"
f"|{color_sma5}{round(last_candle['mid_smooth_5_deriv1'], 2):>5}{RESET}|{color_sma5_2}{round(last_candle['mid_smooth_5_deriv2'], 2):>5}{RESET}"
f"|{color_sma5}{round(last_candle['sma60_deriv1'], 2):>5}{RESET}|{color_sma5_2h}{round(last_candle['sma60_deriv2'], 2):>5}{RESET}"
f"|{color_smooth}{round(last_candle['mid_smooth_deriv1'], 2):>5}{RESET}|{color_smooth2}{round(last_candle['mid_smooth_deriv2'], 2):>5}{RESET}"
# f"{dist_max:>7}|{color_sma24}{round(last_candle['sma24_deriv1'], 2):>5}{RESET}"
# f"|{color_sma5}{round(last_candle['mid_smooth_5_deriv1'], 2):>5}{RESET}|{color_sma5_2}{round(last_candle['mid_smooth_5_deriv2'], 2):>5}{RESET}"
# f"|{color_sma5}{round(last_candle['sma60_deriv1'], 2):>5}{RESET}|{color_sma5_2h}{round(last_candle['sma60_deriv2'], 2):>5}{RESET}"
# f"|{color_smooth}{round(last_candle['mid_smooth_deriv1'], 2):>5}{RESET}|{color_smooth2}{round(last_candle['mid_smooth_deriv2'], 2):>5}{RESET}"
)
def getLastLost(self, last_candle, pair):
@@ -646,13 +594,13 @@ class Zeus_TensorFlow_1h(IStrategy):
short_pair = self.getShortName(pair)
self.path = f"user_data/plots/{short_pair}/" + ("valide/" if not self.dp.runmode.value in ('backtest') else '')
dataframe = self.populateDataframe(dataframe, timeframe='1h')
dataframe = self.populateDataframe(dataframe, timeframe=self.timeframe)
# ################### INFORMATIVE 1d
informative = self.dp.get_pair_dataframe(pair=metadata['pair'], timeframe="1d")
informative = self.populateDataframe(informative, timeframe='1d')
# ################### INFORMATIVE self.timeframe_sup
informative = self.dp.get_pair_dataframe(pair=metadata['pair'], timeframe=self.timeframe_sup)
informative = self.populateDataframe(informative, timeframe=self.timeframe_sup)
# informative = self.calculateRegression(informative, 'mid', lookback=15)
dataframe = merge_informative_pair(dataframe, informative, self.timeframe, "1d", ffill=True)
dataframe = merge_informative_pair(dataframe, informative, self.timeframe, self.timeframe_sup, ffill=True)
dataframe['last_price'] = dataframe['close']
dataframe['first_price'] = dataframe['close']
@@ -679,13 +627,13 @@ class Zeus_TensorFlow_1h(IStrategy):
self.pairs[pair]['last_buy'] = buy.price
count = count + 1
amount += buy.price * buy.filled
# dataframe['mid_price'] = (dataframe['last_price'] + dataframe['first_price']) / 2
count_buys = count
# dataframe['limit'] = dataframe['last_price'] * (1 - self.baisse[count] / 100)
self.pairs[pair]['total_amount'] = amount
# dataframe['mid_smooth_tag'] = qtpylib.crossed_below(dataframe['mid_smooth_24_deriv1'], dataframe['mid_smooth_deriv2_24'])
self.model_indicators = self.listUsableColumns(dataframe)
# ===============================
# lissage des valeurs horaires
dataframe['mid_smooth'] = dataframe['mid'].rolling(window=6).mean()
@@ -720,9 +668,12 @@ class Zeus_TensorFlow_1h(IStrategy):
dataframe = self.calculateRegression(dataframe, 'mid', lookback=10, future_steps=10, model_type="poly")
dataframe = self.calculateRegression(dataframe, 'sma24', lookback=12, future_steps=12)
dataframe['max5'] = talib.MAX(dataframe['mid'], timeperiod=5)
# TENSOR FLOW
if self.dp.runmode.value in ('backtest'):
self.model_indicators = self.listUsableColumns(dataframe)
if self.training_enabled and self.dp.runmode.value in ('backtest'):
self.tensorFlowTrain(dataframe, future_steps = self.future_steps)
self.tensorFlowPredict(dataframe)
self.kerasGenerateGraphs(dataframe)
@@ -737,16 +688,24 @@ class Zeus_TensorFlow_1h(IStrategy):
# ex: feature_columns correspond aux colonnes utilisées à l'entraînement
# feature_columns = [c for c in dataframe.columns if c not in [self.indicator_target, 'lstm_pred']]
preds, preds_std = self.predict_on_dataframe(dataframe, self.model_indicators)
last_n = self.lookback * 2 if not self.dp.runmode.value in ('backtest') else None
dataframe["lstm_pred"] = preds
dataframe["lstm_pred_std"] = preds_std
# preds, preds_std = self.predict_on_dataframe(dataframe, self.model_indicators, last_n=last_n)
# dataframe["lstm_pred"] = preds
# dataframe["lstm_pred_std"] = preds_std
# # confidence score inversely related to std (optionnel)
# dataframe["pred_confidence"] = 1 / (1 + dataframe["lstm_pred_std"]) # crude; scale to [0..1] if needed
# self.tensorFlowPredict(dataframe)
# # self.kerasGenerateGraphs(dataframe)
#
# dataframe['lstm_pred_smooth'] = dataframe['lstm_pred'].ewm(span=5, adjust=False).mean()
# self.calculeDerivees(dataframe, 'lstm_pred_smooth', timeframe=self.timeframe)
#
# # predicted % change relative to current price
# dataframe["predicted_pct"] = (dataframe["lstm_pred"] - dataframe[self.indicator_target]) / dataframe[
# self.indicator_target]
# predicted % change relative to current price
dataframe["predicted_pct"] = (dataframe["lstm_pred"] - dataframe[self.indicator_target]) / dataframe[
self.indicator_target]
# confidence score inversely related to std (optionnel)
dataframe["pred_confidence"] = 1 / (1 + dataframe["lstm_pred_std"]) # crude; scale to [0..1] if needed
# # ---- Charger ou prédire ----
# try:
@@ -759,7 +718,7 @@ class Zeus_TensorFlow_1h(IStrategy):
# # Exemple : valeur correspond à lallocation conseillée du LSTM
#
# except Exception as e:
# print(f"[LSTM Position] Erreur prediction: {e}")
# self.printLog(f"[LSTM Position] Erreur prediction: {e}")
# dataframe["pos_frac"] = np.full(len(dataframe), np.nan)
return dataframe
@@ -785,19 +744,9 @@ class Zeus_TensorFlow_1h(IStrategy):
and not c.endswith("_class") and not c.endswith("_price")
and not c.startswith('stop_buying')]
# Étape 3 : remplacer inf et NaN par 0
# usable_cols = [
# "obv_1d", "min60", "mid_future_pred_cons", "bb_upperband",
# "bb_lowerband", "open", "max60", "high", "volume_1d",
# "mid_smooth_5_1d", "haclose", "high_1d", "sma24_future_pred_cons",
# "volume_deriv2", "mid_smooth", "volume_deriv2_1d", "bb_middleband",
# "volume_deriv1", "sma60", "volume_dist", "open_1d",
# "haopen", "mid_1d", "min12_1d", "volume_deriv1_1d",
# "max12_1d", "mid_smooth_12", "sma24", "bb_middleband_1d", "sma12_1d",
# ]
dataframe[usable_cols] = dataframe[usable_cols].replace([np.inf, -np.inf], 0).fillna(0)
print("Colonnes utilisables pour le modèle :")
print(usable_cols)
self.printLog("Colonnes utilisables pour le modèle :")
self.printLog(usable_cols)
self.model_indicators = usable_cols
model_metadata = {
"feature_columns": self.model_indicators,
@@ -807,6 +756,8 @@ class Zeus_TensorFlow_1h(IStrategy):
with open(f"{self.path}/model_metadata.json", "w") as f:
json.dump(model_metadata, f)
# self.model_indicators = self.select_features_from_importance(threshold=0.00015)
return self.model_indicators
def populateDataframe(self, dataframe, timeframe='5m'):
@@ -840,7 +791,7 @@ class Zeus_TensorFlow_1h(IStrategy):
dataframe = self.calculateDerivation(dataframe, window=5, suffixe="_5",timeframe=timeframe)
dataframe = self.calculateDerivation(dataframe, window=12, suffixe="_12",timeframe=timeframe)
dataframe = self.calculateDerivation(dataframe, window=24, suffixe="_24", timeframe=timeframe)
# print(metadata['pair'])
# self.printLog(metadata['pair'])
dataframe['rsi'] = talib.RSI(dataframe['mid'], timeperiod=14)
dataframe['max_rsi_12'] = talib.MAX(dataframe['rsi'], timeperiod=12)
dataframe['max_rsi_24'] = talib.MAX(dataframe['rsi'], timeperiod=24)
@@ -1168,8 +1119,6 @@ class Zeus_TensorFlow_1h(IStrategy):
d1_col = f"{name}{suffixe}_deriv1"
d2_col = f"{name}{suffixe}_deriv2"
factor1 = 100 * (ema_period / 5)
factor2 = 10 * (ema_period / 5)
dataframe[f"{name}{suffixe}_inv"] = (dataframe[f"{name}{suffixe}"].shift(2) >= dataframe[f"{name}{suffixe}"].shift(1)) \
& (dataframe[f"{name}{suffixe}"].shift(1) <= dataframe[f"{name}{suffixe}"])
@@ -1189,35 +1138,24 @@ class Zeus_TensorFlow_1h(IStrategy):
def populate_buy_trend(self, dataframe: DataFrame, metadata: dict) -> DataFrame:
# dataframe.loc[
# (
# qtpylib.crossed_above(dataframe['lstm_pred'], dataframe['mid'])
# qtpylib.crossed_below(dataframe['lstm_pred'], dataframe['sma5'])
# ), ['enter_long', 'enter_tag']] = (1, f"future")
#
# dataframe['test'] = np.where(dataframe['enter_long'] == 1, dataframe['close'] * 1.01, np.nan)
dataframe.loc[
(
(dataframe['sma5_inv'] == 1)
& (
(dataframe['percent3'] <= -0.003)
| (dataframe['percent12'] <= -0.003)
| (dataframe['percent24'] <= -0.003)
)
), ['enter_long', 'enter_tag']] = (1, f"future")
dataframe['test'] = np.where(dataframe['enter_long'] == 1, dataframe['close'] * 1.01, np.nan)
#
# if self.dp.runmode.value in ('backtest'):
# dataframe.to_feather(f"user_data/backtest_results/{metadata['pair'].replace('/', '_')}_df.feather")
dataframe.loc[:, "enter_long"] = False
dataframe.loc[:, "enter_short"] = False
# thresholds
pct_thr = self.minimal_pct_for_trade
conf_thr = self.min_hit_ratio # you may want separate param
# simple directional rule with deadzone and uncertainty filter
mask_up = (dataframe["predicted_pct"] > pct_thr) & (dataframe["pred_confidence"] > 0) # further filters below
mask_down = (dataframe["predicted_pct"] < -pct_thr) & (dataframe["pred_confidence"] > 0)
# filter: ensure uncertainty isn't huge relative to predicted move
# if std > |predicted_move| * max_uncertainty_pct => skip
safe_up = mask_up & (dataframe["lstm_pred_std"] <= (
dataframe["predicted_pct"].abs() * dataframe[self.indicator_target] * self.max_uncertainty_pct))
safe_down = mask_down & (dataframe["lstm_pred_std"] <= (
dataframe["predicted_pct"].abs() * dataframe[self.indicator_target] * self.max_uncertainty_pct))
dataframe.loc[safe_up, ['enter_long', 'enter_tag']] = (True, f"future")
# dataframe.loc[safe_down, "enter_short"] = True
return dataframe
def populate_sell_trend(self, dataframe: DataFrame, metadata: dict) -> DataFrame:
@@ -1287,12 +1225,12 @@ class Zeus_TensorFlow_1h(IStrategy):
# fraction = float(np.clip(fraction, 0.001, 1.0))
# return fraction
def adjust_trade_positionOld(self, trade: Trade, current_time: datetime,
def adjust_trade_position(self, trade: Trade, current_time: datetime,
current_rate: float, current_profit: float, min_stake: float,
max_stake: float, **kwargs):
# ne rien faire si ordre deja en cours
if trade.has_open_orders:
# print("skip open orders")
# self.printLog("skip open orders")
return None
if (self.wallets.get_available_stake_amount() < 10): # or trade.stake_amount >= max_stake:
return 0
@@ -1331,40 +1269,57 @@ class Zeus_TensorFlow_1h(IStrategy):
else:
pct_max = - pct
if (self.getShortName(pair) == 'BTC') or count_of_buys <= 2:
lim = - pct - (count_of_buys * self.pct_inc.value)
else:
pct = 0.05
lim = - pct - (count_of_buys * 0.0025)
# if (self.getShortName(pair) == 'BTC') or count_of_buys <= 2:
# lim = - pct - (count_of_buys * self.pct_inc.value)
# else:
# pct = 0.05
# lim = - pct - (count_of_buys * 0.0025)
lim = 0.3
if (len(dataframe) < 1):
# print("skip dataframe")
# self.printLog("skip dataframe")
return None
if not self.should_enter_trade(pair, last_candle, current_time):
return None
condition = (last_candle['enter_long'] and last_candle['hapercent'] > 0) #and last_candle['stop_buying'] == False
# and last_candle['sma60_deriv1'] > 0
# or last_candle['enter_tag'] == 'pct3' \
# or last_candle['enter_tag'] == 'pct3'
# Dernier prix d'achat réel (pas le prix moyen)
last_fill_price = self.pairs[trade.pair]['last_buy'] #trade.open_rate # remplacé juste après ↓
# if (self.getShortName(pair) != 'BTC' and count_of_buys > 3):
# condition = before_last_candle_24['mid_smooth_3'] > before_last_candle_12['mid_smooth_3'] and before_last_candle_12['mid_smooth_3'] < last_candle['mid_smooth_3'] #and last_candle['mid_smooth_3_deriv1'] < -1.5
# if len(trade.orders) > 0:
# # On cherche le dernier BUY exécuté
# buy_orders = [o for o in trade.orders if o.is_buy and o.status == "closed"]
# if buy_orders:
# last_fill_price = buy_orders[-1].price
# baisse relative
dca_threshold = 0.0025
decline = (last_fill_price - current_rate) / last_fill_price
# if decline >= self.dca_threshold:
# # Exemple : on achète 50% du montant du dernier trade
# last_amount = buy_orders[-1].amount if buy_orders else 0
# stake_amount = last_amount * current_rate * 0.5
# return stake_amount
# print(f"pctmax={pct_max} lim={lim}")
condition = last_candle['hapercent'] > 0
limit_buy = 40
if (count_of_buys < limit_buy) and condition and (pct_max < lim):
if decline >= dca_threshold:
try:
if self.pairs[pair]['has_gain'] and profit > 0:
self.pairs[pair]['force_sell'] = True
self.pairs[pair]['previous_profit'] = profit
return None
max_amount = self.config.get('stake_amount') * 2.5
stake_amount = min(min(max_amount, self.wallets.get_available_stake_amount()),
self.adjust_stake_amount(pair, last_candle) * abs(last_lost / self.mise_factor_buy.value))
self.adjust_stake_amount(pair, last_candle))
print(f"profit={profit} previous={self.pairs[pair]['previous_profit']} count_of_buys={trade.nr_of_successful_entries}")
if stake_amount > 0:
self.pairs[pair]['previous_profit'] = profit
trade_type = "Loss " + (last_candle['enter_tag'] if last_candle['enter_long'] == 1 else '')
self.pairs[trade.pair]['count_of_buys'] += 1
self.pairs[pair]['total_amount'] += stake_amount
@@ -1391,21 +1346,23 @@ class Zeus_TensorFlow_1h(IStrategy):
# df_filtered = df[df['count_of_buys'] > 0].drop(columns=colonnes_a_exclure)
# # df_filtered = df_filtered["first_buy", "last_max", "max_touch", "last_sell","last_buy", 'count_of_buys', 'current_profit']
#
# print(df_filtered)
# self.printLog(df_filtered)
return stake_amount
return None
except Exception as exception:
print(exception)
self.printLog(exception)
return None
if (profit > self.pairs[pair]['previous_profit'] and profit > self.pairs[pair]['expected_profit'] and hours > 6
# and last_candle['sma60_deriv1'] > 0
and last_candle['max_rsi_12'] < 75
# and last_candle['rsi_1d'] < 58
# and last_candle['stop_buying'] == False
# and last_candle['mid_smooth_5_deriv1_1d'] > 0
and self.wallets.get_available_stake_amount() > 0
if (profit > self.pairs[pair]['previous_profit']
and profit - self.pairs[pair]['previous_profit'] > lim #self.pairs[pair]['expected_profit']
# and hours > 6
# and last_candle['sma60_deriv1'] > 0
# and last_candle['max_rsi_12'] < 75
# and last_candle['rsi_1d'] < 58
# and last_candle['stop_buying'] == False
# and last_candle['mid_smooth_5_deriv1_1d'] > 0
and self.wallets.get_available_stake_amount() > 0
):
try:
self.pairs[pair]['previous_profit'] = profit
@@ -1434,7 +1391,7 @@ class Zeus_TensorFlow_1h(IStrategy):
return stake_amount
return None
except Exception as exception:
print(exception)
self.printLog(exception)
return None
return None
@@ -1481,15 +1438,14 @@ class Zeus_TensorFlow_1h(IStrategy):
# Calculer le minimum des 14 derniers jours
nb_pairs = len(self.dp.current_whitelist())
base_stake_amount = self.config.get('stake_amount') / (self.mises.value) # * nb_pairs) # Montant de base configuré
base_stake_amount = self.config.get('stake_amount')
# factors = [1, 1.2, 1.3, 1.4]
if self.pairs[pair]['count_of_buys'] == 0:
if True : #self.pairs[pair]['count_of_buys'] == 0:
factor = 1 #65 / min(65, last_candle['rsi_1d'])
# if last_candle['min_max_60'] > 0.04:
# factor = 2
adjusted_stake_amount = max(base_stake_amount / 5, base_stake_amount * factor)
adjusted_stake_amount = base_stake_amount #max(base_stake_amount / 5, base_stake_amount * factor)
else:
adjusted_stake_amount = self.pairs[pair]['first_amount']
@@ -1526,7 +1482,7 @@ class Zeus_TensorFlow_1h(IStrategy):
return [
{
"method": "CooldownPeriod",
"stop_duration_candles": 12
"stop_duration_candles": 2
}
# {
# "method": "MaxDrawdown",
@@ -1609,8 +1565,8 @@ class Zeus_TensorFlow_1h(IStrategy):
y_future = poly(x_future)
# Affichage de la fonction
# print("Fonction polynomiale trouvée :")
# print(poly)
# self.printLog("Fonction polynomiale trouvée :")
# self.printLog(poly)
current = series.iloc[-1]
count = 0
@@ -1620,7 +1576,7 @@ class Zeus_TensorFlow_1h(IStrategy):
# series.loc[series.index[future_x], f'poly_pred_t+{future_step}'] = prediction
# Afficher les prédictions
# print(f"{current} → t+{future_step}: x={future_x}, y={prediction:.2f}")
# self.printLog(f"{current} → t+{future_step}: x={future_x}, y={prediction:.2f}")
if prediction > 0: # current:
count += 1
@@ -1730,7 +1686,7 @@ class Zeus_TensorFlow_1h(IStrategy):
current_time_utc = current_time.astimezone(timezone.utc)
days_since_open = (current_time_utc - open_date).days
pct_max_max = self.getPctFirstBuy(max_pair, last_candle)
# print(f"days_since_open {days_since_open} max_pair={max_pair} pair={pair}")
# self.printLog(f"days_since_open {days_since_open} max_pair={max_pair} pair={pair}")
return max_pair == pair or pct_max < - 0.25 or (
pct_max_max < - 0.15 and max_pair != pair and days_since_open > 30)
else:
@@ -1951,6 +1907,15 @@ class Zeus_TensorFlow_1h(IStrategy):
plt.savefig(f"{self.path}/lstm_predictions.png")
plt.close()
# plot
if len(dataframe) > 240:
plt.figure(figsize=(16, 8))
plt.plot(dataframe[self.indicator_target][len(dataframe) - 120:].values, label=self.indicator_target)
plt.plot(preds[len(dataframe) - 120:], label="lstm_pred")
plt.legend()
plt.savefig(f"{self.path}/lstm_predictions_last.png")
plt.close()
def kerasGenerateGraphPoids(self, model):
for i, layer in enumerate(model.layers):
weights = layer.get_weights() # liste de tableaux numpy
@@ -1975,7 +1940,7 @@ class Zeus_TensorFlow_1h(IStrategy):
break
if W is None:
print(f"Aucune matrice 2D dans layer {i} (rien à afficher).")
self.printLog(f"Aucune matrice 2D dans layer {i} (rien à afficher).")
return
plt.figure(figsize=(8, 6))
@@ -2123,7 +2088,7 @@ class Zeus_TensorFlow_1h(IStrategy):
# 7) Rapport texte
rapport = self.generate_text_report(mae, rmse, mape, hit_ratio, self.future_steps)
print(rapport)
self.printLog(rapport)
return preds
@@ -2233,6 +2198,10 @@ class Zeus_TensorFlow_1h(IStrategy):
- prédire mean+std via MC Dropout (ou simple predict if no dropout)
- retourner prédiction inverse-transformée et score de confiance
"""
use_mc_dropout = True
mc_samples = 40
def load_model_and_scalers(self):
if self._tf_model is None:
self._tf_model = load_model(f"{self.path}/lstm_model.keras", compile=False)
@@ -2274,18 +2243,29 @@ class Zeus_TensorFlow_1h(IStrategy):
std = preds.std(axis=0).flatten()
return mean, std
def predict_on_dataframe(self, dataframe, feature_columns):
def predict_on_dataframe(self, dataframe, feature_columns, last_n=None):
"""
Process complet : build X -> scale -> predict -> inverse transform -> align with dataframe
Retour:
preds_real (len = len(df)) : np.nan pour indices < lookback-1,
preds_std_real (same length) : np.nan pour indices < lookback-1
Prédire seulement sur tout le dataframe (last_n=None)
OU seulement sur les N dernières bougies (last_n = nombre de lignes).
"""
self.load_model_and_scalers()
X_seq = self.build_X_from_dataframe(dataframe, feature_columns) # shape (N, L, f)
# --- Sélection des lignes si on veut limiter ---
if last_n is not None:
if last_n < self.lookback:
raise ValueError("last_n doit être >= lookback.")
df_used = dataframe.iloc[-last_n:].copy()
df_offset = len(dataframe) - last_n # position dans le df complet
else:
df_used = dataframe
df_offset = 0
# --- Construction des séquences ---
X_seq = self.build_X_from_dataframe(df_used, feature_columns)
# --- Scaling X ---
if getattr(self, "_scaler_X", None) is not None and X_seq.size:
# scaler expects 2D -> reshape then inverse reshape
ns, L, f = X_seq.shape
X_2d = X_seq.reshape(-1, f)
X_2d_scaled = self._scaler_X.transform(X_2d)
@@ -2293,9 +2273,11 @@ class Zeus_TensorFlow_1h(IStrategy):
else:
X_seq_scaled = X_seq
# prediction
# --- Prédiction ---
if self.use_mc_dropout:
mean_scaled, std_scaled = self.mc_dropout_predict(self._tf_model, X_seq_scaled, n_samples=self.mc_samples)
mean_scaled, std_scaled = self.mc_dropout_predict(
self._tf_model, X_seq_scaled, n_samples=self.mc_samples
)
else:
if X_seq_scaled.shape[0] == 0:
mean_scaled = np.array([])
@@ -2304,32 +2286,35 @@ class Zeus_TensorFlow_1h(IStrategy):
mean_scaled = self._tf_model.predict(X_seq_scaled, verbose=0).flatten()
std_scaled = np.zeros_like(mean_scaled)
# inverse transform y if scaler_y exists
# --- Inverse transform Y ---
if getattr(self, "_scaler_y", None) is not None and mean_scaled.size:
# scaler expects 2D
mean_real = self._scaler_y.inverse_transform(mean_scaled.reshape(-1,1)).flatten()
std_real = self._scaler_y.inverse_transform((mean_scaled+std_scaled).reshape(-1,1)).flatten() - mean_real
mean_real = self._scaler_y.inverse_transform(mean_scaled.reshape(-1, 1)).flatten()
std_real = (
self._scaler_y.inverse_transform((mean_scaled + std_scaled).reshape(-1, 1)).flatten()
- mean_real
)
std_real = np.abs(std_real)
else:
mean_real = mean_scaled
std_real = std_scaled
# align with dataframe length
# --- Alignement sur le DF complet ---
n_rows = len(dataframe)
preds = np.array([np.nan]*n_rows)
preds_std = np.array([np.nan]*n_rows)
# start = self.lookback - 1 # la première fenêtre correspond à index lookback-1
start = self.lookback - 1 + self.future_steps
preds = np.full(n_rows, np.nan)
preds_std = np.full(n_rows, np.nan)
start = df_offset + (self.lookback - 1 + self.future_steps)
end = start + len(mean_real) - self.future_steps
if len(mean_real) > 0:
preds[start:end] = mean_real[:end-start]
preds_std[start:end] = std_real[:end-start]
preds[start:end] = mean_real[: end - start]
preds_std[start:end] = std_real[: end - start]
# Importance
# --- feature importance LSTM ---
# On doit découper y_true pour qu'il corresponde 1:1 aux séquences X_seq_scaled
# --- feature importance LSTM ---
if False and self.dp.runmode.value in ('backtest'):
if self.training_enabled and self.dp.runmode.value in ('backtest'):
# Y réel (non-scalé)
y_all = dataframe[self.indicator_target].values.reshape(-1, 1)
@@ -2343,8 +2328,8 @@ class Zeus_TensorFlow_1h(IStrategy):
offset = self.lookback + self.future_steps
y_true = y_scaled_all[offset - 1 - self.future_steps: offset + X_seq_scaled.shape[0]]
print(len(X_seq_scaled))
print(len(y_true))
self.printLog(len(X_seq_scaled))
self.printLog(len(y_true))
# Vérification
if len(y_true) != X_seq_scaled.shape[0]:
@@ -2367,7 +2352,7 @@ class Zeus_TensorFlow_1h(IStrategy):
importances = []
for f in range(n_features):
print(feature_names[f])
self.printLog(feature_names[f])
scores = []
for _ in range(n_rounds):
X_copy = X_seq.copy()
@@ -2378,19 +2363,19 @@ class Zeus_TensorFlow_1h(IStrategy):
scores.append(mean_absolute_error(y_true, pred))
importance = np.mean(scores) - baseline_score
print(f"{f} importance indicator {feature_names[f]} = {100 * importance:.5f}%")
self.printLog(f"{f} importance indicator {feature_names[f]} = {100 * importance:.5f}%")
importances.append(importance)
for name, imp in sorted(zip(feature_names, importances), key=lambda x: -x[1]):
print(f"{name}: importance = {100 * imp:.5f}%")
self.printLog(f"{name}: importance = {100 * imp:.5f}%")
self.last_feature_importances = importances
# Sauvegardes
self.save_feature_importance_csv(self.last_feature_importances)
self.plot_feature_importances(self.last_feature_importances)
print("✔ Feature importance calculée")
self.printLog("✔ Feature importance calculée")
return dict(zip(feature_names, importances))
@@ -2402,6 +2387,8 @@ class Zeus_TensorFlow_1h(IStrategy):
for k, v in importances_dict.items():
f.write(f"{k},{v}\n")
self.select_features_from_importance(threshold=0.00015)
def plot_feature_importances(self, importances):
# Conversion en array
importances = np.array(importances)
@@ -2630,3 +2617,43 @@ class Zeus_TensorFlow_1h(IStrategy):
return scaled
# End of mixin
# Importances
# 0.015%
def select_features_from_importance(self, threshold=0.00015):
"""
Charge un fichier CSV 'feature, importance',
nettoie les valeurs, applique un seuil et sauvegarde les features sélectionnés.
"""
# Lecture CSV
df = pd.read_csv(f"{self.path}/feature_importances.csv")
# Nettoyage au cas où tu as "0,0123%" au lieu de "0.0123%"
df['importance'] = (
df['importance']
.astype(str)
.str.replace('%', '')
.str.replace(',', '.')
.astype(float) / 100 # ramener en valeur réelle
)
# Tri décroissant
df_sorted = df.sort_values(by="importance", ascending=False)
# Filtrage
df_selected = df_sorted[df_sorted['importance'] >= threshold]
# Extraction liste
features = df_selected['feature'].tolist()
# Sauvegarde
with open(f"{self.path}/selected_features.txt", "w") as f:
for feat in features:
f.write(feat + "\n")
print(f"{len(features)} features conservés (≥ {threshold * 100:.3f}%)")
print(f"✔ Liste enregistrée")
return features