Stratégie adaptée staking Pierrick

This commit is contained in:
Jérôme Delacotte
2025-05-20 21:12:46 +02:00
parent 2ae00ad976
commit f2b440a7d6
2 changed files with 440 additions and 14 deletions

199
BTC_Staking.py Normal file
View File

@@ -0,0 +1,199 @@
# pragma pylint: disable=missing-docstring, invalid-name, pointless-string-statement
# flake8: noqa: F401
# isort: skip_file
# --- Do not remove these libs ---
# noinspection PyUnresolvedReferences
import numpy as np
# noinspection PyUnresolvedReferences
import pandas as pd
# from future.backports.xmlrpc.client import DateTime
from pandas import DataFrame
from datetime import datetime
# noinspection PyUnresolvedReferences
from freqtrade.persistence import Trade
from typing import Optional, Tuple, Union
from datetime import timezone
import logging
# noinspection PyUnresolvedReferences
from freqtrade.strategy import (IStrategy, informative)
# --------------------------------
# Add your lib to import here
# noinspection PyUnresolvedReferences
import talib.abstract as ta
# noinspection PyUnresolvedReferences
import pandas_ta as pta
# noinspection PyUnresolvedReferences
from technical import qtpylib
logger = logging.getLogger(__name__)
class BTC_Staking(IStrategy):
INTERFACE_VERSION = 3
timeframe = '5m'
can_short: bool = False
stoploss = -0.99
trailing_stop = False
process_only_new_candles = True
use_exit_signal = True
exit_profit_only = False
ignore_roi_if_entry_signal = False
startup_candle_count: int = 50
# Position adjustment
position_adjustment_enable = True
max_entry_position_adjustment = 99
minimal_roi = {"0": 0.99}
# strategy parameters
staking_delay = 23 # hours
exit_profit = 0.011 # percent ratio
stakes = 20 # days
red_candle_pct = 1.10 # percent
# ------------------------------------------------------------------------------------------------------------------
def version(self) -> str:
return "250311"
# ------------------------------------------------------------------------------------------------------------------
def log(self, action='', stake=0.0, ctime: datetime = None, count_of_entries=0, price: float = 0, msg=''):
free = self.wallets.get_free(self.stake_currency)
remain = free - stake
full = self.wallets.get_total_stake_amount()
formatted_date = ctime.strftime('%Y-%m-%d %H:%M')
# logger.info(f" | {formatted_date:16} | {count_of_entries:2} | {stake:6.2f} | {remain:7.2f} | {msg} |")
print(
f"| {action:8} | {formatted_date:16} | {count_of_entries + 1:2} | {stake:6.2f} | {remain:7.2f} | {full:7.2f} | {price:9.2f} | {msg}")
# ------------------------------------------------------------------------------------------------------------------
def custom_exit(self, pair: str, trade: Trade, current_time: datetime, current_rate: float, current_profit: float,
**kwargs):
if trade.has_open_orders:
return None
count_of_entries = trade.nr_of_successful_entries
if current_profit >= self.exit_profit:
self.log(
action="🟥 Sell",
stake=0,
ctime=current_time,
count_of_entries=count_of_entries,
price=current_rate,
msg='take_profit_' + str(count_of_entries + 1)
)
return 'take_profit_' + str(count_of_entries)
return None
# ------------------------------------------------------------------------------------------------------------------
def custom_stake_amount(self, pair: str, current_time: datetime, current_rate: float,
proposed_stake: float, min_stake: Optional[float], max_stake: float,
leverage: float, entry_tag: Optional[str], side: str, **kwargs) -> float:
# This is called when placing the initial order (opening trade)
stake = self.calculate_stake()
if min_stake < stake < max_stake:
self.log(
action="🟩 Buy",
stake=stake,
ctime=current_time,
count_of_entries=0,
price=current_rate,
msg=''
)
return stake
return 0
# ------------------------------------------------------------------------------------------------------------------
def adjust_trade_position(self, trade: Trade, current_time: datetime,
current_rate: float, current_profit: float,
min_stake: Optional[float], max_stake: float,
current_entry_rate: float, current_exit_rate: float,
current_entry_profit: float, current_exit_profit: float,
**kwargs
) -> Union[Optional[float], Tuple[Optional[float], Optional[str]]]:
# ne rien faire si ordre deja en cours
if trade.has_open_orders:
return None
# prépare les données
df, _ = self.dp.get_analyzed_dataframe(trade.pair, self.timeframe)
last = df.iloc[-1].squeeze()
count_of_entries = trade.nr_of_successful_entries
current_time = current_time.astimezone(timezone.utc)
seconds_since_filled = (current_time - trade.date_last_filled_utc).total_seconds()
# calcul de la nouvelle mise
stake = self.calculate_stake()
# déclenche un achat si bougie rouge importante
pct = (last['close'] - last['open']) / (last['open']) * 100
if (
stake
and pct <= -self.red_candle_pct
and min_stake < stake < max_stake
and seconds_since_filled > (60 * 5)
# and seconds_since_filled > (1 * 3600)
# and count_of_entries < 10
):
msg = f"🔻 {trade.pair} Price drop"
self.log(
action="🟧 Adjust",
stake=stake,
ctime=current_time,
count_of_entries=count_of_entries,
price=current_rate,
msg=msg
)
self.dp.send_msg(msg)
return stake
# déclenche un achat en conditions d'achat standard
if (
stake
and last['close'] < last['sma20']
and last['close'] < last['open']
and min_stake < stake < max_stake
and seconds_since_filled > self.staking_delay * 3600
):
self.log(
action="🟨 Adjust",
stake=stake,
ctime=current_time,
count_of_entries=count_of_entries,
price=current_rate,
msg=''
)
return stake
return None
# ------------------------------------------------------------------------------------------------------------------
def calculate_stake(self) -> float:
full = self.wallets.get_total_stake_amount()
stake = full / self.stakes
return stake
# ------------------------------------------------------------------------------------------------------------------
def populate_indicators(self, dataframe: DataFrame, metadata: dict) -> DataFrame:
dataframe['sma20'] = ta.SMA(dataframe, timeperiod=20)
dataframe["percent"] = (dataframe["close"] - dataframe["open"]) / dataframe["open"]
return dataframe
# ------------------------------------------------------------------------------------------------------------------
def populate_entry_trend(self, dataframe: DataFrame, metadata: dict) -> DataFrame:
dataframe.loc[
(dataframe['volume'] > 0)
# un petit gain avec ça & (dataframe['percent'] < 0)
, 'enter_long'] = 1
if self.dp.runmode.value in ('backtest'):
today = datetime.now().strftime("%Y-%m-%d-%H:%M:%S")
dataframe.to_feather(f"user_data/data/binance/{today}-{metadata['pair'].replace('/', '_')}_df.feather")
return dataframe
# ------------------------------------------------------------------------------------------------------------------
def populate_exit_trend(self, dataframe: DataFrame, metadata: dict) -> DataFrame:
return dataframe

View File

@@ -180,7 +180,8 @@ class Zeus_8_3_2_B_4_2(IStrategy):
'stop_buy': False,
'last_date': 0,
'stop': False,
'max_profit': 0
'max_profit': 0,
'last_palier_index': -1
}
for pair in ["BTC/USDC", "ETH/USDC", "DOGE/USDC", "XRP/USDC", "SOL/USDC",
"BTC/USDT", "ETH/USDT", "DOGE/USDT", "XRP/USDT", "SOL/USDT"]
@@ -343,6 +344,8 @@ class Zeus_8_3_2_B_4_2(IStrategy):
# Extraction de la matrice numérique
smooth_1d_sma_2_diff_1d_numeric_matrice = smooth_1d_sma_2_diff_1d_matrice_df.reindex(index=ordered_labels, columns=ordered_labels).values
paliers = {}
# =========================================================================
# Parameters hyperopt
@@ -373,7 +376,7 @@ class Zeus_8_3_2_B_4_2(IStrategy):
self.pairs[pair]['last_candle'] = last_candle
self.pairs[pair]['count_of_buys'] = 1
self.pairs[pair]['current_profit'] = 0
self.pairs[pair]['last_palier_index'] = 0
dispo = round(self.wallets.get_available_stake_amount())
self.printLineLog()
@@ -428,7 +431,7 @@ class Zeus_8_3_2_B_4_2(IStrategy):
self.pairs[pair]['max_touch'] = 0
self.pairs[pair]['last_buy'] = 0
self.pairs[pair]['last_date'] = current_time
self.pairs[pair]['last_palier_index'] = -1
return (allow_to_sell) | (exit_reason == 'force_exit')
def custom_stake_amount(self, pair: str, current_time: datetime, current_rate: float,
@@ -446,6 +449,11 @@ class Zeus_8_3_2_B_4_2(IStrategy):
def custom_exit(self, pair: str, trade: Trade, current_time, current_rate, current_profit, **kwargs):
count_of_buys = trade.nr_of_successful_entries
if current_profit >= 0.011: #self.exit_profit:
return 'take_profit_' + str(count_of_buys)
dataframe, _ = self.dp.get_analyzed_dataframe(pair, self.timeframe)
last_candle = dataframe.iloc[-1].squeeze()
last_candle_1h = dataframe.iloc[-13].squeeze()
@@ -572,11 +580,12 @@ class Zeus_8_3_2_B_4_2(IStrategy):
self.printLog(
f"| {date:<16} | {action:<10} | {pair[0:3]:<3} | {trade_type or '-':<18} |{rate or '-':>9}| {dispo or '-':>6} "
f"| {profit or '-':>8} | {pct_max or '-':>6} | {round(self.pairs[pair]['max_touch'], 2) or '-':>11} | {last_lost or '-':>12} "
f"| {int(self.pairs[pair]['last_max']) or '-':>7} |{buys or '-':>4}|{stake or '-':>7}"
f"| {int(self.pairs[pair]['last_max']) or '-':>7} |{buys or '-':>2}-{self.pairs[pair]['last_palier_index'] or '-':>2}|{stake or '-':>7}"
f"|{last_candle['tendency'] or '-':>3}|" #{last_candle['tendency_1h'] or '-':>3}|{last_candle['tendency_1d'] or '-':>3}"
# f"|{round(last_candle['mid_smooth_24_deriv1'],3) or '-':>6}|{round(last_candle['mid_smooth_1h_deriv1'],3) or '-':>6}|{round(last_candle['mid_smooth_deriv1_1d'],3) or '-' :>6}|"
# f"{round(last_candle['mid_smooth_24_deriv2'],3) or '-' :>6}|{round(last_candle['mid_smooth_1h_deriv2'],3) or '-':>6}|{round(last_candle['mid_smooth_deriv2_1d'],3) or '-':>6}|"
# f"{round(val144, 1) or '-' :>6}|{round(val1h, 1) or '-':>6}|"
f"{round(last_candle['sma24_deriv1_1h'], 4) or '-' :>6}|{round(last_candle['sma5_deriv1_1d'], 4) or '-' :>6}"
)
def printLineLog(self):
@@ -690,6 +699,7 @@ class Zeus_8_3_2_B_4_2(IStrategy):
informative['sma5'] = talib.SMA(informative, timeperiod=5)
informative['sma24'] = talib.SMA(informative, timeperiod=24)
self.calculeDerivees(informative, 'sma24')
# self.calculateDownAndUp(informative, limit=0.0012)
dataframe = merge_informative_pair(dataframe, informative, self.timeframe, "1h", ffill=True)
@@ -709,8 +719,8 @@ class Zeus_8_3_2_B_4_2(IStrategy):
# informative['rsi'] = talib.RSI(informative['close']) #, timeperiod=7)
# self.calculeDerivees(informative, 'rsi')
#
# informative['sma5'] = talib.SMA(informative, timeperiod=5)
# self.calculeDerivees(informative, 'sma5')
informative['sma5'] = talib.SMA(informative, timeperiod=5)
self.calculeDerivees(informative, 'sma5', factor_1=10, factor_2=1)
# informative['close_smooth'] = self.conditional_smoothing(informative['mid'].dropna(), threshold=0.0015).dropna()
# informative['smooth'], informative['deriv1'], informative['deriv2'] = self.smooth_and_derivatives(informative['close_smooth'])
@@ -814,9 +824,9 @@ class Zeus_8_3_2_B_4_2(IStrategy):
return dataframe
def calculeDerivees(self, dataframe, indic):
dataframe[f"{indic}_deriv1"] = 100 * dataframe[f"{indic}"].diff() / dataframe[f"{indic}"]
dataframe[f"{indic}_deriv2"] = 10 * dataframe[f"{indic}_deriv1"].diff()
def calculeDerivees(self, dataframe, indic, factor_1=100, factor_2=10):
dataframe[f"{indic}_deriv1"] = factor_1 * dataframe[f"{indic}"].diff() / dataframe[f"{indic}"]
dataframe[f"{indic}_deriv2"] = factor_2 * dataframe[f"{indic}_deriv1"].diff()
def calculateDownAndUp(self, dataframe, limit=0.0001):
dataframe['down'] = dataframe['hapercent'] <= limit
@@ -884,6 +894,9 @@ class Zeus_8_3_2_B_4_2(IStrategy):
dataframe['test'] = np.where(dataframe['enter_long'] == 1, dataframe['close'] * 1.01, np.nan)
self.paliers = self.get_dca_stakes()
print(self.paliers)
if self.dp.runmode.value in ('backtest'):
today = datetime.now().strftime("%Y-%m-%d-%H:%M:%S")
dataframe.to_feather(f"user_data/data/binance/{today}-{metadata['pair'].replace('/', '_')}_df.feather")
@@ -1011,6 +1024,71 @@ class Zeus_8_3_2_B_4_2(IStrategy):
# print(f"skip pair {pair}")
return None
# déclenche un achat si bougie rouge importante
stake_amount = self.config.get('stake_amount', 100)
stake_amount = min(stake_amount, self.wallets.get_available_stake_amount())
current_time = current_time.astimezone(timezone.utc)
seconds_since_filled = (current_time - trade.date_last_filled_utc).total_seconds()
pct = (last_candle['close'] - last_candle['open']) / (last_candle['open']) * 100
if (
stake_amount
and pct <= - 1.10 #self.red_candle_pct
and min_stake < stake_amount < max_stake
and seconds_since_filled > (60 * 5)
# and (last_candle["sma24_deriv1_1h"] > - 0.02)
# and seconds_since_filled > (1 * 3600)
# and count_of_entries < 10
):
trade_type = last_candle['enter_tag'] if last_candle['enter_long'] == 1 else 'pct48'
self.log_trade(
last_candle=last_candle,
date=current_time,
action="Adjust 1",
dispo=dispo,
pair=trade.pair,
rate=current_rate,
trade_type=trade_type,
profit=round(current_profit, 4), # round(current_profit * trade.stake_amount, 2),
buys=trade.nr_of_successful_entries + 1,
stake=round(stake_amount, 2)
)
self.pairs[trade.pair]['last_buy'] = current_rate
self.pairs[trade.pair]['max_touch'] = last_candle['close']
self.pairs[trade.pair]['last_candle'] = last_candle
return stake_amount
# déclenche un achat en conditions d'achat standard
if (
stake_amount
and last_candle['close'] < last_candle['sma20']
and last_candle['close'] < last_candle['open']
and min_stake < stake_amount < max_stake
and (last_candle["sma24_deriv1_1h"] > - 0.02)
and seconds_since_filled > 23 * 3600 #self.staking_delay * 3600
):
stake_amount = stake_amount * seconds_since_filled / (23 * 3600)
trade_type = last_candle['enter_tag'] if last_candle['enter_long'] == 1 else 'pct48'
self.log_trade(
last_candle=last_candle,
date=current_time,
action="Adjust 2",
dispo=dispo,
pair=trade.pair,
rate=current_rate,
trade_type=trade_type,
profit=round(current_profit, 4), # round(current_profit * trade.stake_amount, 2),
buys=trade.nr_of_successful_entries + 1,
stake=round(stake_amount, 2)
)
self.pairs[trade.pair]['last_buy'] = current_rate
self.pairs[trade.pair]['max_touch'] = last_candle['close']
self.pairs[trade.pair]['last_candle'] = last_candle
return stake_amount
return None
count_of_buys = trade.nr_of_successful_entries
current_time_utc = current_time.astimezone(timezone.utc)
open_date = trade.open_date.astimezone(timezone.utc)
@@ -1028,23 +1106,44 @@ class Zeus_8_3_2_B_4_2(IStrategy):
else:
pct_max = - pct
lim = - pct - (count_of_buys * 0.001)
# print(f"{trade.pair} current_profit={current_profit} count_of_buys={count_of_buys} pct_max={pct_max:.3f} lim={lim:.3f} rsi_deriv1_1f={last_candle['rsi_deriv1_1h']}")
index = self.get_palier_index(pct_first)
if index is None:
return None
lim, stake_amount = self.paliers[index] #- pct - (count_of_buys * 0.001)
# self.get_active_stake()
# val144 = self.getProbaHausse144(last_candle)
# val1h = self.getProbaHausse1h(last_candle)
# val = self.getProbaHausse144(last_candle)
# buy = False
# previous = 0
# # current_profit=-0.001998 count_of_buys=1 pct_first=0.000 pct_palier=-0.629 pct_max=-0.002 lim=0.000
#
# for pct_palier, stake_amount in self.paliers:
# if abs(pct_palier) > abs(pct_first):
# lim = pct_palier
# break
# previous = pct_palier
# print(f"{trade.pair} current_profit={current_profit} count_of_buys={count_of_buys} pct_first={pct_first:.3f} pct_palier={pct_palier:.3f} pct_max={pct_max:.3f} lim={lim:.3f} ")
# if (days_since_open > count_of_buys) & (0 < count_of_buys <= max_buys) & (current_rate <= limit) & (last_candle['enter_long'] == 1):
limit_buy = 20
if (count_of_buys < limit_buy) \
and (last_candle['enter_long'] == 1) \
and (pct_max < lim): # and val > self.buy_val_adjust.value and last_candle['mid_smooth_deriv1_1d'] > - 1):
and (last_candle["sma24_deriv1_1h"] > - 0.02) \
and (last_candle["sma5_deriv1_1d"] > - 0.02 or count_of_buys <= 5)\
and (self.pairs[trade.pair]['last_palier_index'] < index): # and val > self.buy_val_adjust.value and last_candle['mid_smooth_deriv1_1d'] > - 1):
try:
print(f"{trade.pair} current_profit={current_profit} count_of_buys={count_of_buys} pct_first={pct_first:.3f} pct_max={pct_max:.3f} lim={lim:.3f} index={index}")
self.pairs[trade.pair]['last_palier_index'] = index
max_amount = self.config.get('stake_amount', 100) * 2.5
stake_amount = min(min(max_amount, self.wallets.get_available_stake_amount()),
self.adjust_stake_amount(pair, last_candle) - 10 * pct_first / pct) # min(200, self.adjust_stake_amount(pair, last_candle) * self.fibo[count_of_buys])
stake_amount = min(stake_amount, self.wallets.get_available_stake_amount())
# min(min(max_amount, self.wallets.get_available_stake_amount()),
# self.adjust_stake_amount(pair, last_candle) - 10 * pct_first / pct) # min(200, self.adjust_stake_amount(pair, last_candle) * self.fibo[count_of_buys])
trade_type = last_candle['enter_tag'] if last_candle['enter_long'] == 1 else 'pct48'
self.log_trade(
@@ -1059,6 +1158,7 @@ class Zeus_8_3_2_B_4_2(IStrategy):
buys=trade.nr_of_successful_entries + 1,
stake=round(stake_amount, 2)
)
self.pairs[trade.pair]['last_buy'] = current_rate
self.pairs[trade.pair]['max_touch'] = last_candle['close']
self.pairs[trade.pair]['last_candle'] = last_candle
@@ -1413,3 +1513,130 @@ class Zeus_8_3_2_B_4_2(IStrategy):
result.append(poly(window - 1))
return pd.Series(result, index=series.index)
def get_stake_from_drawdown(self, pct: float, base_stake: float = 100.0, step: float = 0.04, growth: float = 1.15,
max_stake: float = 1000.0) -> float:
"""
Calcule la mise à allouer en fonction du drawdown.
:param pct: Drawdown en pourcentage (ex: -0.12 pour -12%)
:param base_stake: Mise de base (niveau 0)
:param step: Espacement entre paliers (ex: tous les -4%)
:param growth: Facteur de croissance par palier (ex: 1.15 pour +15%)
:param max_stake: Mise maximale à ne pas dépasser
:return: Montant à miser
"""
if pct >= 0:
return base_stake
level = int(abs(pct) / step)
stake = base_stake * (growth ** level)
return min(stake, max_stake)
def compute_adaptive_paliers(self, max_drawdown: float = 0.65, first_steps: list[float] = [0.01, 0.01, 0.015, 0.02], growth: float = 1.2) -> list[float]:
"""
Génère une liste de drawdowns négatifs avec des paliers plus rapprochés au début.
:param max_drawdown: Drawdown max (ex: 0.65 pour -65%)
:param first_steps: Liste des premiers paliers fixes en % (ex: [0.01, 0.01, 0.015])
:param growth: Facteur multiplicatif pour espacer les paliers suivants
:return: Liste de drawdowns négatifs (croissants)
"""
paliers = []
cumulated = 0.0
# Étapes initiales rapprochées
for step in first_steps:
cumulated += step
paliers.append(round(-cumulated, 4))
# Étapes suivantes plus espacées
step = first_steps[-1]
while cumulated < max_drawdown:
step *= growth
cumulated += step
if cumulated >= max_drawdown:
break
paliers.append(round(-cumulated, 4))
return paliers
def get_dca_stakes(self,
max_drawdown: float = 0.65,
base_stake: float = 100.0,
first_steps: list[float] = [0.01, 0.01, 0.015, 0.015],
growth: float = 1.2,
stake_growth: float = 1.15
) -> list[tuple[float, float]]:
"""
Génère les paliers de drawdown et leurs stakes associés.
:param max_drawdown: Maximum drawdown (ex: 0.65 pour -65%)
:param base_stake: Mise initiale
:param first_steps: Paliers de départ (plus resserrés)
:param growth: Multiplicateur d'espacement des paliers
:param stake_growth: Croissance multiplicative des mises
:return: Liste de tuples (palier_pct, stake)
[(-0.01, 100.0), (-0.02, 115.0), (-0.035, 132.25), (-0.05, 152.09), (-0.068, 174.9),
(-0.0896, 201.14), (-0.1155, 231.31), (-0.1466, 266.0), (-0.1839, 305.9), (-0.2287, 351.79),
(-0.2825, 404.56), (-0.347, 465.24), (-0.4244, 535.03), (-0.5173, 615.28), (-0.6287, 707.57)]
"""
paliers = [
(-0.01, 100.0), (-0.02, 115.0), (-0.035, 130), (-0.05, 150), (-0.07, 150),
(-0.10, 150), (-0.15, 150), (-0.20, 150), (-0.25, 150),
(-0.30, 200), (-0.40, 200),
(-0.50, 300), (-0.60, 400), (-0.70, 500), (-0.80, 1000)
]
# cumulated = 0.0
# stake = base_stake
#
# # Étapes initiales
# for step in first_steps:
# cumulated += step
# paliers.append((round(-cumulated, 4), round(stake, 2)))
# stake *= stake_growth
#
# # Étapes suivantes
# step = first_steps[-1]
# while cumulated < max_drawdown:
# step *= growth
# cumulated += step
# if cumulated >= max_drawdown:
# break
# paliers.append((round(-cumulated, 4), round(stake, 2)))
# stake *= stake_growth
return paliers
def get_active_stake(self, pct: float) -> float:
"""
Renvoie la mise correspondant au drawdown `pct`.
:param pct: drawdown courant (négatif, ex: -0.043)
:param paliers: liste de tuples (drawdown, stake)
:return: stake correspondant
"""
abs_pct = abs(pct)
stake = self.paliers[0][1] # stake par défaut
for palier, s in self.paliers:
if abs_pct >= abs(palier):
stake = s
else:
break
return stake
def get_palier_index(self, pct):
"""
Retourne l'index du palier franchi pour un pourcentage de baisse donné (pct).
On cherche le palier le plus profond atteint (dernier franchi).
"""
for i in reversed(range(len(self.paliers))):
seuil, _ = self.paliers[i]
#print(f"pct={pct} seuil={seuil}")
if pct <= seuil:
# print(pct)
return i
return None # Aucun palier atteint