# Zeus Strategy: First Generation of GodStra Strategy with maximum # AVG/MID profit in USDT # Author: @Mablue (Masoud Azizi) # github: https://github.com/mablue/ # IMPORTANT: INSTALL TA BEFOUR RUN(pip install ta) # freqtrade hyperopt --hyperopt-loss SharpeHyperOptLoss --spaces buy sell roi --strategy Zeus # --- Do not remove these libs --- from datetime import timedelta, datetime from freqtrade.persistence import Trade from freqtrade.strategy import (BooleanParameter, CategoricalParameter, DecimalParameter, stoploss_from_open, IntParameter, IStrategy, merge_informative_pair, informative, stoploss_from_absolute) import pandas as pd import numpy as np from pandas import DataFrame from typing import Optional, Union, Tuple from typing import List import logging import configparser from technical import pivots_points # -------------------------------- # Add your lib to import here test git import ta import talib.abstract as talib import freqtrade.vendor.qtpylib.indicators as qtpylib import requests from datetime import timezone, timedelta from scipy.signal import savgol_filter from ta.trend import SMAIndicator, EMAIndicator, MACD, ADXIndicator from collections import Counter logger = logging.getLogger(__name__) from tabulate import tabulate # Couleurs ANSI de base RED = "\033[31m" GREEN = "\033[32m" YELLOW = "\033[33m" BLUE = "\033[34m" MAGENTA = "\033[35m" CYAN = "\033[36m" RESET = "\033[0m" def pprint_df(dframe): print(tabulate(dframe, headers='keys', tablefmt='psql', showindex=False)) def normalize(df): df = (df - df.min()) / (df.max() - df.min()) return df class Zeus_8_1d(IStrategy): levels = [1, 2, 2, 4, 6, 8, 10, 12, 14, 16, 18, 20] startup_candle_count = 12 * 24 * 2 # ROI table: minimal_roi = { "0": 10 } stakes = 40 # Stoploss: stoploss = -1 # 0.256 # Custom stoploss use_custom_stoploss = True # Buy hypers timeframe = '5m' max_open_trades = 5 max_amount = 40 # DCA config position_adjustment_enable = True plot_config = { "main_plot": { # "close_mean_centered": { # "color": "red" # }, "mid_smooth_5": { "color": "blue" }, "mid_smooth_12": { "color": "green" }, "mid_smooth_24": { "color": "yellow" }, "bb_lowerband": { "color": "#da59a6"}, "bb_upperband": { "color": "#da59a6", } }, "subplots": { "Rsi": { "max_rsi_12": { "color": "blue" }, }, "Deriv1": { "mid_smooth_5_deriv1": { "color": "blue" }, "mid_smooth_12_deriv1": { "color": "green" }, "mid_smooth_24_deriv1": { "color": "yellow" }, }, "Deriv2": { "mid_smooth_5_deriv2": { "color": "blue" }, "mid_smooth_12_deriv2": { "color": "green" }, "mid_smooth_24_deriv2": { "color": "yellow" }, }, "Down": { "percage_upperband": { "color": "green" }, "percage_up": { "color": "blue" } } # "Diff": { # "sma10_deriv1": { # "color": "#74effc" # } # }, } } columns_logged = False pairs = { pair: { "first_buy": 0, "last_buy": 0.0, "first_amount": 0.0, "last_min": 999999999999999.5, "last_max": 0, "trade_info": {}, "max_touch": 0.0, "last_sell": 0.0, 'count_of_buys': 0, 'current_profit': 0, 'expected_profit': 0, "last_candle": {}, "last_trade": None, "last_count_of_buys": 0, 'base_stake_amount': 0, 'stop_buy': False, 'last_date': 0, 'stop': False, 'max_profit': 0, 'last_palier_index': -1, 'total_amount': 0, 'has_gain': 0, 'force_sell': False, 'force_buy': False } for pair in ["BTC/USDC", "ETH/USDC", "DOGE/USDC", "XRP/USDC", "SOL/USDC", "BTC/USDT", "ETH/USDT", "DOGE/USDT", "XRP/USDT", "SOL/USDT"] } # 20 20 40 60 100 160 260 420 # 50 50 100 300 500 # fibo = [1, 1, 2, 3, 5, 8, 13, 21] # my fibo # 50 50 50 100 100 150 200 250 350 450 600 1050 fibo = [1, 1, 1, 2, 2, 3, 4, 5, 7, 9, 12, 16, 21] baisse = [1, 2, 3, 5, 7, 10, 14, 19, 26, 35, 47, 63, 84] # Ma suite 1 1 1 2 2 3 4 5 7 9 12 16 21 # Mise 50 50 50 100 100 150 200 250 350 450 600 800 1050 # Somme Mises 50 100 150 250 350 500 700 950 1300 1750 2350 3150 4200 # baisse 1 2 3 5 7 10 14 19 26 35 47 63 84 # factors = [1, 1.1, 1.25, 1.5, 2.0, 3] # thresholds = [2, 5, 10, 20, 30, 50] factors = [0.5, 0.75, 1, 1.25, 1.5, 2] thresholds = [0, 2, 5, 10, 30, 45] trades = list() max_profit_pairs = {} # ========================================================================= # Parameters hyperopt mise_factor_buy = DecimalParameter(0.01, 0.2, default=0.05, decimals=2, space='buy', optimize=True, load=True) # Récupération des labels ordonnés # labels = ['B5', 'B4', 'B3', 'B2', 'B1', 'N0', 'H1', 'H2', 'H3', 'H4', 'H5'] # index_labels = ['B5', 'B4', 'B3', 'B2', 'B1', 'N0', 'H1', 'H2', 'H3', 'H4', 'H5'] # ordered_labels = ['B5', 'B4', 'B3', 'B2', 'B1', 'N0', 'H1', 'H2', 'H3', 'H4', 'H5'] labels = ['B3', 'B2', 'B1', 'N0', 'H1', 'H2', 'H3'] index_labels = ['B3', 'B2', 'B1', 'N0', 'H1', 'H2', 'H3'] ordered_labels = ['B3', 'B2', 'B1', 'N0', 'H1', 'H2', 'H3'] label_to_index = {label: i for i, label in enumerate(ordered_labels)} # ========================================================================= # paliers dérivées jour sma5 sma5_deriv1 = [-1.1726, -0.2131, -0.1012, -0.0330, 0.0169, 0.0815, 0.2000, 4.0335] sma5_deriv2 = [-1.9190, -0.1388, -0.0644, -0.0202, 0.0209, 0.0646, 0.1377, 4.2987] sma5_derive1_2_matrice = { 'B3': [8.6, 10.8, 34.6, 35.0, 58.8, 61.9, 91.2], 'B2': [0.0, 12.5, 9.1, 57.1, 63.3, 79.3, 89.5], 'B1': [6.1, 12.5, 22.0, 46.8, 61.5, 70.0, 100.0], 'N0': [0.0, 10.7, 37.0, 43.5, 75.0, 75.9, 100.0], 'H1': [0.0, 18.5, 32.4, 35.9, 76.8, 82.9, 92.0], 'H2': [0.0, 21.9, 16.0, 39.5, 69.7, 83.3, 100.0], 'H3': [9.5, 29.2, 41.2, 57.9, 53.8, 86.8, 92.3], } sma5_derive1_2_matrice_df = pd.DataFrame(sma5_derive1_2_matrice, index=index_labels) # Extraction de la matrice numérique sma5_derive1_2_numeric_matrice = sma5_derive1_2_matrice_df.reindex(index=ordered_labels, columns=ordered_labels).values # paliers = {} indicateur_achat_vente = 'mid_smooth_12' should_enter_trade_count = 0 def confirm_trade_entry(self, pair: str, order_type: str, amount: float, rate: float, time_in_force: str, current_time: datetime, entry_tag: Optional[str], **kwargs) -> bool: minutes = 0 if self.pairs[pair]['last_date'] != 0: minutes = round(int((current_time - self.pairs[pair]['last_date']).total_seconds() / 60)) dataframe, _ = self.dp.get_analyzed_dataframe(pair, self.timeframe) last_candle = dataframe.iloc[-1].squeeze() last_candle_2 = dataframe.iloc[-2].squeeze() last_candle_3 = dataframe.iloc[-3].squeeze() # val = self.getProbaHausse144(last_candle) # allow_to_buy = True #(not self.stop_all) #& (not self.all_down) allow_to_buy = not self.pairs[pair]['stop'] # and val > self.buy_val.value #not last_candle['tendency'] in ('B-', 'B--') # (rate <= float(limit)) | (entry_tag == 'force_entry') force = self.pairs[pair]['force_buy'] if self.pairs[pair]['force_buy']: self.pairs[pair]['force_buy'] = False allow_to_buy = True else: if not self.should_enter_trade(pair, last_candle, current_time): allow_to_buy = False if allow_to_buy: self.trades = list() self.pairs[pair]['first_buy'] = rate self.pairs[pair]['last_buy'] = rate self.pairs[pair]['max_touch'] = last_candle['close'] self.pairs[pair]['last_candle'] = last_candle self.pairs[pair]['count_of_buys'] = 1 self.pairs[pair]['current_profit'] = 0 self.pairs[pair]['last_palier_index'] = -1 self.pairs[pair]['last_max'] = max(last_candle['close'], self.pairs[pair]['last_max']) self.pairs[pair]['last_min'] = min(last_candle['close'], self.pairs[pair]['last_min']) dispo = round(self.wallets.get_available_stake_amount()) self.printLineLog() stake_amount = self.adjust_stake_amount(pair, last_candle) self.pairs[pair]['total_amount'] = stake_amount self.log_trade( last_candle=last_candle, date=current_time, action=("🟩Buy" if allow_to_buy else "Canceled") + " " + str(minutes), pair=pair, rate=rate, dispo=dispo, profit=0, trade_type=entry_tag, buys=1, stake=round(stake_amount, 2) ) return allow_to_buy def confirm_trade_exit(self, pair: str, trade: Trade, order_type: str, amount: float, rate: float, time_in_force: str, exit_reason: str, current_time, **kwargs, ) -> bool: # allow_to_sell = (minutes > 30) dataframe, _ = self.dp.get_analyzed_dataframe(pair, self.timeframe) last_candle = dataframe.iloc[-1].squeeze() force = self.pairs[pair]['force_sell'] allow_to_sell = (last_candle['percent'] < 0) #or force minutes = int(round((current_time - trade.date_last_filled_utc).total_seconds() / 60, 0)) if allow_to_sell: self.trades = list() self.pairs[pair]['last_count_of_buys'] = trade.nr_of_successful_entries # self.pairs[pair]['count_of_buys'] self.pairs[pair]['last_sell'] = rate self.pairs[pair]['last_trade'] = trade self.pairs[pair]['last_candle'] = last_candle self.trades = list() dispo = round(self.wallets.get_available_stake_amount()) # print(f"Sell {pair} {current_time} {exit_reason} dispo={dispo} amount={amount} rate={rate} open_rate={trade.open_rate}") self.log_trade( last_candle=last_candle, date=current_time, action="🟥Sell " + str(minutes), pair=pair, trade_type=exit_reason, rate=last_candle['close'], dispo=dispo, profit=round(trade.calc_profit(rate, amount), 2) ) self.pairs[pair]['max_profit'] = 0 self.pairs[pair]['force_sell'] = False self.pairs[pair]['has_gain'] = 0 self.pairs[pair]['current_profit'] = 0 self.pairs[pair]['total_amount'] = 0 self.pairs[pair]['count_of_buys'] = 0 self.pairs[pair]['max_touch'] = 0 self.pairs[pair]['last_buy'] = 0 self.pairs[pair]['last_date'] = current_time self.pairs[pair]['last_palier_index'] = -1 self.pairs[pair]['last_trade'] = trade # self.pairs[pair]['current_trade'] = None return (allow_to_sell) | (exit_reason == 'force_exit') def custom_stake_amount(self, pair: str, current_time: datetime, current_rate: float, proposed_stake: float, min_stake: float, max_stake: float, **kwargs) -> float: dataframe, _ = self.dp.get_analyzed_dataframe(pair=pair, timeframe=self.timeframe) current_candle = dataframe.iloc[-1].squeeze() adjusted_stake_amount = self.adjust_stake_amount(pair, current_candle) # print(f"{pair} adjusted_stake_amount{adjusted_stake_amount}") # Use default stake amount. return adjusted_stake_amount def custom_exit(self, pair: str, trade: Trade, current_time, current_rate, current_profit, **kwargs): dataframe, _ = self.dp.get_analyzed_dataframe(pair, self.timeframe) last_candle = dataframe.iloc[-1].squeeze() last_candle_1h = dataframe.iloc[-13].squeeze() before_last_candle = dataframe.iloc[-2].squeeze() before_last_candle_2 = dataframe.iloc[-3].squeeze() before_last_candle_12 = dataframe.iloc[-13].squeeze() before_last_candle_24 = dataframe.iloc[-25].squeeze() expected_profit = self.expectedProfit(pair, last_candle) # print(f"current_time={current_time} current_profit={current_profit} expected_profit={expected_profit}") max_touch_before = self.pairs[pair]['max_touch'] self.pairs[pair]['last_max'] = max(last_candle['close'], self.pairs[pair]['last_max']) self.pairs[pair]['last_min'] = min(last_candle['close'], self.pairs[pair]['last_min']) # self.pairs[pair]['current_trade'] = trade count_of_buys = trade.nr_of_successful_entries profit = round(current_profit * trade.stake_amount, 1) self.pairs[pair]['max_profit'] = max(self.pairs[pair]['max_profit'], profit) max_profit = self.pairs[pair]['max_profit'] baisse = 0 if profit > 0: baisse = 100 * abs(max_profit - profit) / max_profit # print(f"{current_time} max_profit={self.pairs[pair]['max_profit']} profit={profit} baisse={round(baisse,2)}") mx = max_profit / 5 self.pairs[pair]['count_of_buys'] = count_of_buys self.pairs[pair]['current_profit'] = profit dispo = round(self.wallets.get_available_stake_amount()) hours_since_first_buy = (current_time - trade.open_date_utc).seconds / 3600.0 days_since_first_buy = (current_time - trade.open_date_utc).days hours = (current_time - trade.date_last_filled_utc).total_seconds() / 3600.0 if True or (hours % 4 == 0): self.log_trade( last_candle=last_candle, date=current_time, action="🔴 CURRENT" if self.pairs[pair]['stop'] else "🟢 CURRENT", dispo=dispo, pair=pair, rate=last_candle['close'], trade_type='', profit=profit, buys='', stake=0 ) pair_name = self.getShortName(pair) # if baisse > 10 and max_profit > 5 and count_of_buys == 1: # self.pairs[pair]['force_sell'] = False # return 'Bss_' + pair_name + '_' + str(count_of_buys) + '_' + str(self.pairs[pair]['has_gain']) + '_' + str(round(baisse, 2)) if last_candle['mid_smooth_5_deriv1'] <= 0.1 \ and before_last_candle['mid_smooth_5_deriv2'] > 0 \ and last_candle['mid_smooth_5_deriv2'] < 0 \ and last_candle['mid_smooth_24_deriv1'] < 0.25 \ and profit > expected_profit \ and last_candle['max_rsi_12'] > 70: self.pairs[pair]['force_sell'] = False self.pairs[pair]['force_buy'] = (self.pairs[pair]['count_of_buys'] - self.pairs[pair]['has_gain'] > 5) return 'RSI_' + pair_name + '_' + str(count_of_buys) + '_' + str(self.pairs[pair]['has_gain']) + '_' + str(round(baisse, 2)) if last_candle['mid_smooth_12_deriv1'] <= -0.1 and profit > expected_profit: self.pairs[pair]['force_sell'] = False self.pairs[pair]['force_buy'] = (self.pairs[pair]['count_of_buys'] - self.pairs[pair]['has_gain'] > 5) return 'Drv3_' + pair_name + '_' + str(count_of_buys) + '_' + str(self.pairs[pair]['has_gain']) + '_' + str(round(baisse, 2)) self.pairs[pair]['max_touch'] = max(last_candle['close'], self.pairs[pair]['max_touch']) def getShortName(self, pair): return pair.replace("/USDT", '').replace("/USDC", '') def multi_step_interpolate(self, pct: float, thresholds: List[float], factors: List[float]) -> float: if pct <= thresholds[0]: return factors[0] if pct >= thresholds[-1]: return factors[-1] for i in range(1, len(thresholds)): if pct <= thresholds[i]: # interpolation linéaire entre thresholds[i-1] et thresholds[i] return factors[i - 1] + (pct - thresholds[i - 1]) * (factors[i] - factors[i - 1]) / ( thresholds[i] - thresholds[i - 1]) # Juste au cas où (devrait jamais arriver) return factors[-1] def log_trade(self, action, pair, date, trade_type=None, rate=None, dispo=None, profit=None, buys=None, stake=None, last_candle=None): # Afficher les colonnes une seule fois if self.config.get('runmode') == 'hyperopt' or self.dp.runmode.value in ('hyperopt'): return if self.columns_logged % 10 == 0: self.printLog( f"| {'Date':<16} | {'Action':<10} |{'Pair':<5}| {'Trade Type':<18} |{'Rate':>8} | {'Dispo':>6} | {'Profit':>8} | {'Pct':>6} | {'max_touch':>11} | {'last_lost':>12} | {'last_max':>7}| {'last_max':>7}|{'Buys':>5}| {'Stake':>5} |" f"Tdc|{'val':>6}| RSI |s201d|s5_1d|s5_2d|s51h|s52h" ) self.printLineLog() df = pd.DataFrame.from_dict(self.pairs, orient='index') colonnes_a_exclure = ['last_candle', 'last_trade', 'last_palier_index', # 'current_trade', 'trade_info', 'last_date', 'expected_profit', 'last_count_of_buys', 'base_stake_amount', 'stop_buy'] df_filtered = df[df['count_of_buys'] > 0].drop(columns=colonnes_a_exclure) # df_filtered = df_filtered["first_buy", "last_max", "max_touch", "last_sell","last_buy", 'count_of_buys', 'current_profit'] print(df_filtered) self.columns_logged += 1 date = str(date)[:16] if date else "-" limit = None # if buys is not None: # limit = round(last_rate * (1 - self.fibo[buys] / 100), 4) rsi = '' rsi_pct = '' sma5_1d = '' sma5_1h = '' sma5 = str(sma5_1d) + ' ' + str(sma5_1h) last_lost = self.getLastLost(last_candle, pair) if buys is None: buys = '' max_touch = '' # round(last_candle['max12'], 1) #round(self.pairs[pair]['max_touch'], 1) pct_max = self.getPctFirstBuy(pair, last_candle) total_counts = str(buys) + '/' + str(sum(pair_data['count_of_buys'] for pair_data in self.pairs.values())) dist_max = self.getDistMax(last_candle, pair) val = self.getProbaHausseSma5d(last_candle) pct60 = round(100 * self.getPct60D(pair, last_candle), 2) color = GREEN if profit > 0 else RED color_sma20 = GREEN if last_candle['sma20_deriv1'] > 0 else RED color_sma5 = GREEN if last_candle['mid_smooth_5_deriv1'] > 0 else RED color_sma5_2 = GREEN if last_candle['mid_smooth_5_deriv2'] > 0 else RED color_sma5_1h = GREEN if last_candle['sma60_deriv1'] > 0 else RED color_sma5_2h = GREEN if last_candle['sma60_deriv2'] > 0 else RED last_max = int(self.pairs[pair]['last_max']) if self.pairs[pair]['last_max'] > 1 else round(self.pairs[pair]['last_max'],3) last_min = int(self.pairs[pair]['last_min']) if self.pairs[pair]['last_min'] > 1 else round(self.pairs[pair]['last_min'], 3) profit = str(profit) + '/' + str(round(self.pairs[pair]['max_profit'], 2)) # 🟢 Dérivée 1 > 0 et dérivée 2 > 0: tendance haussière qui s’accélère. # 🟡 Dérivée 1 > 0 et dérivée 2 < 0: tendance haussière qui ralentit → essoufflement potentiel. # 🔴 Dérivée 1 < 0 et dérivée 2 < 0: tendance baissière qui s’accélère. # 🟠 Dérivée 1 < 0 et dérivée 2 > 0: tendance baissière qui ralentit → possible bottom. # tdc last_candle['tendency_12'] self.printLog( f"| {date:<16} |{action:<10} | {pair[0:3]:<3} | {trade_type or '-':<18} |{rate or '-':>9}| {dispo or '-':>6} " f"|{color}{profit or '-':>10}{RESET}| {pct_max or '-':>6} | {round(self.pairs[pair]['max_touch'], 2) or '-':>11} | {last_lost or '-':>12} " f"| {last_max or '-':>7} | {last_min or '-':>7} |{total_counts or '-':>5}|{stake or '-':>7}" f"|{ last_candle['tendency_12'] or '-':>3}|" f"{round(val, 1) or '-' :>6}|" f"{round(last_candle['rsi'], 0):>7}|{color_sma20}{round(last_candle['sma20_deriv1'], 2):>5}{RESET}" f"|{color_sma5}{round(last_candle['mid_smooth_5_deriv1'], 2):>5}{RESET}|{color_sma5_2}{round(last_candle['mid_smooth_5_deriv2'], 2):>5}{RESET}" f"|{color_sma5_1h}{round(last_candle['sma60_deriv1'], 2):>5}{RESET}|{color_sma5_2h}{round(last_candle['sma60_deriv2'], 2):>5}{RESET}" # f"|{last_candle['min60']}|{last_candle['max60']}" ) def getLastLost(self, last_candle, pair): last_lost = round((last_candle['close'] - self.pairs[pair]['max_touch']) / self.pairs[pair]['max_touch'], 3) return last_lost def getDistMax(self, last_candle, pair): mx = last_candle['max12'] dist_max = round(100 * (mx - last_candle['close']) / mx, 0) return dist_max def printLineLog(self): self.printLog( f"+{'-' * 18}+{'-' * 12}+{'-' * 5}+{'-' * 20}+{'-' * 9}+{'-' * 8}+{'-' * 12}+{'-' * 8}+{'-' * 13}+{'-' * 14}+{'-' * 9}{'-' * 9}+{'-' * 5}+{'-' * 7}+" f"{'-' * 3}" # "+{'-' * 3}+{'-' * 3} f"+{'-' * 6}+{'-' * 7}+{'-' * 5}+{'-' * 5}+{'-' * 5}+{'-' * 5}+{'-' * 5}+{'-' * 5}+" ) def printLog(self, str): if self.config.get('runmode') == 'hyperopt' or self.dp.runmode.value in ('hyperopt'): return; if not self.dp.runmode.value in ('backtest', 'hyperopt', 'lookahead-analysis'): logger.info(str) else: if not self.dp.runmode.value in ('hyperopt'): print(str) def add_tendency_column(self, dataframe: pd.DataFrame, name, suffixe='') -> pd.DataFrame: def tag_by_derivatives(row): d1 = row[f"{name}{suffixe}_deriv1"] d2 = row[f"{name}{suffixe}_deriv2"] d1_lim_inf = -0.01 d1_lim_sup = 0.01 if d1 >= d1_lim_inf and d1 <= d1_lim_sup: # and d2 >= d2_lim_inf and d2 <= d2_lim_sup: return 'P' # Palier if d1 == 0.0: return 'DH' if d2 > 0 else 'DB' # Depart Hausse / Départ Baisse if d1 > d1_lim_sup: return 'H++' if d2 > 0 else 'H+' # Acceleration Hausse / Ralentissement Hausse if d1 < d1_lim_inf: return 'B--' if d2 < 0 else 'B-' # Accéleration Baisse / Ralentissement Baisse return 'Mid' dataframe[f"tendency{suffixe}"] = dataframe.apply(tag_by_derivatives, axis=1) return dataframe def populate_indicators(self, dataframe: DataFrame, metadata: dict) -> DataFrame: # Add all ta features pair = metadata['pair'] heikinashi = qtpylib.heikinashi(dataframe) dataframe['haopen'] = heikinashi['open'] dataframe['haclose'] = heikinashi['close'] dataframe['hapercent'] = (dataframe['haclose'] - dataframe['haopen']) / dataframe['haclose'] dataframe['hapercent3'] = (dataframe['haclose'] - dataframe['haopen'].shift(3)) / dataframe['haclose'].shift(3) dataframe['sma5'] = talib.SMA(dataframe, timeperiod=5) self.calculeDerivees(dataframe, 'sma5', horizon=10) dataframe['sma10'] = talib.SMA(dataframe, timeperiod=10) self.calculeDerivees(dataframe, 'sma10', horizon=10) dataframe['sma20'] = talib.SMA(dataframe, timeperiod=20) self.calculeDerivees(dataframe, 'sma20', horizon=20) dataframe['sma60'] = talib.SMA(dataframe, timeperiod=60) self.calculeDerivees(dataframe, 'sma60', horizon=60) dataframe['sma144'] = talib.SMA(dataframe, timeperiod=144) self.calculeDerivees(dataframe, 'sma144') dataframe["percent"] = (dataframe["close"] - dataframe["open"]) / dataframe["open"] dataframe["percent3"] = (dataframe["close"] - dataframe["open"].shift(3)) / dataframe["open"].shift(3) dataframe["percent5"] = (dataframe["close"] - dataframe["open"].shift(5)) / dataframe["open"].shift(5) dataframe["percent12"] = (dataframe["close"] - dataframe["open"].shift(12)) / dataframe["open"].shift(12) dataframe = self.calculateDerivation(dataframe, window=3, suffixe="_3") # dataframe = self.calculateDerivation(dataframe, window=3, suffixe="_6") dataframe["mid_re_smooth_3"] = self.conditional_smoothing(dataframe['mid_smooth_3'].dropna(), threshold=0.0005).dropna() self.calculeDerivees(dataframe, "mid_re_smooth_3", horizon=3) dataframe = self.calculateDerivation(dataframe, window=12, suffixe="_12") dataframe = self.calculateDerivation(dataframe, window=24, suffixe="_24", factor_1=1000, factor_2=10) # print(metadata['pair']) dataframe['rsi'] = talib.RSI(dataframe['close'], timeperiod=14) dataframe['max_rsi_12'] = talib.MAX(dataframe['rsi'], timeperiod=12) dataframe['min_rsi_12'] = talib.MIN(dataframe['rsi'], timeperiod=12) self.calculeDerivees(dataframe, 'rsi', horizon=5) dataframe['max12'] = talib.MAX(dataframe['close'], timeperiod=12) dataframe['min12'] = talib.MIN(dataframe['close'], timeperiod=12) dataframe['max60'] = talib.MAX(dataframe['close'], timeperiod=60) dataframe['min60'] = talib.MIN(dataframe['close'], timeperiod=60) # Bollinger Bands bollinger = qtpylib.bollinger_bands(qtpylib.typical_price(dataframe), window=20, stds=2) dataframe['bb_lowerband'] = bollinger['lower'] dataframe['bb_middleband'] = bollinger['mid'] dataframe['bb_upperband'] = bollinger['upper'] dataframe["bb_percent"] = ( (dataframe["close"] - dataframe["bb_lowerband"]) / (dataframe["bb_upperband"] - dataframe["bb_lowerband"]) ) dataframe["bb_width"] = ((dataframe["bb_upperband"] - dataframe["bb_lowerband"]) / dataframe["bb_upperband"]) # Normalization dataframe['bb_upperband5'] = dataframe['bb_upperband'].rolling(window=5).mean() dataframe['bb_lowerband5'] = dataframe['bb_lowerband'].rolling(window=5).mean() dataframe = self.calculateDerivation(dataframe, window=5, suffixe="_5") dataframe = self.calculateDerivation(dataframe, window=12, suffixe="_12") dataframe['percage_up'] = dataframe['high'] > dataframe['bb_upperband'] dataframe['percage_upperband'] = dataframe['percage_up'].astype(int) * ( dataframe['percage_up'].groupby((dataframe['percage_up'] != dataframe['percage_up'].shift()).cumsum()).cumcount() + 1) # informative['futur_percent_3d'] = 100 * (informative['close'].shift(-3) - informative['close']) / informative['close'] # # self.calculateProbabilite2Index(informative, ['futur_percent'], 'rsi_deriv1', 'rsi') # # self.calculateProbabilite2Index(dataframe, ['futur_percent_3d'], 'rsi_deriv1', 'sma5') # informative['close_smooth'] = self.conditional_smoothing(informative['mid'].dropna(), threshold=0.0015).dropna() # informative['smooth'], informative['deriv1'], informative['deriv2'] = self.smooth_and_derivatives(informative['close_smooth']) # informative['deriv1'] = 100 * informative['deriv1'] / informative['mid'] # informative['deriv2'] = 1000 * informative['deriv2'] / informative['mid'] # poly_func, x_future, y_future, count = self.polynomial_forecast(informative['sma5_deriv1'], window=24, degree=4) # informative['futur_percent_3'] = 100 * ((informative['sma5'].shift(-1) - informative['sma5']) / informative['sma5']) # futur_cols = ['futur_percent_3'] # indic_1 = 'sma5_deriv1' # indic_2 = 'sma5_deriv2' # self.calculateProbabilite2Index(informative, futur_cols, indic_1, indic_2) # self.calculePlateaux(informative, 14, 0.01) dataframe['last_price'] = dataframe['close'] dataframe['first_price'] = dataframe['close'] # dataframe['mid_price'] = (dataframe['last_price'] + dataframe['first_price']) / 2 # dataframe['close01'] = dataframe.iloc[-1]['close'] * 1.01 # dataframe['limit'] = dataframe['close'] count_buys = 0 if self.dp: if self.dp.runmode.value in ('live', 'dry_run'): self.getOpenTrades() for trade in self.trades: if trade.pair != pair: continue filled_buys = trade.select_filled_orders('buy') count = 0 amount = 0 for buy in filled_buys: if count == 0: dataframe['first_price'] = buy.price self.pairs[pair]['first_buy'] = buy.price # dataframe['close01'] = buy.price * 1.01 # Order(id=2396, trade=1019, order_id=29870026652, side=buy, filled=0.00078, price=63921.01, # status=closed, date=2024-08-26 02:20:11) dataframe['last_price'] = buy.price self.pairs[pair]['last_buy'] = buy.price count = count + 1 amount += buy.price * buy.filled # dataframe['mid_price'] = (dataframe['last_price'] + dataframe['first_price']) / 2 count_buys = count # dataframe['limit'] = dataframe['last_price'] * (1 - self.baisse[count] / 100) # dataframe['amount'] = amount # Compter les baisses / hausses consécutives self.calculateDownAndUp(dataframe, limit=0.0001) horizon_h = 12 dataframe['close_smooth'] = self.conditional_smoothing(dataframe['mid'].rolling(3).mean().dropna(), threshold=0.001) dataframe['smooth'], dataframe['deriv1'], dataframe['deriv2'] = self.smooth_and_derivatives( dataframe['close_smooth']) dataframe['deriv1'] = 100 * dataframe['deriv1'] / dataframe['mid'] dataframe['deriv2'] = 100 * dataframe['deriv2'] / dataframe['mid'] # Regarde dans le futur # n = 10 # dataframe['close_mean_centered'] = dataframe['close'].rolling(window=2 * n + 1, center=True, min_periods=1).mean() # =============================== # Lissage des valeurs Journalières horizon_d = 12 * 5 * 24 dataframe['ema_volume'] = 20 * (dataframe['volume'] * dataframe['percent']) / ( abs(dataframe['volume'].shift(1)) + abs(dataframe['volume'].shift(2))) self.calculeDerivees(dataframe, 'ema_volume', factor_1=10, factor_2=1, horizon=14) return dataframe def calculeDerivees(self, dataframe, indic, factor_1=100, factor_2=10, horizon=5): dataframe[f"{indic}_deriv1"] = (factor_1 * dataframe[f"{indic}"].diff() / dataframe[f"{indic}"]).rolling(horizon).mean() dataframe[f"{indic}_deriv2"] = (factor_2 * dataframe[f"{indic}_deriv1"].diff()).rolling(horizon).mean() def calculateDownAndUp(self, dataframe, limit=0.0001): dataframe['down'] = dataframe['mid_smooth_12_deriv1'] < limit # dataframe['hapercent'] <= limit dataframe['up'] = dataframe['mid_smooth_12_deriv1'] > limit # dataframe['hapercent'] >= limit dataframe['down_count'] = - dataframe['down'].astype(int) * ( dataframe['down'].groupby((dataframe['down'] != dataframe['down'].shift()).cumsum()).cumcount() + 1) dataframe['up_count'] = dataframe['up'].astype(int) * ( dataframe['up'].groupby((dataframe['up'] != dataframe['up'].shift()).cumsum()).cumcount() + 1) # Créer une colonne vide dataframe['down_pct'] = self.calculateUpDownPct(dataframe, 'down_count') dataframe['up_pct'] = self.calculateUpDownPct(dataframe, 'up_count') def calculateDerivation(self, dataframe, window=12, suffixe='', factor_1=100, factor_2=10): dataframe['mid'] = dataframe['haopen'] + (dataframe['haclose'] - dataframe['haopen']) / 2 # 1. Calcul du lissage par moyenne mobile médiane dataframe[f"mid_smooth{suffixe}"] = dataframe['haclose'].rolling(window=window).mean() # 2. Dérivée première = différence entre deux bougies successives dataframe[f"mid_smooth{suffixe}_deriv1"] = round( factor_1 * dataframe[f"mid_smooth{suffixe}"].rolling(window=3).mean().diff() / dataframe[ f"mid_smooth{suffixe}"], 4) # 3. Dérivée seconde = différence de la dérivée première dataframe[f"mid_smooth{suffixe}_deriv2"] = round( factor_2 * dataframe[f"mid_smooth{suffixe}_deriv1"].rolling(window=3).mean().diff(), 4) dataframe = self.add_tendency_column(dataframe, "mid_smooth", suffixe) return dataframe def getOpenTrades(self): # if len(self.trades) == 0: self.trades = Trade.get_open_trades() return self.trades def populate_buy_trend(self, dataframe: DataFrame, metadata: dict) -> DataFrame: pair = metadata['pair'] # self.getOpenTrades() # expected_profit = self.expectedProfit(pair, dataframe.iloc[-1]) # self.getBinanceOrderBook(pair, dataframe) # Calcul de la pente (différence entre deux bougies consécutives) dataframe['bb_ub_slope'] = dataframe['bb_upperband'].diff() # Détection d'une inversion (changement de signe de la pente) # inversion = ( # (dataframe['bb_ub_slope'].shift(1).rolling(5).apply( # lambda x: any(x[i] > 0 and x[i + 1] < 0 for i in range(len(x) - 1)))) == 1 # ) # On regarde si la bande supérieure a atteint un maximum il y a k bougies # lookback = 5 # inversion = (dataframe['bb_upperband'] == dataframe['bb_upperband'].rolling(lookback).max()) # pente de la bb_upperband dataframe['bb_ub_slope'] = dataframe['bb_upperband5'].diff() # évènement "inversion vers le bas" (pente passe de >0 à <=0) sur chaque bougie cross_down = (dataframe['bb_ub_slope'].shift(1) > 0) & (dataframe['bb_ub_slope'] <= 0) dataframe['bb_cross_down'] = 10000 * cross_down * dataframe['bb_width'] \ * (dataframe['bb_lowerband'] - dataframe['bb_lowerband'].shift(1)) / dataframe[ 'bb_lowerband'] # vrai si AU MOINS une inversion a eu lieu dans les 5 bougies *précédentes* (on exclut l'actuelle) inversion_last5 = cross_down.shift(1).rolling(5, min_periods=1).max().astype(bool) dataframe['inversion_last5'] = inversion_last5 N = 24 # nombre minimum de bougies avant inversion rise_threshold = 1.0 # % de hausse à ne pas dépasser # # Calcul de la hausse minimale avant inversion # def compute_rise(idx): # if idx < N: # return 0 # low_before = dataframe['close'].iloc[idx - N:idx].min() # min des N bougies avant inversion # return (dataframe['close'].iloc[idx] / low_before - 1) * 100 # # rise = [compute_rise(i) for i in range(len(dataframe))] # dataframe['rise_before_inversion'] = rise # # # Filtre : inversion sans forte hausse avant # valid_inversion = inversion_last5 & (dataframe['rise_before_inversion'] <= rise_threshold) # dataframe.loc[ # ( # (dataframe['percent'] > 0) # & (dataframe['mid_smooth_12_deriv1'] >= dataframe['mid_smooth_12_deriv1'].shift(1)) # ), ['enter_long', 'enter_tag']] = (1, 'down') dataframe.loc[ ( # (valid_inversion & inversion_last5 ) # (dataframe['mid_smooth_12'].shift(2) > dataframe['mid_smooth_12'].shift(1)) # (dataframe['mid_smooth_24_deriv1'].shift(1) <= 0) (dataframe['mid_smooth_5_deriv1'] >= 0.0) & (dataframe['mid_smooth_5_deriv2'] > 0) # & (dataframe['hapercent'] > 0) # & (dataframe['max_rsi_12'] < 70) & (dataframe['open'] <= dataframe['bb_middleband']) ), ['enter_long', 'enter_tag']] = (1, 'smth_12') dataframe.loc[ ( (dataframe['min_rsi_12'] < 20) & (dataframe['hapercent'] > 0) ), ['enter_long', 'enter_tag']] = (1, 'min_rsi_12') dataframe['test'] = np.where(dataframe['enter_long'] == 1, dataframe['close'] * 1.01, np.nan) return dataframe def calculateProbabilite2Index(self, df, futur_cols, indic_1, indic_2): # # Définition des tranches pour les dérivées # bins_deriv = [-np.inf, -0.05, -0.01, 0.01, 0.05, np.inf] # labels = ['forte baisse', 'légère baisse', 'neutre', 'légère hausse', 'forte hausse'] # # # Ajout des colonnes bin (catégorisation) # df[f"{indic_1}_bin"] = pd.cut(df['mid_smooth_1h_deriv1'], bins=bins_deriv, labels=labels) # df[f"{indic_2}_bin"] = pd.cut(df['mid_smooth_12_deriv1'], bins=bins_deriv, labels=labels) # # # Colonnes de prix futur à analyser # futur_cols = ['futur_percent_1h', 'futur_percent_2h', 'futur_percent_3h', 'futur_percent_4h', 'futur_percent_5h'] # # # Calcul des moyennes et des effectifs # grouped = df.groupby([f"{indic_2}_bin", f"{indic_1}_bin"])[futur_cols].agg(['mean', 'count']) # # pd.set_option('display.width', 200) # largeur max affichage # pd.set_option('display.max_columns', None) pd.set_option('display.max_columns', None) pd.set_option('display.width', 300) # largeur max affichage # nettoyage # series = df[f"{indic_2}"].dropna() # unique_vals = df[f"{indic_2}"].nunique() # print(unique_vals) # print(df[f"{indic_2}"]) n = len(self.labels) df[f"{indic_1}_bin"], bins_1h = pd.qcut(df[f"{indic_1}"], q=n, labels=self.labels, retbins=True, duplicates='drop') df[f"{indic_2}_bin"], bins_1d = pd.qcut(df[f"{indic_2}"], q=n, labels=self.labels, retbins=True, duplicates='drop') # Affichage formaté pour code Python print(f"Bornes des quantiles pour {indic_1} : [{', '.join([f'{b:.4f}' for b in bins_1h])}]") print(f"Bornes des quantiles pour {indic_2} : [{', '.join([f'{b:.4f}' for b in bins_1d])}]") # Agrégation grouped = df.groupby([f"{indic_2}_bin", f"{indic_1}_bin"], observed=True)[futur_cols].agg(['mean', 'count']) # Affichage with pd.option_context('display.max_rows', None, 'display.max_columns', None): print(grouped.round(4)) # Ajout des probabilités de hausse for col in futur_cols: df[f"{col}_is_up"] = df[col] > 0 # Calcul de la proba de hausse proba_up = df.groupby([f"{indic_2}_bin", f"{indic_1}_bin"], observed=True)[f"{col}_is_up"].mean().unstack() print(f"\nProbabilité de hausse pour {col} (en %):") with pd.option_context('display.max_rows', None, 'display.max_columns', None): print((proba_up * 100).round(1)) # Affichage formaté des valeurs comme tableau Python with pd.option_context('display.max_rows', None, 'display.max_columns', None): df_formatted = (proba_up * 100).round(1) print("data = {") for index, row in df_formatted.iterrows(): row_values = ", ".join([f"{val:.1f}" for val in row]) print(f"'{index}': [{row_values}], ") print("}") def populate_sell_trend(self, dataframe: DataFrame, metadata: dict) -> DataFrame: # dataframe.loc[ # ( # (dataframe['mid_smooth_12_deriv1'] == 0) # & (dataframe['mid_smooth_12_deriv1'].shift(1) > 0) # ), ['sell', 'exit_long']] = (1, 'sell_sma5_pct_1h') return dataframe def adjust_trade_position(self, trade: Trade, current_time: datetime, current_rate: float, current_profit: float, min_stake: float, max_stake: float, **kwargs): # ne rien faire si ordre deja en cours if trade.has_open_orders: print("skip open orders") return None if (self.wallets.get_available_stake_amount() < 0): # or trade.stake_amount >= max_stake: return 0 dataframe, _ = self.dp.get_analyzed_dataframe(trade.pair, self.timeframe) last_candle = dataframe.iloc[-1].squeeze() before_last_candle_12 = dataframe.iloc[-13].squeeze() before_last_candle_24 = dataframe.iloc[-25].squeeze() last_candle_3 = dataframe.iloc[-4].squeeze() last_candle_previous_1h = dataframe.iloc[-13].squeeze() # prépare les données current_time = current_time.astimezone(timezone.utc) open_date = trade.open_date.astimezone(timezone.utc) dispo = round(self.wallets.get_available_stake_amount()) hours_since_first_buy = (current_time - trade.open_date_utc).seconds / 3600.0 days_since_first_buy = (current_time - trade.open_date_utc).days hours = (current_time - trade.date_last_filled_utc).total_seconds() / 3600.0 count_of_buys = trade.nr_of_successful_entries current_time_utc = current_time.astimezone(timezone.utc) open_date = trade.open_date.astimezone(timezone.utc) days_since_open = (current_time_utc - open_date).days pair = trade.pair pct_first = 0 total_counts = sum( pair_data['count_of_buys'] for pair_data in self.pairs.values() if not pair in ('BTC/USDT', 'BTC/USDC')) if self.pairs[pair]['first_buy']: pct_first = self.getPctFirstBuy(pair, last_candle) pct = 0.012 if count_of_buys == 1: pct_max = current_profit else: if self.pairs[trade.pair]['last_buy']: pct_max = self.getPctLastBuy(pair, last_candle) else: pct_max = - pct if pair in ('BTC/USDT', 'BTC/USDC') or count_of_buys <= 2: lim = - pct - (count_of_buys * 0.001) # lim = self.getLimitBuy(pair, last_candle, pct) # lim = - (0.012 * (1 + round(count_of_buys / 5)) + 0.001 * (count_of_buys - 1)) # lim = - (0.012 + 0.001 * (count_of_buys - 1) + (0.002 * count_of_buys if count_of_buys > 10 else 0.001 * count_of_buys if count_of_buys > 5 else 0)) else: pct = 0.05 lim = - pct - (count_of_buys * 0.0025) # lim = self.getLimitBuy(pair, last_candle, pct) if (len(dataframe) < 1): print("skip dataframe") return None if not self.should_enter_trade(pair, last_candle, current_time): print("enter trade not allowed") return None condition = (last_candle['sma5_deriv1'] > 0) or ((last_candle['min_rsi_12'] < 20) and (last_candle['hapercent'] > 0)) # and \ if condition and (pct_max < lim): try: if self.pairs[pair]['has_gain']: self.pairs[pair]['force_sell'] = True last_lost = self.getLastLost(last_candle, pair) max_amount = self.config.get('stake_amount') * 2.5 stake_amount = min(min(max_amount, self.wallets.get_available_stake_amount()), self.adjust_stake_amount(pair, last_candle) - 10 * pct_first / self.mise_factor_buy.value) # min(200, self.adjust_stake_amount(pair, last_candle) * self.fibo[count_of_buys]) if self.wallets.get_available_stake_amount() > stake_amount: trade_type = last_candle['enter_tag'] if last_candle['enter_long'] == 1 else 'pct48' self.pairs[trade.pair]['count_of_buys'] += 1 self.pairs[pair]['total_amount'] += stake_amount self.log_trade( last_candle=last_candle, date=current_time, action="🟧 Loss -", dispo=dispo, pair=trade.pair, rate=current_rate, trade_type=trade_type, profit=round(current_profit * trade.stake_amount, 1), buys=trade.nr_of_successful_entries + 1, stake=round(stake_amount, 2) ) self.pairs[trade.pair]['last_buy'] = current_rate self.pairs[trade.pair]['max_touch'] = last_candle['close'] self.pairs[trade.pair]['last_candle'] = last_candle # df = pd.DataFrame.from_dict(self.pairs, orient='index') # colonnes_a_exclure = ['last_candle', 'last_trade', 'last_palier_index', 'stop', # 'trade_info', 'last_date', 'expected_profit', 'last_count_of_buys', 'base_stake_amount', 'stop_buy'] # df_filtered = df[df['count_of_buys'] > 0].drop(columns=colonnes_a_exclure) # # df_filtered = df_filtered["first_buy", "last_max", "max_touch", "last_sell","last_buy", 'count_of_buys', 'current_profit'] # # print(df_filtered) return stake_amount return None except Exception as exception: print(exception) return None last_lost = self.getLastLost(last_candle, pair) if (hours > 6 and last_candle['mid_smooth_24_deriv1'] > 0.1): try: stake_amount = self.pairs[pair]['first_amount'] / 4 if self.wallets.get_available_stake_amount() > stake_amount: self.pairs[pair]['has_gain'] += 1 trade_type = 'Gain +' self.pairs[trade.pair]['count_of_buys'] += 1 self.pairs[pair]['total_amount'] += stake_amount self.log_trade( last_candle=last_candle, date=current_time, action="🟡 Gain +", dispo=dispo, pair=trade.pair, rate=current_rate, trade_type=str(round(pct_max, 4)), profit=round(current_profit * trade.stake_amount, 1), buys=trade.nr_of_successful_entries + 1, stake=round(stake_amount, 2) ) self.pairs[trade.pair]['last_buy'] = current_rate self.pairs[trade.pair]['max_touch'] = last_candle['close'] self.pairs[trade.pair]['last_candle'] = last_candle return stake_amount return None except Exception as exception: # print(exception) return None return None def getPctFirstBuy(self, pair, last_candle): return round((last_candle['close'] - self.pairs[pair]['first_buy']) / self.pairs[pair]['first_buy'], 3) def getPctLastBuy(self, pair, last_candle): return round((last_candle['close'] - self.pairs[pair]['last_buy']) / self.pairs[pair]['last_buy'], 4) def getPct60D(self, pair, last_candle): return round((last_candle['max60'] - last_candle['min60']) / last_candle['max60'], 4) def getPctClose60D(self, pair, last_candle): if last_candle['close'] > last_candle['max12']: return 1 if last_candle['close'] < last_candle['min12']: return 0 return round( (last_candle['close'] - last_candle['min12']) / (last_candle['max12'] - last_candle['min12']), 4) def getLimitBuy(self, pair, last_candle, first_pct): count_of_buys = self.pairs[pair]['count_of_buys'] pct60 = self.getPct60D(pair, last_candle) # exemple 0.3 pour 30% if (pct60 < 0.05): lim = - first_pct - (count_of_buys * 0.001 * 0.05 / 0.05) else: # 0.1 # 0.4 lim = - first_pct - (count_of_buys * 0.001 * pct60 / 0.05) return lim def getProbaHausseSma5d(self, last_candle): value_1 = self.getValuesFromTable(self.sma5_deriv1, last_candle['sma5_deriv1']) value_2 = self.getValuesFromTable(self.sma5_deriv2, last_candle['sma5_deriv2']) val = self.approx_val_from_bins( matrice=self.sma5_derive1_2_matrice_df, numeric_matrice=self.sma5_derive1_2_numeric_matrice, row_label=value_2, col_label=value_1 ) return val def adjust_stake_amount(self, pair: str, last_candle: DataFrame): base_stake_amount = self.config.get('stake_amount') # Montant de base configuré # pct60 = round(100 * self.getPctClose60D(pair, last_candle), 2) if True: # not pair in ('BTC/USDT', 'BTC/USDC'): # factors = [1, 1.2, 1.3, 1.4] if self.pairs[pair]['count_of_buys'] == 0: pctClose60 = self.getPctClose60D(pair, last_candle) # dist_max = self.getDistMax(last_candle, pair) factor = self.multi_step_interpolate(pctClose60, self.thresholds, self.factors) adjusted_stake_amount = max(base_stake_amount / 5, base_stake_amount * factor) else: adjusted_stake_amount = self.pairs[pair]['first_amount'] else: first_price = self.pairs[pair]['first_buy'] if (first_price == 0): first_price = last_candle['close'] last_max = last_candle['max12'] pct = 5 if last_max > 0: pct = 100 * (last_max - first_price) / last_max factor = self.multi_step_interpolate(pct, self.thresholds, self.factors) adjusted_stake_amount = base_stake_amount * factor # max(base_stake_amount, min(100, base_stake_amount * percent_4)) # pct = 100 * abs(self.getPctFirstBuy(pair, last_candle)) # # factor = self.multi_step_interpolate(pct, self.thresholds, self.factors) if self.pairs[pair]['count_of_buys'] == 0: self.pairs[pair]['first_amount'] = adjusted_stake_amount return adjusted_stake_amount def expectedProfit(self, pair: str, last_candle: DataFrame): lim = 0.01 pct = 0.002 if pair == "BTC/USDT" or pair == "BTC/USDC": lim = 0.005 pct = 0.001 pct_to_max = lim + pct * self.pairs[pair]['count_of_buys'] # if self.pairs[pair]['count_of_buys'] > 6: # pct_to_max = 0.006 * self.pairs[pair]['count_of_buys'] # pctClose60 = self.getPctClose60D(pair, last_candle) # max_60 = last_candle['max60'] # if last_candle['close'] < max_60: # pct_to_max = 0.25 * (max_60 - last_candle['close']) / max_60 # pct_to_max = pct_to_max * (2 - pctClose60) expected_profit = lim #* self.pairs[pair]['total_amount'] #min(3 * lim, max(lim, pct_to_max)) # 0.004 + 0.002 * self.pairs[pair]['count_of_buys'] #min(0.01, first_max) # print( # f"Expected profit price={current_price:.4f} min_max={min_max:.4f} min_14={min_14_days:.4f} max_14={max_14_days:.4f} percent={percent:.4f} expected_profit={expected_profit:.4f}") return expected_profit def calculateUpDownPct(self, dataframe, key): down_pct_values = np.full(len(dataframe), np.nan) # Remplir la colonne avec les bons calculs for i in range(len(dataframe)): shift_value = abs(int(dataframe[key].iloc[i])) # Récupérer le shift actuel if i - shift_value > 1: # Vérifier que le shift ne dépasse pas l'index down_pct_values[i] = 100 * (dataframe['close'].iloc[i] - dataframe['close'].iloc[i - shift_value]) / \ dataframe['close'].iloc[i - shift_value] return down_pct_values # ✅ Première dérivée(variation ou pente) # Positive: la courbe est croissante → tendance haussière. # Négative: la courbe est décroissante → tendance baissière. # Proche de 0: la courbe est plate → marché stable ou en transition. # # Applications: # Détecter les points d’inflexion(changement de tendance) quand elle s’annule.\ # Analyser la vitesse d’un mouvement(plus elle est forte, plus le mouvement est impulsif). # # ✅ Seconde dérivée(accélération ou concavité) # Positive: la pente augmente → accélération de la hausse ou ralentissement de la baisse. # Négative: la pente diminue → accélération de la baisse ou ralentissement de la hausse. # Changement de signe: indique souvent un changement de courbure, utile pour prévoir des retournements. # # Exemples: # 🟢 Dérivée 1 > 0 et dérivée 2 > 0: tendance haussière qui s’accélère. # 🟡 Dérivée 1 > 0 et dérivée 2 < 0: tendance haussière qui ralentit → essoufflement potentiel. # 🔴 Dérivée 1 < 0 et dérivée 2 < 0: tendance baissière qui s’accélère. # 🟠 Dérivée 1 < 0 et dérivée 2 > 0: tendance baissière qui ralentit → possible bottom. # # Filtrer les signaux: ne prendre un signal haussier que si dérivée1 > 0 et dérivée2 > 0. # Détecter les zones de retournement: quand dérivée1 ≈ 0 et que dérivée2 change de signe. def calculateRegression(self, dataframe: DataFrame, column='close', window=50, degree=3, future_offset: int = 10 # projection à n bougies après ) -> DataFrame: df = dataframe.copy() regression_fit = [] regression_future_fit = [] regression_fit = [] regression_future_fit = [] for i in range(len(df)): if i < window: regression_fit.append(np.nan) regression_future_fit.append(np.nan) continue # Fin de la fenêtre d’apprentissage end_index = i start_index = i - window y = df[column].iloc[start_index:end_index].values # Si les données sont insuffisantes (juste par précaution) if len(y) < window: regression_fit.append(np.nan) regression_future_fit.append(np.nan) continue # x centré pour meilleure stabilité numérique x = np.linspace(-1, 1, window) coeffs = np.polyfit(x, y, degree) poly = np.poly1d(coeffs) # Calcul point présent (dernier de la fenêtre) x_now = x[-1] regression_fit.append(poly(x_now)) # Calcul point futur, en ajustant si on dépasse la fin remaining = len(df) - i - 1 effective_offset = min(future_offset, remaining) x_future = x_now + (effective_offset / window) * 2 # respect du même pas regression_future_fit.append(poly(x_future)) df[f"{column}_regression"] = regression_fit # 2. Dérivée première = différence entre deux bougies successives df[f"{column}_regression_deriv1"] = round(100 * df[f"{column}_regression"].diff() / df[f"{column}_regression"], 4) # 3. Dérivée seconde = différence de la dérivée première df[f"{column}_regression_deriv2"] = round( 10 * df[f"{column}_regression_deriv1"].rolling(int(window / 4)).mean().diff(), 4) df[f"{column}_future_{future_offset}"] = regression_future_fit # # 2. Dérivée première = différence entre deux bougies successives # df[f"{column}_future_{future_offset}_deriv1"] = round(100 * df[f"{column}_future_{future_offset}"].diff() / df[f"{column}_future_{future_offset}"], 4) # # # 3. Dérivée seconde = différence de la dérivée première # df[f"{column}_future_{future_offset}_deriv2"] = round(10 * df[f"{column}_future_{future_offset}_deriv1"].rolling(int(window / 4)).mean().diff(), 4) return df def getValuesFromTable(self, values, value): for i in range(len(values) - 1): if values[i] <= value < values[i + 1]: return self.labels[i] return self.labels[-1] # cas limite pour la borne max def approx_val_from_bins(self, matrice, numeric_matrice, row_label, col_label): """ Renvoie une approximation de la valeur à partir des labels binaires (e.g. B5, H1) en utilisant une interpolation simple basée sur les indices. Parameters: matrix_df (pd.DataFrame): Matrice avec les labels binaires en index et colonnes. row_label (str): Label de la ligne (ex: 'B3'). col_label (str): Label de la colonne (ex: 'H2'). Returns: float: Valeur approchée si possible, sinon NaN. """ # Vérification des labels if row_label not in matrice.index or col_label not in matrice.columns: return np.nan # Index correspondant row_idx = self.label_to_index.get(row_label) col_idx = self.label_to_index.get(col_label) # Approximation directe (aucune interpolation complexe ici, juste une lecture) return numeric_matrice[row_idx, col_idx] @property def protections(self): return [ # { # "method": "CooldownPeriod", # "stop_duration_candles": 12 # } # { # "method": "MaxDrawdown", # "lookback_period_candles": self.lookback.value, # "trade_limit": self.trade_limit.value, # "stop_duration_candles": self.protection_stop.value, # "max_allowed_drawdown": self.protection_max_allowed_dd.value, # "only_per_pair": False # }, # { # "method": "StoplossGuard", # "lookback_period_candles": 24, # "trade_limit": 4, # "stop_duration_candles": self.protection_stoploss_stop.value, # "only_per_pair": False # }, # { # "method": "StoplossGuard", # "lookback_period_candles": 24, # "trade_limit": 4, # "stop_duration_candles": 2, # "only_per_pair": False # }, # { # "method": "LowProfitPairs", # "lookback_period_candles": 6, # "trade_limit": 2, # "stop_duration_candles": 60, # "required_profit": 0.02 # }, # { # "method": "LowProfitPairs", # "lookback_period_candles": 24, # "trade_limit": 4, # "stop_duration_candles": 2, # "required_profit": 0.01 # } ] def conditional_smoothing(self, series, threshold=0.002): smoothed = [series.iloc[0]] for val in series.iloc[1:]: last = smoothed[-1] if abs(val - last) / last >= threshold: smoothed.append(val) else: smoothed.append(last) return pd.Series(smoothed, index=series.index) def smooth_and_derivatives(self, series, window=25, polyorder=3): series = series.copy() if series.isna().sum() > 0: series = series.ffill().bfill() # Si tu veux éviter toute NaN smooth = self.causal_savgol(series, window=window, polyorder=polyorder) deriv1 = np.diff(smooth, prepend=smooth[0]) deriv2 = np.diff(deriv1, prepend=deriv1[0]) return pd.Series(smooth, index=series.index), pd.Series(deriv1, index=series.index), pd.Series(deriv2, index=series.index) def causal_savgol(self, series, window=25, polyorder=3): result = [] half_window = window # Fenêtre complète dans le passé for i in range(len(series)): if i < half_window: result.append(np.nan) continue window_series = series[i - half_window:i] if window_series.isna().any(): result.append(np.nan) continue coeffs = np.polyfit(range(window), window_series, polyorder) poly = np.poly1d(coeffs) result.append(poly(window - 1)) return pd.Series(result, index=series.index) def polynomial_forecast(self, series: pd.Series, window: int = 20, degree: int = 2, steps=[12, 24, 36]): """ Calcule une régression polynomiale sur les `window` dernières valeurs de la série, puis prédit les `n_future` prochaines valeurs. :param series: Série pandas (ex: dataframe['close']) :param window: Nombre de valeurs récentes utilisées pour ajuster le polynôme :param degree: Degré du polynôme (ex: 2 pour quadratique) :param n_future: Nombre de valeurs futures à prédire :return: tuple (poly_function, x_vals, y_pred), où y_pred contient les prédictions futures """ if len(series) < window: raise ValueError("La série est trop courte pour la fenêtre spécifiée.") recent_y = series.iloc[-window:].values x = np.arange(window) coeffs = np.polyfit(x, recent_y, degree) poly = np.poly1d(coeffs) x_future = np.arange(window, window + len(steps)) y_future = poly(x_future) # Affichage de la fonction # print("Fonction polynomiale trouvée :") # print(poly) current = series.iloc[-1] count = 0 for future_step in steps: # range(1, n_future + 1) future_x = window - 1 + future_step prediction = poly(future_x) # series.loc[series.index[future_x], f'poly_pred_t+{future_step}'] = prediction # ➕ Afficher les prédictions # print(f"{current} → t+{future_step}: x={future_x}, y={prediction:.2f}") if prediction > 0: # current: count += 1 return poly, x_future, y_future, count def calculateStats(self, df, index, target): # Nombre de tranches (modifiable) n_bins_indice = 11 n_bins_valeur = 11 # Créer les tranches dynamiques df['indice_tranche'] = pd.qcut(df[index], q=n_bins_indice, duplicates='drop') df['valeur_tranche'] = pd.qcut(df[target], q=n_bins_valeur, duplicates='drop') # Créer un tableau croisé avec la moyenne des valeurs pivot_mean = df.pivot_table( index='indice_tranche', columns='valeur_tranche', values=target, # <-- c'est la colonne qu'on agrège aggfunc='mean' # <-- on calcule la moyenne ) # Résultat # print("Moyenne des valeurs par double-tranche :") # print(pivot_mean.round(2)) def should_enter_trade(self, pair: str, last_candle, current_time) -> bool: return True limit = 3 # 🟢 Dérivée 1 > 0 et dérivée 2 > 0: tendance haussière qui s’accélère. # 🟡 Dérivée 1 > 0 et dérivée 2 < 0: tendance haussière qui ralentit → essoufflement potentiel. # 🔴 Dérivée 1 < 0 et dérivée 2 < 0: tendance baissière qui s’accélère. # 🟠 Dérivée 1 < 0 et dérivée 2 > 0: tendance baissière qui ralentit → possible bottom. # if not pair.startswith('BTC'): dispo = round(self.wallets.get_available_stake_amount()) if self.pairs[pair]['stop'] and last_candle['mid_smooth_5_deriv1'] > -0.9 and last_candle['sma5_deriv1'] > 0 and last_candle['sma5_deriv2'] > 0: self.pairs[pair]['stop'] = False self.log_trade( last_candle=last_candle, date=current_time, action="🟢RESTART", dispo=dispo, pair=pair, rate=last_candle['close'], trade_type='', profit=0, buys=self.pairs[pair]['count_of_buys'], stake=0 ) else: if self.pairs[pair]['stop'] == False and (last_candle['sma5_deriv1'] < -0.2 or last_candle['sma5_deriv2'] < -3): self.pairs[pair]['stop'] = True # if self.pairs[pair]['current_profit'] > 0: # self.pairs[pair]['force_sell'] = True self.log_trade( last_candle=last_candle, date=current_time, action="🔴STOP", dispo=dispo, pair=pair, rate=last_candle['close'], trade_type='', profit=self.pairs[pair]['current_profit'], buys=self.pairs[pair]['count_of_buys'], stake=0 ) return False if self.pairs[pair]['stop']: return False # if pair.startswith('BTC'): # return True # BTC toujours autorisé #return True # Filtrer les paires non-BTC non_btc_pairs = [p for p in self.pairs if not p.startswith('BTC')] # Compter les positions actives sur les paires non-BTC max_nb_trades = 0 total_non_btc = 0 max_pair = '' limit_amount = 250 max_amount = 0 for p in non_btc_pairs: max_nb_trades = max(max_nb_trades, self.pairs[p]['count_of_buys']) max_amount = max(max_amount, self.pairs[p]['total_amount']) for p in non_btc_pairs: if (max_nb_trades == self.pairs[p]['count_of_buys'] and max_nb_trades > limit): # if (max_amount == self.pairs[p]['total_amount'] and max_amount > limit_amount): max_pair = p total_non_btc += self.pairs[p]['count_of_buys'] pct_max = self.getPctFirstBuy(pair, last_candle) # self.getPctLastBuy(pair, last_candle) val = self.getProbaHausseSma5d(last_candle) if val < 15: return False return True self.should_enter_trade_count = 0 # if max_pair != pair and self.pairs[pair]['total_amount'] > 300: # return False if (max_pair != '') & (self.pairs[pair]['count_of_buys'] >= limit): # trade = self.pairs[max_pair]['current_trade'] current_time = current_time.astimezone(timezone.utc) open_date = trade.open_date.astimezone(timezone.utc) current_time_utc = current_time.astimezone(timezone.utc) days_since_open = (current_time_utc - open_date).days pct_max_max = self.getPctFirstBuy(max_pair, last_candle) # print(f"days_since_open {days_since_open} max_pair={max_pair} pair={pair}") return max_pair == pair or pct_max < - 0.25 or ( pct_max_max < - 0.15 and max_pair != pair and days_since_open > 30) else: return True def calculePlateaux(self, informative: pd.DataFrame, plateau_duration, plateau_tolerance) -> pd.DataFrame: # 1. Détection plateau informative['rolling_min'] = informative['close'].rolling(plateau_duration).min() informative['rolling_max'] = informative['close'].rolling(plateau_duration).max() informative['plateau_amplitude'] = (informative['rolling_max'] - informative['rolling_min']) / informative[ 'rolling_min'] informative['plateau'] = informative['plateau_amplitude'] < plateau_tolerance # 2. Détection "fin de plateau" # informative['plateau_end'] = (informative['plateau'] & ~informative['plateau'].shift(-1).fillna(False).astype(bool)) next_plateau = informative['plateau'].shift(-1) next_plateau = next_plateau.fillna(False).astype(bool) informative['plateau_end'] = informative['plateau'] & ~next_plateau # 3. Enregistrer dernier plateau (min/max) last_min = None last_max = None last_status = [] for i, row in informative.iterrows(): if row['plateau_end']: last_min = row['rolling_min'] last_max = row['rolling_max'] if last_min is not None and last_max is not None: if row['close'] > last_max: breakout = "up" distance = (row['close'] - last_max) / last_max elif row['close'] < last_min: breakout = "down" distance = (last_min - row['close']) / last_min else: breakout = "inside" distance = 0 else: breakout = None distance = None last_status.append((breakout, distance)) informative['breakout_status'] = [s[0] for s in last_status] informative['breakout_distance'] = [s[1] for s in last_status] return informative