# Zeus Strategy: First Generation of GodStra Strategy with maximum
# AVG/MID profit in USDT
# Author: @Mablue (Masoud Azizi)
# github: https://github.com/mablue/
# IMPORTANT: INSTALL TA BEFORE RUNNING (pip install ta)
# freqtrade hyperopt --hyperopt-loss SharpeHyperOptLoss --spaces buy sell roi --strategy Zeus
# NOTE(review): the strategy class defined below is named ``Frictrade``, not
# ``Zeus``; confirm which name the command line above should use.

# --- Do not remove these libs ---
import csv
import json
import logging
import math
import os
from collections import defaultdict
from datetime import datetime, timedelta, timezone
from pathlib import Path
from typing import Optional, Tuple, Union

import numpy as np
import pandas as pd
from pandas import DataFrame

from freqtrade.persistence import Trade
from freqtrade.strategy import (BooleanParameter, CategoricalParameter, DecimalParameter,
                                stoploss_from_open, IntParameter, IStrategy,
                                merge_informative_pair, informative, stoploss_from_absolute)
# --------------------------------
# Add your lib to import here
import ta
import talib.abstract as talib
import freqtrade.vendor.qtpylib.indicators as qtpylib

logger = logging.getLogger(__name__)

# Basic ANSI colours
RED = "\033[31m"
GREEN = "\033[32m"
YELLOW = "\033[33m"
BLUE = "\033[34m"
MAGENTA = "\033[35m"
CYAN = "\033[36m"
RESET = "\033[0m"


def _new_pair_state() -> dict:
    """Return a fresh per-pair bookkeeping record with its initial values."""
    return {
        "first_buy": 0,
        "last_buy": 0.0,
        "last_min": 999999999999999.5,
        "last_max": 0,
        "trade_info": {},
        "max_touch": 0.0,
        "last_sell": 0.0,
        'count_of_buys': 0,
        'current_profit': 0,
        'expected_profit': 0,
        'previous_profit': 0,
        "last_candle": {},
        "last_count_of_buys": 0,
        'base_stake_amount': 0,
        'stop_buy': False,
        'last_date': 0,
        'stop': False,
        'max_profit': 0,
        'first_amount': 0,
        'total_amount': 0,
        'has_gain': 0,
        'force_sell': False,
        'force_buy': False
    }


class Frictrade(IStrategy):
    """DCA strategy on 1m candles with a hand-rolled trailing exit.

    Entries are signalled in ``populate_buy_trend``; position increases are
    decided in ``adjust_trade_position``; exits are decided in ``custom_exit``
    and gated by ``confirm_trade_exit``. Per-pair bookkeeping lives in the
    class-level ``pairs`` mapping.
    """

    startup_candle_count = 180

    # ROI table:
    minimal_roi = {
        "0": 10
    }

    # Stoploss:
    stoploss = -1  # 0.256
    # Custom stoploss
    use_custom_stoploss = False

    # These trailing values are read by custom_exit(); freqtrade's own trailing
    # stop stays disabled.
    trailing_stop = False
    trailing_stop_positive = 0.15
    trailing_stop_positive_offset = 0.5
    trailing_only_offset_is_reached = True

    # Buy hypers
    timeframe = '1m'
    max_open_trades = 5
    max_amount = 40

    parameters = {}
    # DCA config
    position_adjustment_enable = True

    # Number of log_trade() calls so far; the column header is re-printed
    # every 10 calls.
    columns_logged = 0

    # Per-pair state. A defaultdict so that a whitelisted pair missing from the
    # pre-seeded list gets a fresh record instead of raising KeyError.
    pairs = defaultdict(_new_pair_state, {
        pair: _new_pair_state()
        for pair in ["BTC/USDC", "ETH/USDC", "DOGE/USDC", "XRP/USDC", "SOL/USDC",
                     "BTC/USDT", "ETH/USDT", "DOGE/USDT", "XRP/USDT", "SOL/USDT"]
    })
    trades = list()
    max_profit_pairs = {}

    btc_ath_history = [
        {"date": "2011-06-09", "price_usd": 26.15, "note": "pic 2011 (early breakout)"},
        {"date": "2013-11-29", "price_usd": 1132.00, "note": "bull run fin 2013"},
        {"date": "2017-12-17", "price_usd": 19783.00, "note": "ATH décembre 2017 (crypto bubble)"},
        {"date": "2020-12-31", "price_usd": 29001.72, "note": "fin 2020, nouveau record après accumulation)"},
        {"date": "2021-11-10", "price_usd": 68742.00, "note": "record novembre 2021 (institutional demand)"},
        {"date": "2024-03-05", "price_usd": 69000.00, "note": "nouveau pic début 2024 (source presse, valeur indicative)"},
        {"date": "2025-07-11", "price_usd": 118755.00, "note": "pic juillet 2025 (valeur rapportée par la presse)"},
        {"date": "2025-10-06", "price_usd": 126198.07, "note": "pic oct. 2025 (source agrégée, à vérifier selon l'exchange)"}
    ]

    # ------------------------------------------------------------------
    # Small internal helpers
    # ------------------------------------------------------------------
    def _is_hyperopt(self) -> bool:
        """Return True when the bot runs in hyperopt mode (logging is muted)."""
        return self.config.get('runmode') == 'hyperopt' or self.dp.runmode.value == 'hyperopt'

    @staticmethod
    def _relative_change(current, reference, digits):
        """Return ``(current - reference) / reference`` rounded to ``digits``.

        Returns 0 when ``reference`` is zero/unset (for example right after a
        restart, before any buy was recorded) instead of dividing by zero.
        """
        if not reference:
            return 0
        return round((current - reference) / reference, digits)

    @staticmethod
    def _previous_period_level(values, period_key, how):
        """Map each row to the ``how`` aggregate of the previous period.

        ``period_key`` must sort chronologically (e.g. ``year * 100 + week``).
        Rows of the first period get NaN.
        """
        per_period = values.groupby(period_key).agg(how)
        return period_key.map(per_period.shift(1))

    def _register_position_increase(self, trade, last_candle, current_time, current_rate,
                                    profit, dispo, stake_amount, action, trade_type):
        """Update the pair bookkeeping and log one additional (DCA) buy."""
        pair = trade.pair
        self.pairs[pair]['count_of_buys'] += 1
        self.pairs[pair]['total_amount'] += stake_amount
        self.log_trade(
            last_candle=last_candle,
            date=current_time,
            action=action,
            dispo=dispo,
            pair=pair,
            rate=current_rate,
            trade_type=trade_type,
            profit=round(profit, 1),
            buys=trade.nr_of_successful_entries + 1,
            stake=round(stake_amount, 2)
        )
        self.pairs[pair]['last_buy'] = current_rate
        self.pairs[pair]['max_touch'] = last_candle['close']
        self.pairs[pair]['last_candle'] = last_candle

    # ------------------------------------------------------------------
    # Entry / exit confirmation
    # ------------------------------------------------------------------
    def confirm_trade_entry(self, pair: str, order_type: str, amount: float, rate: float,
                            time_in_force: str, current_time: datetime,
                            entry_tag: Optional[str], **kwargs) -> bool:
        """Record the first buy of a trade in the pair state and log it.

        The entry is always confirmed (returns True); the callback only resets
        the per-pair bookkeeping.
        """
        # Minutes elapsed since the last confirmed sell on this pair (0 if none).
        minutes = 0
        if self.pairs[pair]['last_date'] != 0:
            minutes = int((current_time - self.pairs[pair]['last_date']).total_seconds() / 60)

        dataframe, _ = self.dp.get_analyzed_dataframe(pair, self.timeframe)
        last_candle = dataframe.iloc[-1].squeeze()

        state = self.pairs[pair]
        self.trades = list()
        state['first_buy'] = rate
        state['last_buy'] = rate
        state['max_touch'] = last_candle['close']
        state['last_candle'] = last_candle
        state['count_of_buys'] = 1
        state['current_profit'] = 0
        state['last_max'] = max(last_candle['close'], state['last_max'])
        state['last_min'] = min(last_candle['close'], state['last_min'])

        dispo = round(self.wallets.get_available_stake_amount())
        self.printLineLog()

        stake_amount = self.adjust_stake_amount(pair, last_candle)
        state['total_amount'] = stake_amount

        self.log_trade(
            last_candle=last_candle,
            date=current_time,
            action="🟩Buy" + " " + str(minutes),
            pair=pair,
            rate=rate,
            dispo=dispo,
            profit=0,
            trade_type=entry_tag,
            buys=1,
            stake=round(stake_amount, 2)
        )
        return True

    def confirm_trade_exit(self, pair: str, trade: Trade, order_type: str, amount: float, rate: float,
                           time_in_force: str, exit_reason: str, current_time, **kwargs) -> bool:
        """Allow an exit only under the strategy's own conditions.

        The exit is accepted when the trade is older than 30 minutes and the
        last Heikin-Ashi candle is red, when ``force_sell`` is set for the
        pair, or when ``exit_reason`` is ``force_exit`` / ``stop_loss``.
        On acceptance the pair state is reset.
        """
        dataframe, _ = self.dp.get_analyzed_dataframe(pair, self.timeframe)
        last_candle = dataframe.iloc[-1].squeeze()

        # total_seconds(): timedelta.seconds wraps around every 24h and would
        # make trades older than a day look young.
        minutes = int(round((current_time - trade.open_date_utc).total_seconds() / 60, 0))
        profit = trade.calc_profit(rate)
        force = self.pairs[pair]['force_sell']
        allow_to_sell = bool(
            (minutes > 30 and last_candle['hapercent'] < 0)
            or force
            or exit_reason in ('force_exit', 'stop_loss')
        )

        if allow_to_sell:
            state = self.pairs[pair]
            self.trades = list()
            state['last_count_of_buys'] = trade.nr_of_successful_entries
            state['last_sell'] = rate
            state['last_candle'] = last_candle
            state['previous_profit'] = 0
            dispo = round(self.wallets.get_available_stake_amount())
            self.log_trade(
                last_candle=last_candle,
                date=current_time,
                action="🟥Sell " + str(minutes),
                pair=pair,
                trade_type=exit_reason,
                rate=last_candle['close'],
                dispo=dispo,
                profit=round(profit, 2)
            )
            state['max_profit'] = 0
            state['force_sell'] = False
            state['has_gain'] = 0
            state['current_profit'] = 0
            state['total_amount'] = 0
            state['count_of_buys'] = 0
            state['max_touch'] = 0
            state['last_buy'] = 0
            state['last_date'] = current_time
            state['current_trade'] = None

        return allow_to_sell

    # ------------------------------------------------------------------
    # Pair helpers
    # ------------------------------------------------------------------
    def getShortName(self, pair):
        """Strip the USDT/USDC quote currency suffix from a pair name."""
        return pair.replace("/USDT", '').replace("/USDC", '').replace("_USDC", '').replace("_USDT", '')

    def getLastLost(self, last_candle, pair):
        """Relative distance of the close from the pair's ``max_touch`` (3 decimals)."""
        return self._relative_change(last_candle['close'], self.pairs[pair]['max_touch'], 3)

    def getPctFirstBuy(self, pair, last_candle):
        """Relative distance of the close from the first buy price (3 decimals)."""
        return self._relative_change(last_candle['close'], self.pairs[pair]['first_buy'], 3)

    def getPctLastBuy(self, pair, last_candle):
        """Relative distance of the close from the last buy price (4 decimals)."""
        return self._relative_change(last_candle['close'], self.pairs[pair]['last_buy'], 4)

    def expectedProfit(self, pair: str, last_candle: DataFrame):
        """Compute and store the expected profit for ``pair``.

        The value is ``lim * total_amount`` with ``lim`` = 0.005 for BTC and
        0.01 for every other pair. ``last_candle`` is accepted but not used.
        """
        lim = 0.005 if self.getShortName(pair) == 'BTC' else 0.01
        expected_profit = lim * self.pairs[pair]['total_amount']
        self.pairs[pair]['expected_profit'] = expected_profit
        return expected_profit

    # ------------------------------------------------------------------
    # Logging
    # ------------------------------------------------------------------
    def log_trade(self, action, pair, date, trade_type=None, rate=None, dispo=None, profit=None,
                  buys=None, stake=None, last_candle=None):
        """Print one formatted table row describing a trade event.

        Nothing is printed in hyperopt mode. Every 10th call the column header
        and a summary of the pairs with open buys are printed first.
        ``last_candle`` is required in practice: the row reads ``close`` and
        the RSI / rtp columns from it.
        """
        if self._is_hyperopt():
            return

        # Show the column header (and the per-pair state table) periodically.
        # NOTE(review): the original formatting was lost; printing the state
        # table together with the header is a reconstruction.
        if self.columns_logged % 10 == 0:
            self.printLog(
                f"| {'Date':<16} | {'Action':<10} |{'Pair':<5}| {'Trade Type':<18} |{'Rate':>8} | {'Dispo':>6} | {'Profit':>8} "
                f"| {'Pct':>6} | {'max_touch':>11} | {'last_lost':>12} | {'last_max':>7}| {'last_min':>7}|{'Buys':>5}| {'Stake':>5} |"
                f"{'rsi':>6}"
            )
            self.printLineLog()
            df = pd.DataFrame.from_dict(self.pairs, orient='index')
            colonnes_a_exclure = ['last_candle', 'trade_info', 'last_date', 'last_count_of_buys',
                                  'base_stake_amount', 'stop_buy']
            df_filtered = df[df['count_of_buys'] > 0].drop(columns=colonnes_a_exclure)
            self.printLog(df_filtered)

        self.columns_logged += 1
        date = str(date)[:16] if date else "-"
        last_lost = self.getLastLost(last_candle, pair)

        if buys is None:
            buys = ''

        pct_max = self.getPctFirstBuy(pair, last_candle)
        total_counts = str(buys) + '/' + str(
            sum(pair_data['count_of_buys'] for pair_data in self.pairs.values()))

        state = self.pairs[pair]
        last_max = int(state['last_max']) if state['last_max'] > 1 else round(state['last_max'], 3)
        last_min = int(state['last_min']) if state['last_min'] > 1 else round(state['last_min'], 3)

        # profit defaults to None: treat it as "no gain" rather than crashing.
        color = GREEN if (profit is not None and profit > 0) else RED
        profit = ('-' if profit is None else str(profit)) + '/' + str(round(state['max_profit'], 2))

        self.printLog(
            f"| {date:<16} |{action:<10} | {pair[0:3]:<3} | {trade_type or '-':<18} |{rate or '-':>9}| {dispo or '-':>6} "
            f"|{color}{profit or '-':>10}{RESET}| {pct_max or '-':>6} | {round(self.pairs[pair]['max_touch'], 2) or '-':>11} | {last_lost or '-':>12} "
            f"| {last_max or '-':>7} | {last_min or '-':>7} |{total_counts or '-':>5}|{stake or '-':>7}"
            f"{round(last_candle['max_rsi_24'], 1) or '-' :>6}|{round(last_candle['rsi_1h'], 1) or '-' :>6}|{round(last_candle['rsi_1d'], 1) or '-' :>6}|"
            f"{round(last_candle['rtp_1h'] * 100, 0) or '-' :>6}|{round(last_candle['rtp_1d'] * 100, 0) or '-' :>6}|"
        )

    def printLineLog(self):
        """Print the horizontal separator line of the trade log table."""
        self.printLog(
            f"+{'-' * 18}+{'-' * 12}+{'-' * 5}+{'-' * 20}+{'-' * 9}+{'-' * 8}+{'-' * 12}+{'-' * 8}+{'-' * 13}+{'-' * 14}+{'-' * 9}{'-' * 9}+{'-' * 5}+{'-' * 7}+"
            f"+{'-' * 6}+{'-' * 7}+{'-' * 5}+{'-' * 5}+{'-' * 5}+{'-' * 5}+{'-' * 5}+{'-' * 5}+"
        )

    def printLog(self, str):
        """Emit ``str``: nothing in hyperopt, ``print`` in backtest, logger otherwise."""
        if self._is_hyperopt():
            return
        if self.dp.runmode.value in ('backtest', 'lookahead-analysis'):
            print(str)
        else:
            logger.info(str)

    # ------------------------------------------------------------------
    # Indicators and signals
    # ------------------------------------------------------------------
    def populate_indicators(self, dataframe: DataFrame, metadata: dict) -> DataFrame:
        """Add the 1m indicators, merge the 1h / 1d informative data and, in
        live / dry-run mode, resynchronise the pair state from open trades."""
        pair = metadata['pair']
        short_pair = self.getShortName(pair)
        self.path = f"user_data/plots/{short_pair}/" + (
            "valide/" if self.dp.runmode.value != 'backtest' else '')

        heikinashi = qtpylib.heikinashi(dataframe)
        dataframe['haopen'] = heikinashi['open']
        dataframe['haclose'] = heikinashi['close']
        dataframe['hapercent'] = (dataframe['haclose'] - dataframe['haopen']) / dataframe['haclose']
        dataframe['mid'] = dataframe['open'] + (dataframe['close'] - dataframe['open']) / 2

        # The "sma" columns are exponential moving averages of the mid price;
        # "*_deriv1" is their per-candle change in per-mille.
        for span in (5, 24, 60):
            ema = dataframe['mid'].ewm(span=span, adjust=False).mean()
            dataframe[f'sma{span}'] = ema
            dataframe[f'sma{span}_deriv1'] = 1000 * (ema - ema.shift(1)) / ema.shift(1)

        dataframe["sma5_sqrt"] = (
            np.sqrt(np.abs(dataframe["sma5"] - dataframe["sma5"].shift(1)))
            + np.sqrt(np.abs(dataframe["sma5"].shift(3) - dataframe["sma5"].shift(1)))
        )
        # Local minimum of sma5 with a sufficiently sharp turn.
        dataframe["sma5_inv"] = (
            (dataframe["sma5"].shift(2) >= dataframe["sma5"].shift(1))
            & (dataframe["sma5"].shift(1) <= dataframe["sma5"])
            & (dataframe["sma5_sqrt"] > 5)
        )

        dataframe["percent"] = dataframe['mid'].pct_change()
        dataframe["percent3"] = dataframe['mid'].pct_change(3).rolling(3).mean()
        dataframe["percent12"] = dataframe['mid'].pct_change(12).rolling(12).mean()
        dataframe["percent24"] = dataframe['mid'].pct_change(24).rolling(24).mean()

        dataframe['rsi'] = talib.RSI(dataframe['mid'], timeperiod=14)
        dataframe['max_rsi_12'] = talib.MAX(dataframe['rsi'], timeperiod=12)
        dataframe['max_rsi_24'] = talib.MAX(dataframe['rsi'], timeperiod=24)

        dataframe['max5'] = talib.MAX(dataframe['mid'], timeperiod=5)
        dataframe['min180'] = talib.MIN(dataframe['mid'], timeperiod=180)
        dataframe['max180'] = talib.MAX(dataframe['mid'], timeperiod=180)
        # Position of the mid price inside its 180-candle range (0 = low, 1 = high).
        dataframe['pct180'] = ((dataframe["mid"] - dataframe['min180'])
                               / (dataframe['max180'] - dataframe['min180']))

        dataframe = self.rsi_trend_probability(dataframe, short=60, long=360)

        # ################### INFORMATIVE 1h
        inf_1h = self.dp.get_pair_dataframe(pair=pair, timeframe='1h')
        inf_1h['mid'] = inf_1h['open'] + (inf_1h['close'] - inf_1h['open']) / 2
        inf_1h['rsi'] = talib.RSI(inf_1h['mid'], timeperiod=14)
        inf_1h = self.rsi_trend_probability(inf_1h)
        dataframe = merge_informative_pair(dataframe, inf_1h, self.timeframe, '1h', ffill=True)

        # ################### INFORMATIVE 1d
        inf_1d = self.dp.get_pair_dataframe(pair=pair, timeframe='1d')
        inf_1d['mid'] = inf_1d['open'] + (inf_1d['close'] - inf_1d['open']) / 2
        inf_1d['rsi'] = talib.RSI(inf_1d['mid'], timeperiod=5)
        inf_1d = self.rsi_trend_probability(inf_1d)
        dataframe = merge_informative_pair(dataframe, inf_1d, self.timeframe, '1d', ffill=True)

        dataframe['last_price'] = dataframe['close']
        dataframe['first_price'] = dataframe['close']

        # In live / dry-run, rebuild first/last buy prices and the invested
        # amount from the filled buy orders of the open trade on this pair.
        if self.dp and self.dp.runmode.value in ('live', 'dry_run'):
            self.getOpenTrades()
            for trade in self.trades:
                if trade.pair != pair:
                    continue
                amount = 0
                for position, buy in enumerate(trade.select_filled_orders('buy')):
                    if position == 0:
                        dataframe['first_price'] = buy.price
                        self.pairs[pair]['first_buy'] = buy.price
                        self.pairs[pair]['first_amount'] = buy.price * buy.filled
                    dataframe['last_price'] = buy.price
                    self.pairs[pair]['last_buy'] = buy.price
                    amount += buy.price * buy.filled
                self.pairs[pair]['total_amount'] = amount

        dataframe['absolute_min'] = dataframe['mid'].rolling(1440, min_periods=1).min()
        dataframe['absolute_max'] = dataframe['mid'].rolling(1440, min_periods=1).max()
        # Price levels 0%..3% above the rolling 1440-candle minimum.
        for i in [0, 1, 2, 3]:
            dataframe[f"lvl_{i}_pct"] = dataframe['absolute_min'] * (1 + 0.01 * i)

        return dataframe

    def getOpenTrades(self):
        """Refresh ``self.trades`` from the open trades and return the list."""
        self.trades = Trade.get_open_trades()
        return self.trades

    def populate_buy_trend(self, dataframe: DataFrame, metadata: dict) -> DataFrame:
        """Flag an entry when price is in the lower half of its 180-candle range
        (or below sma60) and the Heikin-Ashi candle is green."""
        dataframe.loc[
            (
                ((dataframe['pct180'] < 0.5) | (dataframe['close'] < dataframe['sma60']))
                & (dataframe['hapercent'] > 0)
            ), ['enter_long', 'enter_tag']] = (1, "future")
        dataframe['test'] = np.where(dataframe['enter_long'] == 1, dataframe['close'] * 1.003, np.nan)
        return dataframe

    def populate_sell_trend(self, dataframe: DataFrame, metadata: dict) -> DataFrame:
        """No signal-based exits: exits are decided in ``custom_exit``."""
        return dataframe

    # ------------------------------------------------------------------
    # Stake sizing and DCA
    # ------------------------------------------------------------------
    def adjust_stake_amount(self, pair: str, last_candle: DataFrame):
        """Return the stake for the next buy on ``pair``.

        The configured ``stake_amount`` is scaled up with the distance to the
        all-time high (0% -> x1, 20% -> x1.5, 40% -> x2) and then multiplied
        by ``1 - position`` where ``position`` is the location of the mid
        price inside its 180-candle range.

        NOTE(review): the ATH table holds BTC prices but is applied to every
        pair; for a non-BTC pair the distance is close to 100% whenever the
        pair's own ``last_max`` is below the BTC figure. Confirm this is
        intended.
        """
        mid = last_candle["mid"]

        ath = self.pairs[pair]['last_max']
        historical_ath = self.get_last_ath_before_candle(last_candle)
        if historical_ath is not None:  # None when the candle predates the table
            ath = max(ath, historical_ath)
        ath_dist = 100 * (ath - mid) / ath if ath else 0.0

        base_stake = self.config.get('stake_amount') * (1 + (ath_dist / 40))

        low180 = last_candle["min180"]
        high180 = last_candle["max180"]
        span = high180 - low180
        if span > 0:
            mult = 1 - ((mid - low180) / span)
        else:
            # Flat or not-yet-available range (span is 0 or NaN): the position
            # in the range is undefined, keep the base stake unscaled.
            mult = 1.0

        self.printLog(
            f"low={low180} mid={last_candle['mid']} high={high180} mult={mult} ath={ath} ath_dist={round(ath_dist, 2)}")

        return base_stake * mult

    def adjust_trade_position(self, trade: Trade, current_time: datetime,
                              current_rate: float, current_profit: float, min_stake: float,
                              max_stake: float, **kwargs):
        """Decide whether to add to an open position.

        Returns the stake to add, or None for no action. A "loss" buy happens
        when price fell at least ``0.0025 * count_of_buys`` below the last buy
        while the candle is green and sma24 is rising; a "gain" buy happens
        when both the trade profit and the rise since the last buy exceed that
        threshold.
        """
        # Do nothing while an order is still pending.
        if trade.has_open_orders:
            return None

        available = self.wallets.get_available_stake_amount()
        if available < 10:
            return None

        dataframe, _ = self.dp.get_analyzed_dataframe(trade.pair, self.timeframe)
        if len(dataframe) < 1:
            return None
        last_candle = dataframe.iloc[-1].squeeze()

        current_time = current_time.astimezone(timezone.utc)
        dispo = round(available)
        count_of_buys = trade.nr_of_successful_entries
        pair = trade.pair
        state = self.pairs[pair]
        profit = trade.calc_profit(current_rate)

        # Price of the last real buy (not the average price); fall back to the
        # trade's open rate when the state holds no buy (e.g. after a restart).
        last_fill_price = state['last_buy'] or trade.open_rate
        if not last_fill_price:
            return None

        dca_threshold = 0.0025 * count_of_buys
        decline = (last_fill_price - current_rate) / last_fill_price
        increase = -decline

        rebound = last_candle['hapercent'] > 0 and last_candle['sma24_deriv1'] > 0

        if decline >= dca_threshold and rebound:
            try:
                # Price dropped back after gain buys while still in profit:
                # request an exit instead of averaging down.
                if state['has_gain'] and profit > 0:
                    state['force_sell'] = True
                    state['previous_profit'] = profit
                    return None

                max_amount = self.config.get('stake_amount') * 2.5
                stake_amount = min(min(max_amount, self.wallets.get_available_stake_amount()),
                                   self.adjust_stake_amount(pair, last_candle))
                if stake_amount > 0:
                    state['previous_profit'] = profit
                    trade_type = "Loss " + (last_candle['enter_tag'] if last_candle['enter_long'] == 1 else '')
                    self._register_position_increase(
                        trade, last_candle, current_time, current_rate, profit, dispo,
                        stake_amount, action="🟧 Loss -", trade_type=trade_type)
                    return stake_amount
                return None
            except Exception as exception:
                # Best effort: a failure here must not break the bot loop.
                self.printLog(exception)
                return None

        if current_profit > dca_threshold and (
                increase >= dca_threshold and self.wallets.get_available_stake_amount() > 0):
            try:
                state['previous_profit'] = profit
                stake_amount = max(20, min(self.wallets.get_available_stake_amount(),
                                           self.adjust_stake_amount(pair, last_candle)))
                if stake_amount > 0:
                    state['has_gain'] += 1
                    self._register_position_increase(
                        trade, last_candle, current_time, current_rate, profit, dispo,
                        stake_amount, action="🟡 Gain +", trade_type='Gain')
                    return stake_amount
                return None
            except Exception as exception:
                # Best effort: a failure here must not break the bot loop.
                self.printLog(exception)
                return None

        return None

    # ------------------------------------------------------------------
    # Exit logic
    # ------------------------------------------------------------------
    def custom_exit(self, pair, trade, current_time, current_rate, current_profit, **kwargs):
        """Hand-rolled trailing exit on the absolute profit of the trade.

        Returns an exit tag ``stop_<buys>_<has_gain>`` when the profit has
        fallen back to the trailing level and the mid price is below sma5,
        otherwise None.
        """
        dataframe, _ = self.dp.get_analyzed_dataframe(pair, self.timeframe)
        last_candle = dataframe.iloc[-1].squeeze()
        state = self.pairs[pair]

        # Called for its side effect: stores pairs[pair]['expected_profit'].
        self.expectedProfit(pair, last_candle)

        # ----- 1) Update the tracked extremes and profit for this trade -----
        state['last_max'] = max(last_candle['close'], state['last_max'])
        state['last_min'] = min(last_candle['close'], state['last_min'])
        state['current_trade'] = trade

        count_of_buys = trade.nr_of_successful_entries
        profit = trade.calc_profit(current_rate)
        if current_profit > 0:
            state['max_profit'] = max(state['max_profit'], profit)
        max_profit = state['max_profit']

        state['count_of_buys'] = count_of_buys
        state['current_profit'] = profit

        dispo = round(self.wallets.get_available_stake_amount())
        minutes = (current_time - trade.date_last_filled_utc).total_seconds() / 60.0

        # ----- 2) Update the highest close seen -----
        state['max_touch'] = max(last_candle['close'], state['max_touch'])

        # ----- 3) Trailing parameters, adapted to the price zone -----
        current_trailing_stop_positive = self.trailing_stop_positive
        current_trailing_only_offset_is_reached = self.trailing_only_offset_is_reached
        current_trailing_stop_positive_offset = self.trailing_stop_positive_offset

        max_ = last_candle['max180']
        min_ = last_candle['min180']
        mid = last_candle['mid']
        span = max_ - min_
        # Avoid division by zero / NaN: with a flat or missing range no zone
        # is assigned and the default parameters are used.
        zone = None
        if span > 0:
            position = (mid - min_) / span
            if math.isfinite(position):
                zone = int(position * 3)  # 0 = lowest third of the range

        if zone == 0:
            current_trailing_stop_positive_offset = self.trailing_stop_positive_offset * 2
            # NOTE(review): nesting reconstructed from a file whose formatting
            # was lost; confirm the one-day rule belongs inside zone 0.
            if minutes > 1440:
                current_trailing_only_offset_is_reached = False
                current_trailing_stop_positive_offset = self.trailing_stop_positive_offset

        # ----- 4) Dynamic trailing level -----
        trailing_stop = max_profit * (1.0 - current_trailing_stop_positive)
        # Fraction of the maximum profit that has been given back.
        baisse = 0
        if max_profit:
            baisse = (max_profit - profit) / max_profit

        if minutes % 12 == 0:
            self.log_trade(
                last_candle=last_candle,
                date=current_time,
                action="🟢 CURRENT",
                dispo=dispo,
                pair=pair,
                rate=last_candle['close'],
                trade_type=f"{round(profit, 2)} {round(max_profit, 2)} {round(trailing_stop,2)} {minutes}",
                profit=round(profit, 2),
                buys=count_of_buys,
                stake=0
            )

        # Hold while the trend is still up during the first three hours.
        # NOTE(review): baisse is a fraction, so "< 30" is almost always true;
        # 0.30 may have been intended. Left unchanged.
        if last_candle['sma24_deriv1'] > 0 and minutes < 180 and baisse < 30:
            return None

        # ----- 5) Offset: wait until the offset is sufficiently exceeded? -----
        if current_trailing_only_offset_is_reached:
            if max_profit < min(2, current_trailing_stop_positive_offset * (count_of_buys - state['has_gain'])) \
                    and (max_profit > current_trailing_stop_positive_offset):
                return None  # do not activate the trailing yet

        # ----- 6) Sell condition -----
        if 0 < profit <= trailing_stop and last_candle['mid'] < last_candle['sma5']:
            return f"stop_{count_of_buys}_{state['has_gain']}"
        return None

    def informative_pairs(self):
        """Request 1h and 1d candles for every pair of the whitelist."""
        pairs = self.dp.current_whitelist()
        informative_pairs = [(pair, '1h') for pair in pairs]
        informative_pairs += [(pair, '1d') for pair in pairs]
        return informative_pairs

    def populate1hIndicators(self, df: pd.DataFrame, metadata: dict) -> pd.DataFrame:
        """Add previous-week and previous-month low/high levels and demand zones.

        Works with either a DatetimeIndex or a ``date`` column. Levels always
        come from the *previous* ISO week / calendar month, so the current
        period is never looked at.
        """
        if isinstance(df.index, pd.DatetimeIndex):
            stamps = df.index.to_series()
        else:
            stamps = pd.to_datetime(df["date"], utc=True)

        # --- WEEKLY LEVELS ---
        iso = stamps.dt.isocalendar()
        df["week"] = iso["week"]
        df["year"] = stamps.dt.year
        # ISO year + ISO week sorts chronologically, also across New Year.
        week_key = iso["year"].astype("int64") * 100 + iso["week"].astype("int64")
        df["weekly_low"] = self._previous_period_level(df["low"], week_key, "min")
        df["weekly_high"] = self._previous_period_level(df["high"], week_key, "max")

        # Simple weekly demand zone: from the low up to 2.5% above it.
        df["weekly_demand_zone_low"] = df["weekly_low"]
        df["weekly_demand_zone_high"] = df["weekly_low"] * 1.025

        # --- MONTHLY LEVELS ---
        df["month"] = stamps.dt.month
        month_key = stamps.dt.year * 100 + stamps.dt.month
        df["monthly_low"] = self._previous_period_level(df["low"], month_key, "min")
        df["monthly_high"] = self._previous_period_level(df["high"], month_key, "max")

        df["monthly_demand_zone_low"] = df["monthly_low"]
        df["monthly_demand_zone_high"] = df["monthly_low"] * 1.03

        return df

    def rsi_trend_probability(self, dataframe, short=6, long=12):
        """Return a copy of ``dataframe`` with an RSI-based trend score ``rtp``.

        ``rtp`` lies in [-1, 1] and blends the short/long RSI crossover
        (tanh-squashed), their raw gap, and the long RSI's distance from 50.
        Requires a ``mid`` column.
        """
        dataframe = dataframe.copy()

        dataframe['rsi_short'] = talib.RSI(dataframe['mid'], short)
        dataframe['rsi_long'] = talib.RSI(dataframe['mid'], long)

        dataframe['cross_soft'] = np.tanh((dataframe['rsi_short'] - dataframe['rsi_long']) / 7)
        dataframe['gap'] = (dataframe['rsi_short'] - dataframe['rsi_long']) / 100
        dataframe['trend'] = (dataframe['rsi_long'] - 50) / 50

        dataframe['rtp'] = (
            0.6 * dataframe['cross_soft']
            + 0.25 * dataframe['gap']
            + 0.15 * dataframe['trend']
        ).clip(-1, 1)
        return dataframe

    def to_utc_ts(self, x):
        """Convert ``x`` to a timezone-aware (UTC) pandas timestamp."""
        return pd.to_datetime(x, utc=True)

    def get_last_ath_before_candle(self, last_candle):
        """Return the most recent ``btc_ath_history`` price dated at or before
        the candle, or None when the candle predates every entry."""
        candle_date = self.to_utc_ts(last_candle['date'])
        best = None
        for entry in self.btc_ath_history:
            ath_date = self.to_utc_ts(entry["date"])
            if ath_date <= candle_date and (best is None or ath_date > best[0]):
                best = (ath_date, entry["price_usd"])
        return best[1] if best is not None else None