# Zeus Strategy: First Generation of GodStra Strategy with maximum # AVG/MID profit in USDT # Author: @Mablue (Masoud Azizi) # github: https://github.com/mablue/ # IMPORTANT: INSTALL TA BEFOUR RUN(pip install ta) # freqtrade hyperopt --hyperopt-loss SharpeHyperOptLoss --spaces buy sell roi --strategy Zeus # --- Do not remove these libs --- from datetime import timedelta, datetime from freqtrade.persistence import Trade from freqtrade.strategy import (BooleanParameter, CategoricalParameter, DecimalParameter, stoploss_from_open, IntParameter, IStrategy, merge_informative_pair, informative, stoploss_from_absolute) import pandas as pd import numpy as np from pandas import DataFrame from typing import Optional, Union, Tuple import logging import configparser from technical import pivots_points # -------------------------------- # Add your lib to import here test git import ta import talib.abstract as talib import freqtrade.vendor.qtpylib.indicators as qtpylib import requests from datetime import timezone, timedelta logger = logging.getLogger(__name__) from tabulate import tabulate def pprint_df(dframe): print(tabulate(dframe, headers='keys', tablefmt='psql', showindex=False)) def normalize(df): df = (df - df.min()) / (df.max() - df.min()) return df class Zeus_8_3_2_B_4_2(IStrategy): levels = [1, 2, 2, 4, 6, 8, 10, 12, 14, 16, 18, 20] startup_candle_count = 24 # ROI table: minimal_roi = { "0": 0.564, "567": 0.273, "2814": 0.12, "7675": 0 } # Stoploss: stoploss = -1 # 0.256 # Custom stoploss use_custom_stoploss = True # Buy hypers timeframe = '5m' max_open_trades = 5 max_amount = 40 # DCA config position_adjustment_enable = True plot_config = { "main_plot": { "sma5_1h": { "color": "white" }, "sma5_1d": { "color": "blue" }, "sma20": { "color": "yellow" }, "bb_lowerband": { "color": "#da59a6"}, "bb_upperband": { "color": "#da59a6", }, "sma10": { "color": "blue" }, "min12_1d": { "color": "red" }, "max12_1d": { "color": 'red' }, "min50": { "color": 'green' }, "max50": { "color": 'green' } }, "subplots": { "Pct": { "sma20_pct": { 'color': "green" }, "down_pct": { "color": "blue" }, "down_pct_1h": { "color": "red" }, "down_pct_1d": { "color": "red" } }, "Rsi": { "rsi": { "color": "pink" }, "rsi_1h": { "color": "red" }, "rsi_1d": { "color": "blue" } }, "Rsi_diff": { "rsi_diff_1h": { "color": "red" }, "rsi_diff_1d": { "color": "blue" }, }, "Down": { "down_count_1h": { "color": "green" }, "up_count_1h": { "color": "blue" } }, # "Diff": { # "sma10_diff": { # "color": "#74effc" # } # }, "smooth": { 'sma5_diff_sum_1h': { "color": "green" }, 'sma5_diff2_sum_1h': { "color": "blue" }, 'mid_smooth_deriv1_1d': { "color": "blue" }, 'mid_smooth_deriv1_1h': { "color": "red" }, 'mid_smooth_deriv2_1d': { "color": "pink" }, 'mid_smooth_deriv2_1h': { "color": "#da59a6" } } } } columns_logged = False pairs = { pair: { "first_buy": 0, "last_max": 0, "trade_info": {}, "max_touch": 0.0, "last_sell": 0.0, "last_buy": 0.0, 'count_of_buys': 0, 'current_profit': 0, 'expected_profit': 0, "last_candle": {}, "last_trade": None, "last_count_of_buys": 0, 'base_stake_amount': 0, 'stop_buy': False, 'last_date': 0, 'stop': False, 'max_profit': 0 } for pair in ["BTC/USDC", "ETH/USDC", "DOGE/USDC", "XRP/USDC", "SOL/USDC", "BTC/USDT", "ETH/USDT", "DOGE/USDT", "XRP/USDT", "SOL/USDT"] } # 20 20 40 60 100 160 260 420 # 50 50 100 300 500 # fibo = [1, 1, 2, 3, 5, 8, 13, 21] # my fibo # 50 50 50 100 100 150 200 250 350 450 600 1050 fibo = [1, 1, 1, 2, 2, 3, 4, 5, 7, 9, 12, 16, 21] baisse = [1, 2, 3, 5, 7, 10, 14, 19, 26, 35, 47, 63, 84] # Ma suite 1 1 1 2 2 3 4 5 7 9 12 16 21 # Mise 50 50 50 100 100 150 200 250 350 450 600 800 1050 # Somme Mises 50 100 150 250 350 500 700 950 1300 1750 2350 3150 4200 # baisse 1 2 3 5 7 10 14 19 26 35 47 63 84 trades = list() max_profit_pairs = {} protection_percent_buy_lost = IntParameter(1, 10, default=5, space='protection') protection_fibo = IntParameter(1, 10, default=2, space='protection') sell_allow_decrease = DecimalParameter(0.005, 0.02, default=0.2, decimals=2, space='sell', optimize=True, load=True) # Probabilité de hausse pour futur_percent_3h (en %): # mid_smooth_deriv1_1h_bin B5 B4 B3 B2 B1 N0 H1 H2 H3 H4 H5 # sma24_diff_1h_bin # B5 41.0 47.2 48.1 45.6 74.0 65.9 66.5 83.8 77.8 72.1 81.0 # B4 41.2 35.8 48.4 46.5 59.9 60.2 75.8 79.4 84.6 83.0 78.5 # B3 34.1 39.7 42.8 47.0 63.3 64.5 71.5 80.4 82.0 86.6 76.6 # B2 27.5 27.9 32.3 33.2 61.9 67.1 70.8 79.5 81.3 73.6 81.9 # B1 35.0 26.5 24.4 34.9 50.0 59.2 69.4 72.8 79.8 77.4 69.5 # N0 30.6 19.9 23.6 30.8 41.9 59.2 67.5 70.6 74.0 63.0 75.0 # H1 25.2 28.7 28.6 25.8 35.9 44.2 60.1 68.8 67.7 69.6 80.9 # H2 29.8 20.8 23.9 30.4 34.4 37.5 52.7 66.1 69.8 67.5 62.9 # H3 25.7 29.4 22.7 29.8 37.7 47.1 59.9 68.5 66.5 68.6 66.4 # H4 30.6 27.5 25.1 22.6 30.8 34.1 50.9 59.8 57.0 68.6 63.7 # H5 14.8 21.6 22.2 35.3 19.3 31.6 38.3 59.6 65.2 56.8 59.6 # Données sous forme de dictionnaire smooth_smadiff_matrice = { "B5": [41.0, 41.2, 34.1, 27.5, 35.0, 30.6, 25.2, 29.8, 25.7, 30.6, 14.8], "B4": [47.2, 35.8, 39.7, 27.9, 26.5, 19.9, 28.7, 20.8, 29.4, 27.5, 21.6], "B3": [48.1, 48.4, 42.8, 32.3, 24.4, 23.6, 28.6, 23.9, 22.7, 25.1, 22.2], "B2": [45.6, 46.5, 47.0, 33.2, 34.9, 30.8, 25.8, 30.4, 29.8, 22.6, 35.3], "B1": [74.0, 59.9, 63.3, 61.9, 50.0, 41.9, 35.9, 34.4, 37.7, 30.8, 19.3], "N0": [65.9, 60.2, 64.5, 67.1, 59.2, 59.2, 44.2, 37.5, 47.1, 34.1, 31.6], "H1": [66.5, 75.8, 71.5, 70.8, 69.4, 67.5, 60.1, 52.7, 59.9, 50.9, 38.3], "H2": [83.8, 79.4, 80.4, 79.5, 72.8, 70.6, 68.8, 66.1, 68.5, 59.8, 59.6], "H3": [77.8, 84.6, 82.0, 81.3, 79.8, 74.0, 67.7, 69.8, 66.5, 57.0, 65.2], "H4": [72.1, 83.0, 86.6, 73.6, 77.4, 63.0, 69.6, 67.5, 68.6, 68.6, 56.8], "H5": [81.0, 78.5, 76.6, 81.9, 69.5, 75.0, 80.9, 62.9, 66.4, 63.7, 59.6] } index_labels = ['B5', 'B4', 'B3', 'B2', 'B1', 'N0', 'H1', 'H2', 'H3', 'H4', 'H5'] smooth_smadiff_matrice_df = pd.DataFrame(smooth_smadiff_matrice, index=index_labels) # Récupération des labels ordonnés ordered_labels = ['B5', 'B4', 'B3', 'B2', 'B1', 'N0', 'H1', 'H2', 'H3', 'H4', 'H5'] label_to_index = {label: i for i, label in enumerate(ordered_labels)} # Extraction de la matrice numérique smooth_smadiff_numeric_matrice = smooth_smadiff_matrice_df.reindex(index=ordered_labels, columns=ordered_labels).values smooth_pct_max_hour_matrice = { 'B5': [43.5, 52.7, 62.3, 65.5, 86.9, 63.1, 81.5, 86.7, 90.2, 90.1, 93.0], 'B4': [34.9, 46.3, 53.6, 60.4, 75.8, 83.3, 81.5, 83.0, 86.4, 86.9, 91.1], 'B3': [20.5, 35.4, 43.7, 54.5, 69.7, 71.6, 80.4, 84.7, 86.7, 84.9, 85.9], 'B2': [11.5, 25.4, 36.4, 47.9, 62.3, 65.7, 76.5, 82.0, 81.8, 82.8, 77.7], 'B1': [3.6, 14.9, 26.8, 41.1, 55.6, 71.4, 74.3, 79.8, 80.8, 82.3, 75.1], 'N0': [0.0, 6.9, 18.3, 32.0, 47.2, 62.1, 69.1, 74.8, 78.3, 76.6, 71.6], 'H1': [0.7, 3.8, 9.4, 24.2, 40.6, 59.7, 67.8, 70.9, 73.4, 72.1, 70.0], 'H2': [0.0, 0.6, 6.5, 13.6, 33.6, 51.7, 64.9, 70.2, 68.4, 67.8, 65.8], 'H3': [1.4, 0.6, 2.6, 6.6, 23.3, 50.2, 56.2, 63.6, 65.7, 64.5, 64.7], 'H4': [1.6, 0.3, 3.0, 3.2, 11.4, 32.7, 44.0, 54.9, 61.7, 60.6, 63.6], 'H5': [1.8, 2.6, 0.6, 1.1, 9.7, 12.9, 26.2, 44.5, 52.6, 54.5, 56.2], } smooth_pct_max_hour_matrice_df = pd.DataFrame(smooth_pct_max_hour_matrice, index=index_labels) # Récupération des labels ordonnés # ordered_labels = ['B5', 'B4', 'B3', 'B2', 'B1', 'N0', 'H1', 'H2', 'H3', 'H4', 'H5'] # label_to_index = {label: i for i, label in enumerate(ordered_labels)} # Extraction de la matrice numérique smooth_pct_max_hour_numeric_matrice = smooth_pct_max_hour_matrice_df.reindex(index=ordered_labels, columns=ordered_labels).values # Données sous forme de dictionnaire smooth_sma_144_diff_matrice = { "B5":[40.3, 52.1, 60.2, 68.6, 86.3, 76.5, 75.1, 83.5, 88.7, 96.3, 91.6], "B4":[26.6, 39.4, 48.1, 57.0, 76.7, 82.4, 79.6, 82.4, 91.8, 86.6, 87.8], "B3":[21.5, 27.7, 42.7, 53.2, 70.9, 76.6, 80.8, 79.4, 88.3, 88.0, 87.8], "B2":[15.1, 20.8, 32.9, 46.9, 59.1, 79.6, 82.5, 79.6, 80.8, 87.0, 85.5], "B1":[15.7, 15.4, 21.9, 29.4, 48.3, 66.6, 76.4, 77.8, 80.8, 83.5, 81.4], "N0":[15.0, 10.5, 20.1, 24.5, 36.9, 59.9, 68.8, 74.1, 77.7, 83.0, 75.7], "H1":[14.8, 10.7, 15.1, 21.0, 30.1, 47.3, 59.2, 70.4, 76.1, 82.7, 82.6], "H2":[7.9, 8.6, 13.6, 20.6, 27.0, 39.5, 55.2, 68.9, 69.0, 78.4, 83.4], "H3":[9.2, 6.2, 12.6, 21.7, 23.6, 33.1, 42.3, 57.8, 66.0, 71.9, 81.9], "H4":[4.8, 13.1, 16.3, 14.5, 19.5, 26.4, 35.6, 49.2, 63.2, 68.2, 71.6], "H5":[17.9, 25.7, 20.8, 17.8, 8.7, 18.5, 32.3, 37.7, 49.3, 59.8, 61.7] } smooth_sma_144_diff_matrice_df = pd.DataFrame(smooth_smadiff_matrice, index=index_labels) # Extraction de la matrice numérique smooth_sma_144_diff_numeric_matrice = smooth_sma_144_diff_matrice_df.reindex(index=ordered_labels, columns=ordered_labels).values def confirm_trade_entry(self, pair: str, order_type: str, amount: float, rate: float, time_in_force: str, current_time: datetime, entry_tag: Optional[str], **kwargs) -> bool: minutes = 0 if self.pairs[pair]['last_date'] != 0: minutes = round(int((current_time - self.pairs[pair]['last_date']).total_seconds() / 60)) dataframe, _ = self.dp.get_analyzed_dataframe(pair, self.timeframe) last_candle = dataframe.iloc[-1].squeeze() last_candle_2 = dataframe.iloc[-2].squeeze() last_candle_3 = dataframe.iloc[-3].squeeze() # last_candle_12 = dataframe.iloc[-13].squeeze() # if (last_candle['close'] < self.pairs[pair]['last_sell'] * 0.99 or minutes > 60 * 5) & (self.pairs[pair]['stop']): # print(f"restart {pair} last_sell={self.pairs[pair]['last_sell'] * 0.99} minutes={minutes}") # self.pairs[pair]['stop'] = False val = self.getProbaHausse144(last_candle) # allow_to_buy = True #(not self.stop_all) #& (not self.all_down) allow_to_buy = not self.pairs[pair]['stop'] and val > 50 #not last_candle['tendency'] in ('B-', 'B--') # (rate <= float(limit)) | (entry_tag == 'force_entry') if allow_to_buy: self.trades = list() self.pairs[pair]['first_buy'] = rate self.pairs[pair]['last_buy'] = rate self.pairs[pair]['max_touch'] = last_candle['close'] self.pairs[pair]['last_candle'] = last_candle self.pairs[pair]['count_of_buys'] = 1 self.pairs[pair]['current_profit'] = 0 dispo = round(self.wallets.get_available_stake_amount()) self.printLineLog() stake_amount = self.adjust_stake_amount(pair, last_candle) self.log_trade( last_candle=last_candle, date=current_time, action=("Buy" if allow_to_buy else "Canceled") + " " + str(minutes), pair=pair, rate=rate, dispo=dispo, profit=0, trade_type=entry_tag, buys=1, stake=round(stake_amount, 2) ) return allow_to_buy def confirm_trade_exit(self, pair: str, trade: Trade, order_type: str, amount: float, rate: float, time_in_force: str, exit_reason: str, current_time, **kwargs, ) -> bool: # allow_to_sell = (minutes > 30) dataframe, _ = self.dp.get_analyzed_dataframe(pair, self.timeframe) last_candle = dataframe.iloc[-1].squeeze() allow_to_sell = (last_candle['percent'] < 0) minutes = int(round((current_time - trade.date_last_filled_utc).total_seconds() / 60, 0)) if allow_to_sell: self.trades = list() self.pairs[pair]['last_count_of_buys'] = trade.nr_of_successful_entries #self.pairs[pair]['count_of_buys'] self.pairs[pair]['last_sell'] = rate self.pairs[pair]['last_trade'] = trade self.pairs[pair]['last_candle'] = last_candle self.pairs[pair]['max_profit'] = 0 self.trades = list() dispo= round(self.wallets.get_available_stake_amount()) # print(f"Sell {pair} {current_time} {exit_reason} dispo={dispo} amount={amount} rate={rate} open_rate={trade.open_rate}") self.log_trade( last_candle=last_candle, date=current_time, action="Sell " + str(minutes), pair=pair, trade_type=exit_reason, rate=last_candle['close'], dispo=dispo, profit=round(trade.calc_profit(rate, amount), 2) ) self.pairs[pair]['max_touch'] = 0 self.pairs[pair]['last_buy'] = 0 self.pairs[pair]['last_date'] = current_time return (allow_to_sell) | (exit_reason == 'force_exit') def custom_stake_amount(self, pair: str, current_time: datetime, current_rate: float, proposed_stake: float, min_stake: float, max_stake: float, **kwargs) -> float: dataframe, _ = self.dp.get_analyzed_dataframe(pair=pair, timeframe=self.timeframe) current_candle = dataframe.iloc[-1].squeeze() adjusted_stake_amount = self.adjust_stake_amount(pair, current_candle) # print(f"{pair} adjusted_stake_amount{adjusted_stake_amount}") # Use default stake amount. return adjusted_stake_amount def custom_exit(self, pair: str, trade: Trade, current_time, current_rate, current_profit, **kwargs): dataframe, _ = self.dp.get_analyzed_dataframe(pair, self.timeframe) last_candle = dataframe.iloc[-1].squeeze() last_candle_1h = dataframe.iloc[-13].squeeze() before_last_candle = dataframe.iloc[-2].squeeze() expected_profit = self.expectedProfit(pair, last_candle) # print(f"current_time={current_time} current_profit={current_profit} expected_profit={expected_profit}") max_touch_before = self.pairs[pair]['max_touch'] self.pairs[pair]['last_max'] = max(last_candle['haclose'], self.pairs[pair]['last_max']) count_of_buys = trade.nr_of_successful_entries self.pairs[pair]['count_of_buys'] = count_of_buys self.pairs[pair]['current_profit'] = current_profit self.pairs[pair]['max_profit'] = max(self.pairs[pair]['max_profit'], current_profit) if (last_candle['mid_smooth_deriv1'] >= 0): return None if (last_candle['tendency'] in ('H++', 'H+')) and (last_candle['rsi'] < 80): return None # val = self.getProbaHausse144(last_candle) # if val > 50: # return None baisse = self.pairs[pair]['max_profit'] - current_profit mx = self.pairs[pair]['max_profit'] / 5 if (baisse > mx) & (current_profit > expected_profit): self.trades = list() return 'mx_' + str(count_of_buys) if (last_candle['percent12'] <= -0.01) & (current_profit >= expected_profit): self.trades = list() return 'pft_' + str(count_of_buys) self.pairs[pair]['max_touch'] = max(last_candle['haclose'], self.pairs[pair]['max_touch']) def informative_pairs(self): # get access to all pairs available in whitelist. pairs = self.dp.current_whitelist() informative_pairs = [(pair, '1d') for pair in pairs] informative_pairs += [(pair, '1h') for pair in pairs] return informative_pairs from typing import List def multi_step_interpolate(self, pct: float, thresholds: List[float], factors: List[float]) -> float: if pct <= thresholds[0]: return factors[0] if pct >= thresholds[-1]: return factors[-1] for i in range(1, len(thresholds)): if pct <= thresholds[i]: # interpolation linéaire entre thresholds[i-1] et thresholds[i] return factors[i - 1] + (pct - thresholds[i - 1]) * (factors[i] - factors[i - 1]) / ( thresholds[i] - thresholds[i - 1]) # Juste au cas où (devrait jamais arriver) return factors[-1] def interpolate_factor(self, pct: float, start_pct: float = 5, end_pct: float = 30, start_factor: float = 1.0, end_factor: float = 2.0) -> float: if pct <= start_pct: return start_factor if pct >= end_pct: return end_factor # interpolation linéaire return start_factor + (pct - start_pct) * (end_factor - start_factor) / (end_pct - start_pct) def log_trade(self, action, pair, date, trade_type=None, rate=None, dispo=None, profit=None, buys=None, stake=None, last_candle=None): # Afficher les colonnes une seule fois if self.config.get('runmode') == 'hyperopt': return if self.columns_logged % 30 == 0: self.printLog( f"| {'Date':<16} | {'Action':<10} |{'Pair':<5}| {'Trade Type':<18} |{'Rate':>8} | {'Dispo':>6} | {'Profit':>8} | {'Pct':>6} | {'max_touch':>11} | {'last_lost':>12} | {'last_max':>7}|{'Buys':>4}| {'Stake':>5} |" f"sum_1h|sum_1d|Tdc|Tdh|Tdd| drv1 |drv_1h|drv_1d|" ) self.printLineLog() self.columns_logged += 1 date = str(date)[:16] if date else "-" limit = None # if buys is not None: # limit = round(last_rate * (1 - self.fibo[buys] / 100), 4) rsi = '' rsi_pct = '' # if last_candle is not None: # if (not np.isnan(last_candle['rsi_1d'])) and (not np.isnan(last_candle['rsi_1h'])): # rsi = str(int(last_candle['rsi_1d'])) + " " + str(int(last_candle['rsi_1h'])) # if (not np.isnan(last_candle['rsi_pct_1d'])) and (not np.isnan(last_candle['rsi_pct_1h'])): # rsi_pct = str(int(10000 * last_candle['bb_mid_pct_1d'])) + " " + str( # int(last_candle['rsi_pct_1d'])) + " " + str(int(last_candle['rsi_pct_1h'])) # first_rate = self.percent_threshold.value # last_rate = self.threshold.value # action = self.color_line(action, action) sma5_1d = '' sma5_1h = '' sma5 = str(sma5_1d) + ' ' + str(sma5_1h) last_lost = round((last_candle['haclose'] - self.pairs[pair]['max_touch']) / self.pairs[pair]['max_touch'], 3) max_touch = '' #round(last_candle['max12_1d'], 1) #round(self.pairs[pair]['max_touch'], 1) pct_max = round((last_candle['close'] - self.pairs[pair]['first_buy']) / self.pairs[pair]['first_buy'], 3) # round(100 * self.pairs[pair]['current_profit'], 1) if trade_type is not None: if np.isnan(last_candle['rsi_1d']): string = ' ' else: string = (str(int(last_candle['rsi_1d']))) + " " + str(int(last_candle['rsi_diff_1d'])) trade_type = trade_type \ + " " + string \ + " " + str(int(last_candle['rsi_1h'])) \ + " " + str(int(last_candle['rsi_diff_1h'])) self.printLog( f"| {date:<16} | {action:<10} | {pair[0:3]:<3} | {trade_type or '-':<18} |{rate or '-':>9}| {dispo or '-':>6} " f"| {profit or '-':>8} | {pct_max or '-':>6} | {round(self.pairs[pair]['max_touch'], 2) or '-':>11} | {last_lost or '-':>12} " f"| {round(self.pairs[pair]['last_max'], 0) or '-':>7} |{buys or '-':>4}|{stake or '-':>7}" f"|{round(last_candle['sma5_diff_sum_1h'], 2) or '-':>6}|{round(last_candle['sma5_diff_sum_1d'], 2) or '-':>6}" f"|{last_candle['tendency'] or '-':>3}|{last_candle['tendency_1h'] or '-':>3}|{last_candle['tendency_1d'] or '-':>3}" f"|{round(last_candle['mid_smooth_deriv1'],3) or '-':>6}|{round(last_candle['mid_smooth_deriv1_1h'],3) or '-':>6}|{round(last_candle['mid_smooth_deriv1_1d'],3) or '-' :>6}|" # f"|{round(last_candle['mid_smooth_deriv2']) or '-' :>3 }|{round(last_candle['mid_smooth_deriv2_1h']) or '-':>5}|{round(last_candle['mid_smooth_deriv2_1d']) or '-':>5}" ) def printLineLog(self): # f"sum1h|sum1d|Tdc|Tdh|Tdd| drv1 |drv_1h|drv_1d|" self.printLog( f"+{'-' * 18}+{'-' * 12}+{'-' * 5}+{'-' * 20}+{'-' * 9}+{'-' * 8}+{'-' * 10}+{'-' * 8}+{'-' * 13}+{'-' * 14}+{'-' * 9}+{'-' * 4}+{'-' * 7}+" f"{'-' * 6}+{'-' * 6}+{'-' * 3}+{'-' * 3}+{'-' * 3}+{'-' * 6}+{'-' * 6}+{'-' * 6}+" ) def printLog(self, str): if not self.dp.runmode.value in ('backtest', 'hyperopt'): logger.info(str) else: print(str) def add_tendency_column(self, dataframe: pd.DataFrame, suffixe='') -> pd.DataFrame: def tag_by_derivatives(row): d1 = row[f"mid_smooth_deriv1{suffixe}"] d2 = row[f"mid_smooth_deriv2{suffixe}"] d1_lim_inf = -0.01 d1_lim_sup = 0.01 if d1 >= d1_lim_inf and d1 <= d1_lim_sup: # and d2 >= d2_lim_inf and d2 <= d2_lim_sup: return 'P' # Palier if d1 == 0.0: return 'DH' if d2 > 0 else 'DB' #Depart Hausse / Départ Baisse if d1 > d1_lim_sup: return 'H++' if d2 > 0 else 'H+' #Acceleration Hausse / Ralentissement Hausse if d1 < d1_lim_inf: return 'B--' if d2 < 0 else 'B-' # Accéleration Baisse / Ralentissement Baisse return 'Mid' dataframe[f"tendency{suffixe}"] = dataframe.apply(tag_by_derivatives, axis=1) return dataframe def populate_indicators(self, dataframe: DataFrame, metadata: dict) -> DataFrame: # Add all ta features pair = metadata['pair'] heikinashi = qtpylib.heikinashi(dataframe) dataframe['haopen'] = heikinashi['open'] dataframe['haclose'] = heikinashi['close'] dataframe['hapercent'] = (dataframe['haclose'] - dataframe['haopen']) / dataframe['haclose'] dataframe['min'] = talib.MIN(dataframe['close'], timeperiod=200) dataframe['min12'] = talib.MIN(dataframe['close'], timeperiod=12) dataframe['min50'] = talib.MIN(dataframe['close'], timeperiod=50) dataframe['min200'] = talib.MIN(dataframe['close'], timeperiod=200) dataframe['max200'] = talib.MAX(dataframe['close'], timeperiod=200) dataframe['max50'] = talib.MAX(dataframe['close'], timeperiod=50) dataframe['max200_diff'] = (dataframe['max200'] - dataframe['close']) / dataframe['close'] dataframe['sma5'] = talib.SMA(dataframe, timeperiod=5) dataframe['sma10'] = talib.SMA(dataframe, timeperiod=10) dataframe['sma10_diff'] = 100 * dataframe['sma10'].diff() / dataframe['sma10'] dataframe['sma20'] = talib.SMA(dataframe, timeperiod=20) dataframe['sma20_pct'] = 100 * dataframe['sma20'].diff() / dataframe['sma20'] dataframe['sma144'] = talib.SMA(dataframe, timeperiod=144) dataframe['sma144_diff'] = 100 * dataframe['sma144'].diff() / dataframe['sma144'] dataframe["percent"] = (dataframe["close"] - dataframe["open"]) / dataframe["open"] dataframe["percent3"] = (dataframe["close"] - dataframe["open"].shift(3)) / dataframe["open"].shift(3) dataframe["percent5"] = (dataframe["close"] - dataframe["open"].shift(5)) / dataframe["open"].shift(5) dataframe["percent12"] = (dataframe["close"] - dataframe["open"].shift(12)) / dataframe["open"].shift(12) dataframe = self.calculateTendency(dataframe, window=12) dataframe = self.calculateTendency(dataframe, window=48, suffixe="_144", factor_1=1000, factor_2=10) # print(metadata['pair']) dataframe['rsi'] = talib.RSI(dataframe['close'], timeperiod=14) dataframe['rsi_diff'] = dataframe['rsi'].diff() dataframe['rsi_diff_2'] = dataframe['rsi_diff'].diff() # Bollinger Bands bollinger = qtpylib.bollinger_bands(qtpylib.typical_price(dataframe), window=20, stds=2) dataframe['bb_lowerband'] = bollinger['lower'] dataframe['bb_middleband'] = bollinger['mid'] dataframe['bb_upperband'] = bollinger['upper'] dataframe["bb_percent"] = ( (dataframe["close"] - dataframe["bb_lowerband"]) / (dataframe["bb_upperband"] - dataframe["bb_lowerband"]) ) dataframe["bb_width"] = ( (dataframe["bb_upperband"] - dataframe["bb_lowerband"]) / dataframe["bb_upperband"] ) # Normalization dataframe['average_line'] = dataframe['close'].mean() dataframe['average_line_50'] = talib.MIDPOINT(dataframe['close'], timeperiod=50) dataframe['average_line_288'] = talib.MIDPOINT(dataframe['close'], timeperiod=288) dataframe['average_line_288_098'] = dataframe['average_line_288'] * 0.98 dataframe['average_line_288_099'] = dataframe['average_line_288'] * 0.99 # Compter les baisses consécutives self.calculateDownAndUp(dataframe, limit=0.0001) dataframe = self.apply_regression_derivatives(dataframe, column='mid_smooth_144', window=144, degree=3, future_offset=12) ################### INFORMATIVE 1h informative = self.dp.get_pair_dataframe(pair=metadata['pair'], timeframe="1h") heikinashi = qtpylib.heikinashi(informative) informative['haopen'] = heikinashi['open'] informative['haclose'] = heikinashi['close'] informative['hapercent'] = (informative['haclose'] - informative['haopen']) / informative['haclose'] informative = self.calculateTendency(informative, 12) # informative = self.apply_regression_derivatives(informative, column='mid', window=5, degree=3) # informative['volatility'] = talib.STDDEV(informative['close'], timeperiod=14) / informative['close'] # informative['atr'] = (talib.ATR(informative['high'], informative['low'], informative['close'], timeperiod=14)) / informative['close'] informative['rsi'] = talib.RSI(informative['close']) #, timeperiod=7) informative['rsi_diff'] = informative['rsi'].diff() informative['rsi_sum'] = (informative['rsi'].rolling(7).sum() - 350) / 7 informative['rsi_sum_diff'] = informative['rsi_sum'].diff() informative['rsi_diff_2'] = informative['rsi_diff'].diff() informative['max12'] = talib.MAX(informative['close'], timeperiod=12) informative['min12'] = talib.MIN(informative['close'], timeperiod=12) informative['sma5'] = talib.SMA(informative, timeperiod=5) informative['sma5_diff'] = 100 * informative['sma5'].diff() / informative['sma5'] informative['sma24'] = talib.SMA(informative, timeperiod=24) informative['sma24_diff'] = 100 * informative['sma24'].diff() / informative['sma24'] informative['sma5_pct'] = 100 * (informative['sma5'] - informative['sma5'].shift(1)) / informative['sma5'] informative['sma5_diff_sum'] = (informative['sma5_pct'].rolling(5).sum()) / 5 informative['sma5_diff2_sum'] = informative['sma5_diff_sum'].diff() self.calculateDownAndUp(informative, limit=0.0012) dataframe = merge_informative_pair(dataframe, informative, self.timeframe, "1h", ffill=True) ################### INFORMATIVE 1d informative = self.dp.get_pair_dataframe(pair=metadata['pair'], timeframe="1d") informative = self.calculateTendency(informative, 7) # informative['volatility'] = talib.STDDEV(informative['close'], timeperiod=14) / informative['close'] # informative['atr'] = (talib.ATR(informative['high'], informative['low'], informative['close'], timeperiod=14)) / informative['close'] # informative = self.apply_regression_derivatives(informative, column='mid', window=5, degree=3) informative['max12'] = talib.MAX(informative['close'], timeperiod=12) informative['min12'] = talib.MIN(informative['close'], timeperiod=12) informative['max3'] = talib.MAX(informative['close'], timeperiod=3) informative['min3'] = talib.MIN(informative['close'], timeperiod=3) informative['rsi'] = talib.RSI(informative['close']) #, timeperiod=7) informative['rsi_diff'] = informative['rsi'].diff() informative['rsi_sum'] = (informative['rsi'].rolling(7).sum() - 350) / 7 informative['rsi_diff_2'] = informative['rsi_diff'].diff() informative['sma5'] = talib.SMA(informative, timeperiod=5) informative['sma5_pct'] = 100 * (informative['sma5'] - informative['sma5'].shift(1)) / informative['sma5'] informative['sma5_diff_sum'] = (informative['sma5_pct'].rolling(5).sum()) / 5 informative['sma5_diff2_sum'] = informative['sma5_diff_sum'].diff() dataframe = merge_informative_pair(dataframe, informative, self.timeframe, "1d", ffill=True) dataframe['last_price'] = dataframe['close'] dataframe['first_price'] = dataframe['close'] # dataframe['mid_price'] = (dataframe['last_price'] + dataframe['first_price']) / 2 # dataframe['close01'] = dataframe.iloc[-1]['close'] * 1.01 # dataframe['limit'] = dataframe['close'] count_buys = 0 if self.dp: if self.dp.runmode.value in ('live', 'dry_run'): self.getOpenTrades() for trade in self.trades: if trade.pair != pair: continue print(trade) filled_buys = trade.select_filled_orders('buy') count = 0 amount = 0 for buy in filled_buys: if count == 0: dataframe['first_price'] = buy.price # dataframe['close01'] = buy.price * 1.01 # Order(id=2396, trade=1019, order_id=29870026652, side=buy, filled=0.00078, price=63921.01, # status=closed, date=2024-08-26 02:20:11) dataframe['last_price'] = buy.price print(buy) count = count + 1 amount += buy.price * buy.filled # dataframe['mid_price'] = (dataframe['last_price'] + dataframe['first_price']) / 2 count_buys = count # dataframe['limit'] = dataframe['last_price'] * (1 - self.baisse[count] / 100) # dataframe['amount'] = amount print(f"amount= {amount}") # dataframe['mid_smooth_tag'] = qtpylib.crossed_below(dataframe['mid_smooth_deriv1_144'], dataframe['mid_smooth_deriv2_144']) dataframe['mid_smooth_1h'] = dataframe['mid_smooth_1h'].rolling(window=12, center=True).mean() dataframe["mid_smooth_deriv1_1h"] = dataframe["mid_smooth_1h"].rolling(12).mean().diff() / 12 dataframe["mid_smooth_deriv2_1h"] = 12 * dataframe["mid_smooth_deriv1_1h"].rolling(12).mean().diff() dataframe['percent_with_previous_day'] = 100 * (dataframe['close'] - dataframe['close_1d']) / dataframe['close'] dataframe['percent_with_max_hour'] = 100 * (dataframe['close'] - dataframe['max12_1h']) / dataframe['close'] dataframe['futur_percent_1h'] = 100 * (dataframe['close'].shift(-12) - dataframe['close']) / dataframe['close'] dataframe['futur_percent_3h'] = 100 * (dataframe['close'].shift(-36) - dataframe['close']) / dataframe['close'] dataframe['futur_percent_5h'] = 100 * (dataframe['close'].shift(-60) - dataframe['close']) / dataframe['close'] dataframe['futur_percent_12h'] = 100 * (dataframe['close'].shift(-144) - dataframe['close']) / dataframe['close'] return dataframe def calculateDownAndUp(self, dataframe, limit=0.0001): dataframe['down'] = dataframe['hapercent'] <= limit dataframe['up'] = dataframe['hapercent'] >= limit dataframe['down_count'] = - dataframe['down'].astype(int) * ( dataframe['down'].groupby((dataframe['down'] != dataframe['down'].shift()).cumsum()).cumcount() + 1) dataframe['up_count'] = dataframe['up'].astype(int) * ( dataframe['up'].groupby((dataframe['up'] != dataframe['up'].shift()).cumsum()).cumcount() + 1) # Créer une colonne vide dataframe['down_pct'] = self.calculateUpDownPct(dataframe, 'down_count') dataframe['up_pct'] = self.calculateUpDownPct(dataframe, 'up_count') def calculateTendency(self, dataframe, window=12, suffixe='', factor_1=100, factor_2=10): dataframe['mid'] = dataframe['open'] + (dataframe['close'] - dataframe['open']) / 2 # 2. Calcul du lissage par moyenne mobile médiane dataframe[f"mid_smooth{suffixe}"] = dataframe['close'].rolling(window=window, center=True, min_periods=1).median().rolling( int(window / 4)).mean() # 2. Dérivée première = différence entre deux bougies successives dataframe[f"mid_smooth_deriv1{suffixe}"] = round(factor_1 * dataframe[f"mid_smooth{suffixe}"].diff() / dataframe[f"mid_smooth{suffixe}"], 4) # 3. Dérivée seconde = différence de la dérivée première dataframe[f"mid_smooth_deriv2{suffixe}"] = round(factor_2 * dataframe[f"mid_smooth_deriv1{suffixe}"].rolling(int(window / 4)).mean().diff(), 4) dataframe = self.add_tendency_column(dataframe, suffixe) return dataframe def getOpenTrades(self): # if len(self.trades) == 0: print('search open trades') self.trades = Trade.get_open_trades() return self.trades def populate_buy_trend(self, dataframe: DataFrame, metadata: dict) -> DataFrame: pair = metadata['pair'] # self.getOpenTrades() expected_profit = self.expectedProfit(pair, dataframe.iloc[-1]) # self.getBinanceOrderBook(pair, dataframe) last_candle = dataframe.iloc[-1].squeeze() print("---------------" + pair + "----------------") print('adjust stake amount ' + str(self.adjust_stake_amount(pair, dataframe.iloc[-1]))) # print('adjust exit price ' + str(self.adjust_exit_price(dataframe.iloc[-1]))) print('calcul expected_profit ' + str(expected_profit)) buy_level = dataframe['average_line_50'] #dataframe['buy_level'] # self.get_buy_level(pair, dataframe) dataframe.loc[ ( (dataframe['max200_diff'] >= 0.01) & (dataframe['percent12'] < -0.002) & (dataframe['open'] < dataframe['average_line_288_099']) & (dataframe['open'] < dataframe['average_line_50']) & (dataframe['min12'].shift(2) == dataframe['min12']) & (dataframe['up_count'] > 0) & (dataframe["bb_width"] > 0.01) ), ['enter_long', 'enter_tag']] = (1, 'mx200') dataframe.loc[ ( # (dataframe['down_count'].shift(1) < - 1) # & (dataframe['down_count'] == 0) (dataframe['mid_smooth_deriv1'] > 0) ), ['enter_long', 'enter_tag']] = (1, 'down') dataframe.loc[ ( (dataframe['low'] < dataframe['min200']) & (dataframe['min50'] == dataframe['min50'].shift(3)) & (dataframe['tendency'] != "B-") ), ['enter_long', 'enter_tag']] = (1, 'low') dataframe['test'] = np.where(dataframe['enter_long'] == 1, dataframe['close'] * 1.01, np.nan) if self.dp.runmode.value in ('backtest'): dataframe.to_feather(f"user_data/data/binance/{metadata['pair'].replace('/', '_')}_df.feather") df = dataframe # # Définition des tranches pour les dérivées # bins_deriv = [-np.inf, -0.05, -0.01, 0.01, 0.05, np.inf] # labels = ['forte baisse', 'légère baisse', 'neutre', 'légère hausse', 'forte hausse'] # # # Ajout des colonnes bin (catégorisation) # df[f"{indic_1}_bin"] = pd.cut(df['mid_smooth_deriv1_1h'], bins=bins_deriv, labels=labels) # df[f"{indic_2}_bin"] = pd.cut(df['mid_smooth_deriv1_1d'], bins=bins_deriv, labels=labels) # # # Colonnes de prix futur à analyser # futur_cols = ['futur_percent_1h', 'futur_percent_2h', 'futur_percent_3h', 'futur_percent_4h', 'futur_percent_5h'] # # # Calcul des moyennes et des effectifs # grouped = df.groupby([f"{indic_2}_bin", f"{indic_1}_bin"])[futur_cols].agg(['mean', 'count']) # # pd.set_option('display.width', 200) # largeur max affichage # pd.set_option('display.max_columns', None) # Colonnes à traiter # futur_cols = ['futur_percent_1h', 'futur_percent_3h', 'futur_percent_5h', 'futur_percent_12h'] futur_cols = ['futur_percent_3h'] # Tranches équitables par quantiles # Exemple pour 10 quantiles labels = ['B5', 'B4', 'B3', 'B2', 'B1', 'N0', 'H1', 'H2', 'H3', 'H4', 'H5'] #indic_1 = 'mid_smooth_deriv1_1h' #indic_2 = 'sma24_diff_1h' #indic_2 = 'percent_with_max_hour' indic_1 = 'mid_smooth_deriv1_144' indic_2 = 'mid_smooth_deriv2_144' df[f"{indic_1}_bin"], bins_1h = pd.qcut(df[f"{indic_1}"], q=11, labels=labels, retbins=True, duplicates='drop') df[f"{indic_2}_bin"], bins_1d = pd.qcut(df[f"{indic_2}"], q=11, labels=labels, retbins=True, duplicates='drop') pd.set_option('display.max_columns', None) pd.set_option('display.width', 300) # largeur max affichage # Affichage formaté pour code Python print(f"Bornes des quantiles pour {indic_1} : [{', '.join([f'{b:.4f}' for b in bins_1h])}]") print(f"Bornes des quantiles pour {indic_2} : [{', '.join([f'{b:.4f}' for b in bins_1d])}]") # Agrégation grouped = df.groupby([f"{indic_2}_bin", f"{indic_1}_bin"], observed=True)[futur_cols].agg(['mean', 'count']) # Affichage with pd.option_context('display.max_rows', None, 'display.max_columns', None): print(grouped.round(4)) # Ajout des probabilités de hausse for col in futur_cols: df[f"{col}_is_up"] = df[col] > 0 # Calcul de la proba de hausse proba_up = df.groupby([f"{indic_2}_bin", f"{indic_1}_bin"], observed=True)[f"{col}_is_up"].mean().unstack() print(f"\nProbabilité de hausse pour {col} (en %):") with pd.option_context('display.max_rows', None, 'display.max_columns', None): print((proba_up * 100).round(1)) # Affichage formaté des valeurs comme tableau Python with pd.option_context('display.max_rows', None, 'display.max_columns', None): df_formatted = (proba_up * 100).round(1) print("data = {") for index, row in df_formatted.iterrows(): row_values = ", ".join([f"{val:.1f}" for val in row]) print(f"'{index}': [{row_values}], ") print("}") return dataframe def populate_sell_trend(self, dataframe: DataFrame, metadata: dict) -> DataFrame: # dataframe.loc[ # ( # (dataframe['mid_smooth_deriv1'] == 0) # & (dataframe['mid_smooth_deriv1'].shift(1) > 0) # ), ['sell', 'exit_long']] = (1, 'sell_sma5_pct_1h') return dataframe def adjust_trade_position(self, trade: Trade, current_time: datetime, current_rate: float, current_profit: float, min_stake: float, max_stake: float, **kwargs): # ne rien faire si ordre deja en cours if trade.has_open_orders: print("skip open orders") return None if (self.wallets.get_available_stake_amount() < 50): # or trade.stake_amount >= max_stake: return 0 dataframe, _ = self.dp.get_analyzed_dataframe(trade.pair, self.timeframe) last_candle = dataframe.iloc[-1].squeeze() last_candle_1 = dataframe.iloc[-2].squeeze() last_candle_2 = dataframe.iloc[-3].squeeze() last_candle_3 = dataframe.iloc[-4].squeeze() last_candle_previous_1h = dataframe.iloc[-13].squeeze() # prépare les données current_time = current_time.astimezone(timezone.utc) open_date = trade.open_date.astimezone(timezone.utc) dispo = round(self.wallets.get_available_stake_amount()) days_since_first_buy = (current_time - trade.open_date_utc).days hours = (current_time - trade.date_last_filled_utc).total_seconds() / 3600.0 if (len(dataframe) < 1): print("skip dataframe") return None pair = trade.pair if self.dp.runmode.value in ('dry_run'): if pair not in ('BTC/USDT', 'BTC/USDC', 'XRP/USDT', 'XRP/USDC', 'ETH/USDT', 'ETH/USDC'): # print(f"skip pair {pair}") return None else: if pair not in ('BTC/USDT', 'BTC/USDC'): # print(f"skip pair {pair}") return None count_of_buys = trade.nr_of_successful_entries current_time_utc = current_time.astimezone(timezone.utc) open_date = trade.open_date.astimezone(timezone.utc) days_since_open = (current_time_utc - open_date).days pct_first = 0 if self.pairs[pair]['first_buy']: pct_first = round((last_candle['close'] - self.pairs[pair]['first_buy']) / self.pairs[pair]['first_buy'], 3) pct = 0.012 if count_of_buys == 1: pct_max = current_profit else: if self.pairs[trade.pair]['last_buy']: pct_max = round((last_candle['close'] - self.pairs[trade.pair]['last_buy']) / self.pairs[trade.pair]['last_buy'], 4) else: pct_max = - pct lim = - pct - (count_of_buys * 0.001) # print(f"{trade.pair} current_profit={current_profit} count_of_buys={count_of_buys} pct_max={pct_max:.3f} lim={lim:.3f} rsi_diff_1f={last_candle['rsi_diff_1h']}") val = self.getProbaHausse144(last_candle) # print(f"Valeur approximée pour B3 / H2 : {val:.2f}") # if (days_since_open > count_of_buys) & (0 < count_of_buys <= max_buys) & (current_rate <= limit) & (last_candle['enter_long'] == 1): limit_buy = 20 if (count_of_buys < limit_buy) \ and (last_candle['enter_long'] == 1) \ and (pct_max < lim and val > 50 and last_candle['mid_smooth_deriv1_1d'] > - 1): try: max_amount = self.config.get('stake_amount', 100) * 2.5 stake_amount = min(min(max_amount, self.wallets.get_available_stake_amount()), self.adjust_stake_amount(pair, last_candle) - 10 * pct_first / pct) # min(200, self.adjust_stake_amount(pair, last_candle) * self.fibo[count_of_buys]) trade_type = last_candle['enter_tag'] if last_candle['enter_long'] == 1 else 'pct48' self.log_trade( last_candle=last_candle, date=current_time, action="Loss -", dispo=dispo, pair=trade.pair, rate=current_rate, trade_type=trade_type, profit=round(current_profit, 4), # round(current_profit * trade.stake_amount, 2), buys=trade.nr_of_successful_entries + 1, stake=round(stake_amount, 2) ) self.pairs[trade.pair]['last_buy'] = current_rate self.pairs[trade.pair]['max_touch'] = last_candle['close'] self.pairs[trade.pair]['last_candle'] = last_candle return stake_amount except Exception as exception: print(exception) return None # if (count_of_buys < limit_buy and pct_max > pct and current_profit > 0.004) \ # and (last_candle['rsi_diff_1h'] >= -5) \ # and (last_candle['tendency'] in ('P', 'H++', 'DH', 'H+')) \ # and (last_candle['mid_smooth_deriv1'] > 0.015): # try: # trade_type = last_candle['enter_tag'] if last_candle['enter_long'] == 1 else 'pct48' # self.log_trade( # last_candle=last_candle, # date=current_time, # action="Gain +", # dispo=dispo, # pair=trade.pair, # rate=current_rate, # trade_type=trade_type, # profit=round(current_profit, 4), # round(current_profit * trade.stake_amount, 2), # buys=trade.nr_of_successful_entries + 1, # stake=round(stake_amount, 2) # ) # self.pairs[trade.pair]['last_buy'] = current_rate # self.pairs[trade.pair]['max_touch'] = last_candle['close'] # self.pairs[trade.pair]['last_candle'] = last_candle # return stake_amount # except Exception as exception: # print(exception) # return None return None def getProbaHausse144(self, last_candle, indic_1='mid_smooth_deriv1_144', indic_2='sma144_diff'): value_1 = self.get_mid_smooth_label(last_candle[indic_1]) # ex. 'B2' value_2 = self.get_sma24_diff_label(last_candle[indic_2]) val = self.approx_val_from_bins(matrice=self.smooth_sma_144_diff_matrice_df, numeric_matrice=self.smooth_sma_144_diff_numeric_matrice, row_label=value_2, col_label=value_1) return val def getProbaHausse(self, last_candle, indic_1='mid_smooth_deriv1_1h', indic_2='sma24_diff_1h'): value_1 = self.get_mid_smooth_label(last_candle[indic_1]) # ex. 'B2' value_2 = self.get_sma24_diff_label(last_candle[indic_2]) val = self.approx_val_from_bins(matrice=self.smooth_smadiff_matrice_df, numeric_matrice=self.smooth_smadiff_numeric_matrice, row_label=value_2, col_label=value_1) return val def adjust_stake_amount(self, pair: str, last_candle: DataFrame): # Calculer le minimum des 14 derniers jours base_stake_amount = self.config.get('stake_amount', 100) # Montant de base configuré # if (self.pairs[pair]['count_of_buys'] == 0): # mid_smooth_label = self.get_mid_smooth_label(last_candle['mid_smooth_deriv1_1h']) # ex. 'B2' # percent_with_max_hour = self.get_sma24_diff_label(last_candle['percent_with_max_hour']) # # val = self.approx_val_from_bins(matrice=self.smooth_pct_max_hour_matrice_df, row_label=percent_with_max_hour, col_label=mid_smooth_label) # # base_stake_amount = base_stake_amount * (1 + val / 500) first_price = self.pairs[pair]['first_buy'] if (first_price == 0): first_price = last_candle['close'] last_max = last_candle['max12_1d'] pct = 5 if last_max > 0: pct = 100 * (last_max - first_price) / last_max thresholds = [2, 5, 10, 20] factors = [1, 1.25, 1.5, 2.0] factor = self.multi_step_interpolate(pct, thresholds, factors) adjusted_stake_amount = base_stake_amount * factor #max(base_stake_amount, min(100, base_stake_amount * percent_4)) return adjusted_stake_amount def expectedProfit(self, pair: str, last_candle: DataFrame): expected_profit = 0.004 #min(0.01, first_max) # print( # f"Expected profit price={current_price:.4f} min_max={min_max:.4f} min_14={min_14_days:.4f} max_14={max_14_days:.4f} percent={percent:.4f} expected_profit={expected_profit:.4f}") return expected_profit def calculateUpDownPct(self, dataframe, key): down_pct_values = np.full(len(dataframe), np.nan) # Remplir la colonne avec les bons calculs for i in range(len(dataframe)): shift_value = abs(int(dataframe[key].iloc[i])) # Récupérer le shift actuel if i - shift_value > 1: # Vérifier que le shift ne dépasse pas l'index down_pct_values[i] = 100 * (dataframe['close'].iloc[i] - dataframe['close'].iloc[i - shift_value]) / \ dataframe['close'].iloc[i - shift_value] return down_pct_values # ✅ Première dérivée(variation ou pente) # Positive: la courbe est croissante → tendance haussière. # Négative: la courbe est décroissante → tendance baissière. # Proche de 0: la courbe est plate → marché stable ou en transition. # # Applications: # Détecter les points d’inflexion(changement de tendance) quand elle s’annule.\ # Analyser la vitesse d’un mouvement(plus elle est forte, plus le mouvement est impulsif). # # ✅ Seconde dérivée(accélération ou concavité) # Positive: la pente augmente → accélération de la hausse ou ralentissement de la baisse. # Négative: la pente diminue → accélération de la baisse ou ralentissement de la hausse. # Changement de signe: indique souvent un changement de courbure, utile pour prévoir des retournements. # # Exemples: # 🟢 Dérivée 1 > 0 et dérivée 2 > 0: tendance haussière qui s’accélère. # 🟡 Dérivée 1 > 0 et dérivée 2 < 0: tendance haussière qui ralentit → essoufflement potentiel. # 🔴 Dérivée 1 < 0 et dérivée 2 < 0: tendance baissière qui s’accélère. # 🟠 Dérivée 1 < 0 et dérivée 2 > 0: tendance baissière qui ralentit → possible bottom. # # Filtrer les signaux: ne prendre un signal haussier que si dérivée1 > 0 et dérivée2 > 0. # Détecter les zones de retournement: quand dérivée1 ≈ 0 et que dérivée2 change de signe. def apply_regression_derivatives(self, dataframe: DataFrame, column: str = 'close', window: int = 50, degree: int = 3, future_offset: int = 10 # projection à n bougies après ) -> DataFrame: df = dataframe.copy() regression_fit = [] # deriv1 = [] # deriv2 = [] regression_future_fit = [] # regression_future_deriv1 = [] # regression_future_deriv2 = [] for i in range(len(df)): if i < window or i + future_offset >= len(df): regression_fit.append(np.nan) # deriv1.append(np.nan) # deriv2.append(np.nan) regression_future_fit.append(np.nan) # regression_future_deriv1.append(np.nan) # regression_future_deriv2.append(np.nan) continue y = df[column].iloc[i - window:i].values x = np.arange(window) coeffs = np.polyfit(x, y, degree) poly = np.poly1d(coeffs) x_now = window - 1 x_future = x_now + future_offset regression_fit.append(poly(x_now)) # deriv1.append(np.polyder(poly, 1)(x_now)) # deriv2.append(np.polyder(poly, 2)(x_now)) regression_future_fit.append(poly(x_future)) # regression_future_deriv1.append(np.polyder(poly, 1)(x_future)) # regression_future_deriv2.append(np.polyder(poly, 2)(x_future)) # df['regression_fit'] = regression_fit # df['regression_deriv1'] = deriv1 # df['regression_deriv2'] = deriv2 df['regression_future_fit'] = regression_future_fit # df['regression_future_deriv1'] = regression_future_deriv1 # df['regression_future_deriv2'] = regression_future_deriv2 return df def get_mid_smooth_label(self, value): bins = [-2.0622, -0.1618, -0.0717, -0.0353, -0.0135, 0.0, 0.0085, 0.0276, 0.0521, 0.0923, 0.1742, 2.3286] labels = ['B5', 'B4', 'B3', 'B2', 'B1', 'N0', 'H1', 'H2', 'H3', 'H4', 'H5'] for i in range(len(bins) - 1): if bins[i] <= value < bins[i + 1]: return labels[i] return labels[-1] # cas limite pour la borne max def get_sma24_diff_label(self, value): bins = [-0.84253877, -0.13177195, -0.07485074, -0.04293497, -0.02033502, -0.00215711, 0.01411933, 0.03308264, 0.05661652, 0.09362708, 0.14898214, 0.50579505] labels = ['B5', 'B4', 'B3', 'B2', 'B1', 'N0', 'H1', 'H2', 'H3', 'H4', 'H5'] for i in range(len(bins) - 1): if bins[i] <= value < bins[i + 1]: return labels[i] return labels[-1] def interpolated_val_from_bins(self, row_pos, col_pos): """ Renvoie une approximation interpolée (bilinéaire) d'une valeur dans la matrice à partir de positions flottantes dans l'index (ligne) et les colonnes. Parameters: matrix_df (pd.DataFrame): Matrice des probabilités (index/colonnes = labels). row_pos (float): Position réelle de la ligne (0 = B5, 10 = H5). col_pos (float): Position réelle de la colonne (0 = B5, 10 = H5). Returns: float: Valeur interpolée, ou NaN si en dehors des bornes. """ # Labels ordonnés labels = ['B5', 'B4', 'B3', 'B2', 'B1', 'N0', 'H1', 'H2', 'H3', 'H4', 'H5'] n = len(labels) # Vérification des limites if not (0 <= row_pos <= n - 1) or not (0 <= col_pos <= n - 1): return np.nan # Conversion des labels -> matrice matrix = self.smooth_smadiff_matrice_df.reindex(index=labels, columns=labels).values # Coordonnées entières (inférieures) i = int(np.floor(row_pos)) j = int(np.floor(col_pos)) # Coefficients pour interpolation dx = row_pos - i dy = col_pos - j # Précautions sur les bords if i >= n - 1: i = n - 2; dx = 1.0 if j >= n - 1: j = n - 2; dy = 1.0 # Récupération des 4 valeurs voisines v00 = matrix[i][j] v10 = matrix[i + 1][j] v01 = matrix[i][j + 1] v11 = matrix[i + 1][j + 1] # Interpolation bilinéaire interpolated = ( (1 - dx) * (1 - dy) * v00 + dx * (1 - dy) * v10 + (1 - dx) * dy * v01 + dx * dy * v11 ) return interpolated def approx_val_from_bins(self, matrice, numeric_matrice, row_label, col_label): """ Renvoie une approximation de la valeur à partir des labels binaires (e.g. B5, H1) en utilisant une interpolation simple basée sur les indices. Parameters: matrix_df (pd.DataFrame): Matrice avec les labels binaires en index et colonnes. row_label (str): Label de la ligne (ex: 'B3'). col_label (str): Label de la colonne (ex: 'H2'). Returns: float: Valeur approchée si possible, sinon NaN. """ # Vérification des labels if row_label not in matrice.index or col_label not in matrice.columns: return np.nan # Index correspondant row_idx = self.label_to_index.get(row_label) col_idx = self.label_to_index.get(col_label) # Approximation directe (aucune interpolation complexe ici, juste une lecture) return numeric_matrice[row_idx, col_idx] # @property # def protections(self): # return [ # { # "method": "CooldownPeriod", # "stop_duration_candles": 12 # } # # { # # "method": "MaxDrawdown", # # "lookback_period_candles": self.lookback.value, # # "trade_limit": self.trade_limit.value, # # "stop_duration_candles": self.protection_stop.value, # # "max_allowed_drawdown": self.protection_max_allowed_dd.value, # # "only_per_pair": False # # }, # # { # # "method": "StoplossGuard", # # "lookback_period_candles": 24, # # "trade_limit": 4, # # "stop_duration_candles": self.protection_stoploss_stop.value, # # "only_per_pair": False # # }, # # { # # "method": "StoplossGuard", # # "lookback_period_candles": 24, # # "trade_limit": 4, # # "stop_duration_candles": 2, # # "only_per_pair": False # # }, # # { # # "method": "LowProfitPairs", # # "lookback_period_candles": 6, # # "trade_limit": 2, # # "stop_duration_candles": 60, # # "required_profit": 0.02 # # }, # # { # # "method": "LowProfitPairs", # # "lookback_period_candles": 24, # # "trade_limit": 4, # # "stop_duration_candles": 2, # # "required_profit": 0.01 # # } # ]