Calcul 20240101-20250514 1050.151 247.414$ => 4,25 max 11mises

This commit is contained in:
Jérôme Delacotte
2025-05-29 21:36:31 +02:00
parent 525916cc15
commit 83cd52d3cb

View File

@@ -224,7 +224,7 @@ class Zeus_8_3_2_B_4_2(IStrategy):
# Extraction de la matrice numérique
sma5_deriv1_1h_mid_smooth_3_deriv1__numeric_matrice = sma5_deriv1_1h_mid_smooth_3_deriv1_matrice_df.reindex(index=ordered_labels, columns=ordered_labels).values
paliers = {}
# paliers = {}
# =========================================================================
# Parameters hyperopt
@@ -234,7 +234,6 @@ class Zeus_8_3_2_B_4_2(IStrategy):
buy_horizon_predict_1h = IntParameter(1, 6, default=2, space='buy')
# buy_level_predict_1h = IntParameter(2, 5, default=4, space='buy')
def confirm_trade_entry(self, pair: str, order_type: str, amount: float, rate: float, time_in_force: str,
current_time: datetime, entry_tag: Optional[str], **kwargs) -> bool:
@@ -411,14 +410,14 @@ class Zeus_8_3_2_B_4_2(IStrategy):
# Juste au cas où (devrait jamais arriver)
return factors[-1]
def interpolate_factor(self, pct: float, start_pct: float = 5, end_pct: float = 30,
start_factor: float = 1.0, end_factor: float = 2.0) -> float:
if pct <= start_pct:
return start_factor
if pct >= end_pct:
return end_factor
# interpolation linéaire
return start_factor + (pct - start_pct) * (end_factor - start_factor) / (end_pct - start_pct)
# def interpolate_factor(self, pct: float, start_pct: float = 5, end_pct: float = 30,
# start_factor: float = 1.0, end_factor: float = 2.0) -> float:
# if pct <= start_pct:
# return start_factor
# if pct >= end_pct:
# return end_factor
# # interpolation linéaire
# return start_factor + (pct - start_pct) * (end_factor - start_factor) / (end_pct - start_pct)
def log_trade(self, action, pair, date, trade_type=None, rate=None, dispo=None, profit=None, buys=None, stake=None,
last_candle=None):
@@ -916,22 +915,50 @@ class Zeus_8_3_2_B_4_2(IStrategy):
hours_since_first_buy = (current_time - trade.open_date_utc).seconds / 3600.0
days_since_first_buy = (current_time - trade.open_date_utc).days
hours = (current_time - trade.date_last_filled_utc).total_seconds() / 3600.0
count_of_buys = trade.nr_of_successful_entries
current_time_utc = current_time.astimezone(timezone.utc)
open_date = trade.open_date.astimezone(timezone.utc)
days_since_open = (current_time_utc - open_date).days
pair = trade.pair
pct_first = 0
if self.pairs[pair]['first_buy']:
pct_first = round((last_candle['close'] - self.pairs[pair]['first_buy']) / self.pairs[pair]['first_buy'], 3)
pct = 0.012
if count_of_buys == 1:
pct_max = current_profit
else:
if self.pairs[trade.pair]['last_buy']:
pct_max = round((last_candle['close'] - self.pairs[trade.pair]['last_buy']) / self.pairs[trade.pair]['last_buy'], 4)
else:
pct_max = - pct
if pair in ('BTC/USDT', 'BTC/USDC') or count_of_buys <= 2:
lim = - pct - (count_of_buys * 0.001)
# lim = - (0.012 * (1 + round(count_of_buys / 5)) + 0.001 * (count_of_buys - 1))
#lim = - (0.012 + 0.001 * (count_of_buys - 1) + (0.002 * count_of_buys if count_of_buys > 10 else 0.001 * count_of_buys if count_of_buys > 5 else 0))
else:
pct = 0.05
lim = - pct - (count_of_buys * 0.001)
if (len(dataframe) < 1):
print("skip dataframe")
return None
pair = trade.pair
if self.dp.runmode.value in ('dry_run'):
if pair not in ('BTC/USDT', 'BTC/USDC', 'XRP/USDT', 'XRP/USDC', 'ETH/USDT', 'ETH/USDC'):
# print(f"skip pair {pair}")
return None
else:
if pair not in ('BTC/USDT', 'BTC/USDC'):
# print(f"skip pair {pair}")
return None
# else:
# if pair not in ('BTC/USDT', 'BTC/USDC'):
# btc_count = self.pairs['BTC/USDT']['count_of_buys'] + self.pairs['BTC/USDC']['count_of_buys']
# # print(f"skip pair {pair}")
# if (btc_count > 4 or count_of_buys + 1 > btc_count) and pct_max < 0.20:
# return None
# # déclenche un achat si bougie rouge importante
# stake_amount = self.config.get('stake_amount', 100)
# stake_amount = self.config.get('stake_amount')
# stake_amount = min(stake_amount, self.wallets.get_available_stake_amount())
# current_time = current_time.astimezone(timezone.utc)
# seconds_since_filled = (current_time - trade.date_last_filled_utc).total_seconds()
@@ -995,24 +1022,6 @@ class Zeus_8_3_2_B_4_2(IStrategy):
#
# return None
count_of_buys = trade.nr_of_successful_entries
current_time_utc = current_time.astimezone(timezone.utc)
open_date = trade.open_date.astimezone(timezone.utc)
days_since_open = (current_time_utc - open_date).days
pct_first = 0
if self.pairs[pair]['first_buy']:
pct_first = round((last_candle['close'] - self.pairs[pair]['first_buy']) / self.pairs[pair]['first_buy'], 3)
pct = 0.012
if count_of_buys == 1:
pct_max = current_profit
else:
if self.pairs[trade.pair]['last_buy']:
pct_max = round((last_candle['close'] - self.pairs[trade.pair]['last_buy']) / self.pairs[trade.pair]['last_buy'], 4)
else:
pct_max = - pct
lim = - pct - (count_of_buys * 0.001)
# index = self.get_palier_index(pct_first)
# if index is None:
# return None
@@ -1044,7 +1053,8 @@ class Zeus_8_3_2_B_4_2(IStrategy):
# > - 0.03 ==>Avg. stake amount 253.535 USDT │ Total trade volume 145312.936 USDT 284 │ 1.19 │ 1014.898 │ 50.74| 1 day, 17:54:00 │ 283 0 1 99.6 │ 0.684 USDT 0.02% │
# > - 0.015 ==>Avg. stake amount 249.107 USDT │ Total trade volume 138186.861 USDT 275 │ 1.20 │ 901.976 │ 45.1 │ 1 day, 19:17:00 │ 274 0 1 99.6 │ 0.684 USDT 0.02%
condition = True #last_candle['mid_smooth_1h_deriv1'] > - 0.05 #(last_candle['mid_smooth_3_deriv1'] > self.buy_mid_smooth_3_deriv1.value) and (last_candle['mid_smooth_24_deriv1'] > self.buy_mid_smooth_24_deriv1.value)
condition = (last_candle['sma5_deriv1_1h'] > 0 or count_of_buys <= 5)
#last_candle['mid_smooth_1h_deriv1'] > - 0.05 #(last_candle['mid_smooth_3_deriv1'] > self.buy_mid_smooth_3_deriv1.value) and (last_candle['mid_smooth_24_deriv1'] > self.buy_mid_smooth_24_deriv1.value)
# (last_candle['enter_long'] == 1 & (count_of_buys < 3)) \
# or ((before_last_candle['mid_re_smooth_3_deriv1'] <= 0) & (last_candle['mid_re_smooth_3_deriv1'] >= 0) & (3 <= count_of_buys < 6)) \
# or ((before_last_candle['mid_smooth_1h_deriv1'] <= 0) & (last_candle['mid_smooth_1h_deriv1'] >= 0) & (6 <= count_of_buys))
@@ -1064,7 +1074,7 @@ class Zeus_8_3_2_B_4_2(IStrategy):
if count < 3:
return None
max_amount = self.config.get('stake_amount', 100) * 2.5
max_amount = self.config.get('stake_amount') * 2.5
# stake_amount = min(stake_amount, self.wallets.get_available_stake_amount())
stake_amount = min(min(max_amount, self.wallets.get_available_stake_amount()),
self.adjust_stake_amount(pair, last_candle) - 10 * pct_first / pct) # min(200, self.adjust_stake_amount(pair, last_candle) * self.fibo[count_of_buys])
@@ -1151,7 +1161,7 @@ class Zeus_8_3_2_B_4_2(IStrategy):
def adjust_stake_amount(self, pair: str, last_candle: DataFrame):
# Calculer le minimum des 14 derniers jours
base_stake_amount = self.config.get('stake_amount', 100) # Montant de base configuré
base_stake_amount = self.config.get('stake_amount') # Montant de base configuré
first_price = self.pairs[pair]['first_buy']
if (first_price == 0):
@@ -1280,56 +1290,56 @@ class Zeus_8_3_2_B_4_2(IStrategy):
return self.labels[i]
return self.labels[-1] # cas limite pour la borne max
def interpolated_val_from_bins(self, row_pos, col_pos):
"""
Renvoie une approximation interpolée (bilinéaire) d'une valeur dans la matrice
à partir de positions flottantes dans l'index (ligne) et les colonnes.
Parameters:
matrix_df (pd.DataFrame): Matrice des probabilités (index/colonnes = labels).
row_pos (float): Position réelle de la ligne (0 = B5, 10 = H5).
col_pos (float): Position réelle de la colonne (0 = B5, 10 = H5).
Returns:
float: Valeur interpolée, ou NaN si en dehors des bornes.
"""
# Labels ordonnés
n = len(self.labels)
# Vérification des limites
if not (0 <= row_pos <= n - 1) or not (0 <= col_pos <= n - 1):
return np.nan
# Conversion des labels -> matrice
matrix = self.smooth_smadiff_matrice_df.reindex(index=self.labels, columns=self.labels).values
# Coordonnées entières (inférieures)
i = int(np.floor(row_pos))
j = int(np.floor(col_pos))
# Coefficients pour interpolation
dx = row_pos - i
dy = col_pos - j
# Précautions sur les bords
if i >= n - 1: i = n - 2; dx = 1.0
if j >= n - 1: j = n - 2; dy = 1.0
# Récupération des 4 valeurs voisines
v00 = matrix[i][j]
v10 = matrix[i + 1][j]
v01 = matrix[i][j + 1]
v11 = matrix[i + 1][j + 1]
# Interpolation bilinéaire
interpolated = (
(1 - dx) * (1 - dy) * v00 +
dx * (1 - dy) * v10 +
(1 - dx) * dy * v01 +
dx * dy * v11
)
return interpolated
# def interpolated_val_from_bins(self, row_pos, col_pos):
# """
# Renvoie une approximation interpolée (bilinéaire) d'une valeur dans la matrice
# à partir de positions flottantes dans l'index (ligne) et les colonnes.
#
# Parameters:
# matrix_df (pd.DataFrame): Matrice des probabilités (index/colonnes = labels).
# row_pos (float): Position réelle de la ligne (0 = B5, 10 = H5).
# col_pos (float): Position réelle de la colonne (0 = B5, 10 = H5).
#
# Returns:
# float: Valeur interpolée, ou NaN si en dehors des bornes.
# """
#
# # Labels ordonnés
# n = len(self.labels)
#
# # Vérification des limites
# if not (0 <= row_pos <= n - 1) or not (0 <= col_pos <= n - 1):
# return np.nan
#
# # Conversion des labels -> matrice
# matrix = self.smooth_smadiff_matrice_df.reindex(index=self.labels, columns=self.labels).values
#
# # Coordonnées entières (inférieures)
# i = int(np.floor(row_pos))
# j = int(np.floor(col_pos))
#
# # Coefficients pour interpolation
# dx = row_pos - i
# dy = col_pos - j
#
# # Précautions sur les bords
# if i >= n - 1: i = n - 2; dx = 1.0
# if j >= n - 1: j = n - 2; dy = 1.0
#
# # Récupération des 4 valeurs voisines
# v00 = matrix[i][j]
# v10 = matrix[i + 1][j]
# v01 = matrix[i][j + 1]
# v11 = matrix[i + 1][j + 1]
#
# # Interpolation bilinéaire
# interpolated = (
# (1 - dx) * (1 - dy) * v00 +
# dx * (1 - dy) * v10 +
# (1 - dx) * dy * v01 +
# dx * dy * v11
# )
# return interpolated
def approx_val_from_bins(self, matrice, numeric_matrice, row_label, col_label):
"""
@@ -1415,7 +1425,7 @@ class Zeus_8_3_2_B_4_2(IStrategy):
def smooth_and_derivatives(self, series, window=25, polyorder=3):
series = series.copy()
if series.isna().sum() > 0:
series = series.fillna(method='ffill').fillna(method='bfill') # Si tu veux éviter toute NaN
series = series.ffill().bfill() # Si tu veux éviter toute NaN
smooth = self.causal_savgol(series, window=window, polyorder=polyorder)
deriv1 = np.diff(smooth, prepend=smooth[0])
@@ -1487,85 +1497,85 @@ class Zeus_8_3_2_B_4_2(IStrategy):
return paliers
def get_dca_stakes(self,
max_drawdown: float = 0.65,
base_stake: float = 100.0,
first_steps: list[float] = [0.01, 0.01, 0.015, 0.015],
growth: float = 1.2,
stake_growth: float = 1.15
) -> list[tuple[float, float]]:
"""
Génère les paliers de drawdown et leurs stakes associés.
:param max_drawdown: Maximum drawdown (ex: 0.65 pour -65%)
:param base_stake: Mise initiale
:param first_steps: Paliers de départ (plus resserrés)
:param growth: Multiplicateur d'espacement des paliers
:param stake_growth: Croissance multiplicative des mises
:return: Liste de tuples (palier_pct, stake)
[(-0.01, 100.0), (-0.02, 115.0), (-0.035, 132.25), (-0.05, 152.09), (-0.068, 174.9),
(-0.0896, 201.14), (-0.1155, 231.31), (-0.1466, 266.0), (-0.1839, 305.9), (-0.2287, 351.79),
(-0.2825, 404.56), (-0.347, 465.24), (-0.4244, 535.03), (-0.5173, 615.28), (-0.6287, 707.57)]
"""
paliers = [
(-0.01, 100.0), (-0.02, 115.0), (-0.035, 130), (-0.05, 150), (-0.07, 150),
(-0.10, 150), (-0.15, 150), (-0.20, 150), (-0.25, 150),
(-0.30, 200), (-0.40, 200),
(-0.50, 300), (-0.60, 400), (-0.70, 500), (-0.80, 1000)
]
# cumulated = 0.0
# stake = base_stake
# def get_dca_stakes(self,
# max_drawdown: float = 0.65,
# base_stake: float = 100.0,
# first_steps: list[float] = [0.01, 0.01, 0.015, 0.015],
# growth: float = 1.2,
# stake_growth: float = 1.15
# ) -> list[tuple[float, float]]:
# """
# Génère les paliers de drawdown et leurs stakes associés.
#
# # Étapes initiales
# for step in first_steps:
# cumulated += step
# paliers.append((round(-cumulated, 4), round(stake, 2)))
# stake *= stake_growth
# :param max_drawdown: Maximum drawdown (ex: 0.65 pour -65%)
# :param base_stake: Mise initiale
# :param first_steps: Paliers de départ (plus resserrés)
# :param growth: Multiplicateur d'espacement des paliers
# :param stake_growth: Croissance multiplicative des mises
# :return: Liste de tuples (palier_pct, stake)
# [(-0.01, 100.0), (-0.02, 115.0), (-0.035, 132.25), (-0.05, 152.09), (-0.068, 174.9),
# (-0.0896, 201.14), (-0.1155, 231.31), (-0.1466, 266.0), (-0.1839, 305.9), (-0.2287, 351.79),
# (-0.2825, 404.56), (-0.347, 465.24), (-0.4244, 535.03), (-0.5173, 615.28), (-0.6287, 707.57)]
# """
# paliers = [
# (-0.01, 100.0), (-0.02, 115.0), (-0.035, 130), (-0.05, 150), (-0.07, 150),
# (-0.10, 150), (-0.15, 150), (-0.20, 150), (-0.25, 150),
# (-0.30, 200), (-0.40, 200),
# (-0.50, 300), (-0.60, 400), (-0.70, 500), (-0.80, 1000)
# ]
#
# # Étapes suivantes
# step = first_steps[-1]
# while cumulated < max_drawdown:
# step *= growth
# cumulated += step
# if cumulated >= max_drawdown:
# # cumulated = 0.0
# # stake = base_stake
# #
# # # Étapes initiales
# # for step in first_steps:
# # cumulated += step
# # paliers.append((round(-cumulated, 4), round(stake, 2)))
# # stake *= stake_growth
# #
# # # Étapes suivantes
# # step = first_steps[-1]
# # while cumulated < max_drawdown:
# # step *= growth
# # cumulated += step
# # if cumulated >= max_drawdown:
# # break
# # paliers.append((round(-cumulated, 4), round(stake, 2)))
# # stake *= stake_growth
#
# return paliers
# def get_active_stake(self, pct: float) -> float:
# """
# Renvoie la mise correspondant au drawdown `pct`.
#
# :param pct: drawdown courant (négatif, ex: -0.043)
# :param paliers: liste de tuples (drawdown, stake)
# :return: stake correspondant
# """
# abs_pct = abs(pct)
# stake = self.paliers[0][1] # stake par défaut
#
# for palier, s in self.paliers:
# if abs_pct >= abs(palier):
# stake = s
# else:
# break
# paliers.append((round(-cumulated, 4), round(stake, 2)))
# stake *= stake_growth
#
# return stake
return paliers
def get_active_stake(self, pct: float) -> float:
"""
Renvoie la mise correspondant au drawdown `pct`.
:param pct: drawdown courant (négatif, ex: -0.043)
:param paliers: liste de tuples (drawdown, stake)
:return: stake correspondant
"""
abs_pct = abs(pct)
stake = self.paliers[0][1] # stake par défaut
for palier, s in self.paliers:
if abs_pct >= abs(palier):
stake = s
else:
break
return stake
def get_palier_index(self, pct):
"""
Retourne l'index du palier franchi pour un pourcentage de baisse donné (pct).
On cherche le palier le plus profond atteint (dernier franchi).
"""
for i in reversed(range(len(self.paliers))):
seuil, _ = self.paliers[i]
#print(f"pct={pct} seuil={seuil}")
if pct <= seuil:
# print(pct)
return i
return None # Aucun palier atteint
# def get_palier_index(self, pct):
# """
# Retourne l'index du palier franchi pour un pourcentage de baisse donné (pct).
# On cherche le palier le plus profond atteint (dernier franchi).
# """
# for i in reversed(range(len(self.paliers))):
# seuil, _ = self.paliers[i]
# #print(f"pct={pct} seuil={seuil}")
# if pct <= seuil:
# # print(pct)
# return i
# return None # Aucun palier atteint
# def poly_regression_predictions(self, series: pd.Series, window: int = 20, degree: int = 2, n_future: int = 3) -> pd.DataFrame:
# """
@@ -1634,34 +1644,34 @@ class Zeus_8_3_2_B_4_2(IStrategy):
return poly, x_future, y_future, count
def calculateStats2(self, df, index, target):
# Nombre de tranches (modifiable)
n_bins_indice = 11
n_bins_valeur = 11
# Tranches dynamiques
# df['indice_tranche'] = pd.qcut(df[f"{index}"], q=n_bins_indice, duplicates='drop')
# df['valeur_tranche'] = pd.qcut(df[f"{target}"], q=n_bins_valeur, duplicates='drop')
df[f"{index}_bin"], bins_1h = pd.qcut(df[f"{index}"], q=n_bins_indice, labels=self.labels, retbins=True,
duplicates='drop')
df[f"{target}_bin"], bins_1d = pd.qcut(df[f"{target}"], q=n_bins_valeur, labels=self.labels, retbins=True,
duplicates='drop')
# Affichage formaté pour code Python
print(f"Bornes des quantiles pour {index} : [{', '.join([f'{b:.4f}' for b in bins_1h])}]")
print(f"Bornes des quantiles pour {target} : [{', '.join([f'{b:.4f}' for b in bins_1d])}]")
# Tableau croisé (compte)
tableau = pd.crosstab(df[f"{index}_bin"], df[f"{target}_bin"])
# Facultatif : en pourcentages
tableau_pct = tableau.div(tableau.sum(axis=1), axis=0) * 100
# Affichage
print("Répartition brute :")
print(tableau)
print("\nRépartition en % par ligne :")
print(tableau_pct.round(2))
# def calculateStats2(self, df, index, target):
# # Nombre de tranches (modifiable)
# n_bins_indice = 11
# n_bins_valeur = 11
#
# # Tranches dynamiques
# # df['indice_tranche'] = pd.qcut(df[f"{index}"], q=n_bins_indice, duplicates='drop')
# # df['valeur_tranche'] = pd.qcut(df[f"{target}"], q=n_bins_valeur, duplicates='drop')
#
# df[f"{index}_bin"], bins_1h = pd.qcut(df[f"{index}"], q=n_bins_indice, labels=self.labels, retbins=True,
# duplicates='drop')
# df[f"{target}_bin"], bins_1d = pd.qcut(df[f"{target}"], q=n_bins_valeur, labels=self.labels, retbins=True,
# duplicates='drop')
# # Affichage formaté pour code Python
# print(f"Bornes des quantiles pour {index} : [{', '.join([f'{b:.4f}' for b in bins_1h])}]")
# print(f"Bornes des quantiles pour {target} : [{', '.join([f'{b:.4f}' for b in bins_1d])}]")
#
# # Tableau croisé (compte)
# tableau = pd.crosstab(df[f"{index}_bin"], df[f"{target}_bin"])
#
# # Facultatif : en pourcentages
# tableau_pct = tableau.div(tableau.sum(axis=1), axis=0) * 100
#
# # Affichage
# print("Répartition brute :")
# print(tableau)
# print("\nRépartition en % par ligne :")
# print(tableau_pct.round(2))
def calculateStats(self, df, index, target):
# Nombre de tranches (modifiable)