# Zeus Strategy: First Generation of GodStra Strategy with maximum # AVG/MID profit in USDT # Author: @Mablue (Masoud Azizi) # github: https://github.com/mablue/ # IMPORTANT: INSTALL TA BEFOUR RUN(pip install ta) # freqtrade hyperopt --hyperopt-loss SharpeHyperOptLoss --spaces buy sell roi --strategy Zeus # --- Do not remove these libs --- from datetime import timedelta, datetime from freqtrade.persistence import Trade from freqtrade.strategy import (BooleanParameter, CategoricalParameter, DecimalParameter, stoploss_from_open, IntParameter, IStrategy, merge_informative_pair, informative, stoploss_from_absolute) import pandas as pd import numpy as np from pandas import DataFrame from typing import Optional, Union, Tuple import logging import configparser from technical import pivots_points # -------------------------------- # Add your lib to import here test git import ta import talib.abstract as talib import freqtrade.vendor.qtpylib.indicators as qtpylib import requests from datetime import timezone, timedelta logger = logging.getLogger(__name__) from tabulate import tabulate def pprint_df(dframe): print(tabulate(dframe, headers='keys', tablefmt='psql', showindex=False)) def normalize(df): df = (df - df.min()) / (df.max() - df.min()) return df class Zeus_8_3_2_B_4_2(IStrategy): levels = [1, 2, 2, 4, 6, 8, 10, 12, 14, 16, 18, 20] # ROI table: minimal_roi = { "0": 0.564, "567": 0.273, "2814": 0.12, "7675": 0 } # Stoploss: stoploss = -1 # 0.256 # Custom stoploss use_custom_stoploss = True # Buy hypers timeframe = '5m' max_open_trades = 5 max_amount = 40 # DCA config position_adjustment_enable = True plot_config = { "main_plot": { "min200": { "color": "#86c932" }, "max50": { "color": "white" }, "max200": { "color": "yellow" }, "bb_lowerband": { "color": "#da59a6"}, "bb_upperband": { "color": "#da59a6", } }, "subplots": { "Rsi": { "rsi": { "color": "pink" } }, "Percent": { "max_min": { "color": "#74effc" } }, "smooth": { 'mid_smooth_deriv1': { "color": "blue" }, 'mid_smooth_deriv1_1h': { "color": "red" } } } } columns_logged = False pairs = { pair: { "first_buy": 0, "last_max": 0, "trade_info": {}, "max_touch": 0.0, "last_sell": 0.0, "last_buy": 0.0, 'count_of_buys': 0, 'current_profit': 0, 'expected_profit': 0, "last_candle": {}, "last_trade": None, "last_count_of_buys": 0, 'base_stake_amount': 0, 'stop_buy': False } for pair in ["BTC/USDC", "ETH/USDC", "DOGE/USDC", "XRP/USDC", "SOL/USDC", "BTC/USDT", "ETH/USDT", "DOGE/USDT", "XRP/USDT", "SOL/USDT"] } # 20 20 40 60 100 160 260 420 # 50 50 100 300 500 # fibo = [1, 1, 2, 3, 5, 8, 13, 21] # my fibo # 50 50 50 100 100 150 200 250 350 450 600 1050 fibo = [1, 1, 1, 2, 2, 3, 4, 5, 7, 9, 12, 16, 21] baisse = [1, 2, 3, 5, 7, 10, 14, 19, 26, 35, 47, 63, 84] # Ma suite 1 1 1 2 2 3 4 5 7 9 12 16 21 # Mise 50 50 50 100 100 150 200 250 350 450 600 800 1050 # Somme Mises 50 100 150 250 350 500 700 950 1300 1750 2350 3150 4200 # baisse 1 2 3 5 7 10 14 19 26 35 47 63 84 trades = list() max_profit_pairs = {} protection_percent_buy_lost = IntParameter(1, 10, default=5, space='protection') protection_fibo = IntParameter(1, 10, default=2, space='protection') sell_allow_decrease = DecimalParameter(0.005, 0.02, default=0.2, decimals=2, space='sell', optimize=True, load=True) def min_max_scaling(self, series: pd.Series) -> pd.Series: """Normaliser les données en les ramenant entre 0 et 100.""" return 100 * (series - series.min()) / (series.max() - series.min()) def z_score_scaling(self, series: pd.Series) -> pd.Series: """Normaliser les données en utilisant Z-Score Scaling.""" return (series - series.mean()) / series.std() def confirm_trade_entry(self, pair: str, order_type: str, amount: float, rate: float, time_in_force: str, current_time: datetime, entry_tag: Optional[str], **kwargs) -> bool: # count_buys = 0 # trade = self.getTrade(pair) # if trade: # filled_buys = trade.select_filled_orders('buy') # count_buys = len(filled_buys) dataframe, _ = self.dp.get_analyzed_dataframe(pair, self.timeframe) last_candle = dataframe.iloc[-1].squeeze() # last_candle_12 = dataframe.iloc[-13].squeeze() # allow_to_buy = True #(not self.stop_all) #& (not self.all_down) allow_to_buy = True #not last_candle['tendency'] in ('B-', 'B--') # (rate <= float(limit)) | (entry_tag == 'force_entry') self.trades = list() dispo = round(self.wallets.get_available_stake_amount()) self.pairs[pair]['first_buy'] = rate self.pairs[pair]['last_buy'] = rate self.pairs[pair]['max_touch'] = last_candle['close'] self.pairs[pair]['last_candle'] = last_candle self.pairs[pair]['count_of_buys'] = 1 self.pairs[pair]['current_profit'] = 0 self.printLog( f"|{'-' * 18}+{'-' * 12}+{'-' * 5}+{'-' * 20}+{'-' * 14}+{'-' * 8}+{'-' * 10}+{'-' * 8}+{'-' * 13}+" f"{'-' * 14}+{'-' * 14}+{'-' * 4}+{'-' * 7}|" ) stake_amount = self.adjust_stake_amount(pair, last_candle) self.log_trade( last_candle=last_candle, date=current_time, action="START BUY" if allow_to_buy else "Canceled", pair=pair, rate=rate, dispo=dispo, profit=0, trade_type=entry_tag, buys=1, stake=round(stake_amount, 2) ) return allow_to_buy def confirm_trade_exit(self, pair: str, trade: Trade, order_type: str, amount: float, rate: float, time_in_force: str, exit_reason: str, current_time, **kwargs, ) -> bool: # allow_to_sell = (minutes > 30) dataframe, _ = self.dp.get_analyzed_dataframe(pair, self.timeframe) last_candle = dataframe.iloc[-1].squeeze() allow_to_sell = (last_candle['percent'] < 0) if allow_to_sell: self.trades = list() self.pairs[pair]['last_count_of_buys'] = trade.nr_of_successful_entries #self.pairs[pair]['count_of_buys'] self.pairs[pair]['last_sell'] = rate self.pairs[pair]['last_trade'] = trade self.pairs[pair]['last_candle'] = last_candle self.trades = list() dispo= round(self.wallets.get_available_stake_amount()) # print(f"Sell {pair} {current_time} {exit_reason} dispo={dispo} amount={amount} rate={rate} open_rate={trade.open_rate}") self.log_trade( last_candle=last_candle, date=current_time, action="Sell", pair=pair, trade_type=exit_reason, rate=last_candle['close'], dispo=dispo, profit=round(trade.calc_profit(rate, amount), 2) ) self.pairs[pair]['max_touch'] = 0 self.pairs[pair]['last_buy'] = 0 return (allow_to_sell) | (exit_reason == 'force_exit') def custom_stake_amount(self, pair: str, current_time: datetime, current_rate: float, proposed_stake: float, min_stake: float, max_stake: float, **kwargs) -> float: dataframe, _ = self.dp.get_analyzed_dataframe(pair=pair, timeframe=self.timeframe) current_candle = dataframe.iloc[-1].squeeze() adjusted_stake_amount = self.adjust_stake_amount(pair, current_candle) # print(f"{pair} adjusted_stake_amount{adjusted_stake_amount}") # Use default stake amount. return adjusted_stake_amount def custom_exit(self, pair: str, trade: Trade, current_time, current_rate, current_profit, **kwargs): dataframe, _ = self.dp.get_analyzed_dataframe(pair, self.timeframe) last_candle = dataframe.iloc[-1].squeeze() before_last_candle = dataframe.iloc[-2].squeeze() expected_profit = self.expectedProfit(pair, last_candle) # print(f"current_time={current_time} current_profit={current_profit} expected_profit={expected_profit}") max_touch_before = self.pairs[pair]['max_touch'] self.pairs[pair]['last_max'] = max(last_candle['haclose'], self.pairs[pair]['last_max']) count_of_buys = trade.nr_of_successful_entries self.pairs[pair]['count_of_buys'] = count_of_buys self.pairs[pair]['current_profit'] = current_profit pct_first = round((last_candle['close'] - self.pairs[pair]['first_buy']) / self.pairs[pair]['first_buy'], 3) if (last_candle['tendency'] in ('H++', 'H+')) : # and (last_candle['tendency_1h'] in ('H++', 'H+')): # and (last_candle['tendency_1d'] in ('H++', 'H+')) : return None # if (last_candle['rsi_1d'] > 50) & (last_candle['percent12'] < 0.0): if (last_candle['percent3'] < 0.0) & (current_profit > expected_profit): #last_candle['min_max200'] / 3): self.trades = list() return 'mx_' + str(count_of_buys) if (last_candle['percent12'] <= -0.01) & (current_profit >= expected_profit): self.trades = list() return 'pft_' + str(count_of_buys) if (current_profit >= expected_profit) & (last_candle['percent'] < 0.0) \ and ((last_candle['rsi'] >= 75) or before_last_candle['rsi'] >= 75)\ and (count_of_buys < 5): self.trades = list() return 'rsi_' + str(count_of_buys) # if (last_candle['percent3'] < -0.002) & (last_candle['percent12'] < 0) & ( # current_profit > last_candle['min_max200'] / 3): # self.trades = list() # return 'mnmx_' + str(count_of_buys) # if (last_candle['percent12'] <= -0.01) & (current_profit >= expected_profit): # self.trades = list() # return 'profit_' + str(count_of_buys) self.pairs[pair]['max_touch'] = max(last_candle['haclose'], self.pairs[pair]['max_touch']) def informative_pairs(self): # get access to all pairs available in whitelist. pairs = self.dp.current_whitelist() informative_pairs = [(pair, '1d') for pair in pairs] informative_pairs += [(pair, '1h') for pair in pairs] return informative_pairs from typing import List def multi_step_interpolate(self, pct: float, thresholds: List[float], factors: List[float]) -> float: if pct <= thresholds[0]: return factors[0] if pct >= thresholds[-1]: return factors[-1] for i in range(1, len(thresholds)): if pct <= thresholds[i]: # interpolation linéaire entre thresholds[i-1] et thresholds[i] return factors[i - 1] + (pct - thresholds[i - 1]) * (factors[i] - factors[i - 1]) / ( thresholds[i] - thresholds[i - 1]) # Juste au cas où (devrait jamais arriver) return factors[-1] def interpolate_factor(self, pct: float, start_pct: float = 5, end_pct: float = 30, start_factor: float = 1.0, end_factor: float = 2.0) -> float: if pct <= start_pct: return start_factor if pct >= end_pct: return end_factor # interpolation linéaire return start_factor + (pct - start_pct) * (end_factor - start_factor) / (end_pct - start_pct) def log_trade(self, action, pair, date, trade_type=None, rate=None, dispo=None, profit=None, buys=None, stake=None, last_candle=None): # Afficher les colonnes une seule fois if self.config.get('runmode') == 'hyperopt': return if self.columns_logged % 30 == 0: # print( # f"|{'-' * 18}+{'-' * 12}+{'-' * 12}+{'-' * 20}+{'-' * 14}+{'-' * 8}+{'-' * 10}+{'-' * 7}+{'-' * 13}+{'-' * 14}+{'-' * 14}+{'-' * 7}+{'-' * 12}|" # ) self.printLog( f"| {'Date':<16} | {'Action':<10} |{'Pair':<5}| {'Trade Type':<18} | {'Rate':>12} | {'Dispo':>6} | {'Profit':>8} | {'Pct':>6} | {'max_touch':>11} | {'last_lost':>12} | {'last_max':>12} |{'Buys':>4}| {'Stake':>5} |" ) self.printLog( f"|{'-' * 18}+{'-' * 12}+{'-' * 5}+{'-' * 20}+{'-' * 14}+{'-' * 8}+{'-' * 10}+{'-' * 8}+{'-' * 13}+{'-' * 14}+{'-' * 14}+{'-' * 4}+{'-' * 7}|" ) self.columns_logged += 1 date = str(date)[:16] if date else "-" limit = None # if buys is not None: # limit = round(last_rate * (1 - self.fibo[buys] / 100), 4) rsi = '' rsi_pct = '' # if last_candle is not None: # if (not np.isnan(last_candle['rsi_1d'])) and (not np.isnan(last_candle['rsi_1h'])): # rsi = str(int(last_candle['rsi_1d'])) + " " + str(int(last_candle['rsi_1h'])) # if (not np.isnan(last_candle['rsi_pct_1d'])) and (not np.isnan(last_candle['rsi_pct_1h'])): # rsi_pct = str(int(10000 * last_candle['bb_mid_pct_1d'])) + " " + str( # int(last_candle['rsi_pct_1d'])) + " " + str(int(last_candle['rsi_pct_1h'])) # first_rate = self.percent_threshold.value # last_rate = self.threshold.value # action = self.color_line(action, action) sma5_1d = '' sma5_1h = '' sma5 = str(sma5_1d) + ' ' + str(sma5_1h) last_lost = round((last_candle['haclose'] - self.pairs[pair]['max_touch']) / self.pairs[pair]['max_touch'], 3) max_touch = '' #round(last_candle['max12_1d'], 1) #round(self.pairs[pair]['max_touch'], 1) pct_max = round((last_candle['close'] - self.pairs[pair]['first_buy']) / self.pairs[pair]['first_buy'], 3) # round(100 * self.pairs[pair]['current_profit'], 1) if trade_type is not None: if np.isnan(last_candle['rsi_1d']): string = ' ' else: string = (str(int(last_candle['rsi_1d']))) + " " + str(int(last_candle['rsi_diff_1d'])) trade_type = trade_type \ + " " + string \ + " " + str(int(last_candle['rsi_1h'])) \ + " " + str(int(last_candle['rsi_diff_1h'])) self.printLog( f"| {date:<16} | {action:<10} | {pair[0:3]:<3} | {trade_type or '-':<18} | {rate or '-':>12} | {dispo or '-':>6} " f"| {profit or '-':>8} | {pct_max or '-':>6} | {round(self.pairs[pair]['max_touch'], 2) or '-':>11} | {last_lost or '-':>12} " f"| {round(self.pairs[pair]['last_max'], 2) or '-':>12} |{buys or '-':>4}|{stake or '-':>7}" f"|{round(last_candle['sma5_1d'], 2) or '-':>8}" f"|{last_candle['tendency'] or '-':>3}|{last_candle['tendency_1h'] or '-':>3}|{last_candle['tendency_1d'] or '-':>3}" f"|{round(last_candle['mid_smooth_deriv1']):>3}|{round(last_candle['mid_smooth_deriv1_1h']):>5}|" ) def printLog(self, str): if not self.dp.runmode.value in ('backtest', 'hyperopt'): logger.info(str) else: print(str) def add_tendency_column(self, dataframe: pd.DataFrame) -> pd.DataFrame: def tag_by_derivatives(row): d1 = row['mid_smooth_deriv1'] d2 = row['mid_smooth_deriv2'] d1_lim_inf = -5 d1_lim_sup = 5 if d1 >= d1_lim_inf and d1 <= d1_lim_sup: # and d2 >= d2_lim_inf and d2 <= d2_lim_sup: return 'P' # Palier if d1 == 0.0: return 'DH' if d2 > 0 else 'DB' #Depart Hausse / Départ Baisse if d1 > d1_lim_sup: return 'H++' if d2 > 0 else 'H+' #Acceleration Hausse / Ralentissement Hausse if d1 < d1_lim_inf: return 'B--' if d2 < 0 else 'B-' # Accéleration Baisse / Ralentissement Baisse return 'Mid' dataframe['tendency'] = dataframe.apply(tag_by_derivatives, axis=1) return dataframe def populate_indicators(self, dataframe: DataFrame, metadata: dict) -> DataFrame: # Add all ta features pair = metadata['pair'] heikinashi = qtpylib.heikinashi(dataframe) dataframe['haopen'] = heikinashi['open'] dataframe['haclose'] = heikinashi['close'] dataframe['hapercent'] = (dataframe['haclose'] - dataframe['haopen']) / dataframe['haclose'] dataframe['close_02'] = dataframe['haclose'] * 1.02 dataframe['pct_change'] = dataframe['close'].pct_change(5) dataframe = self.calculateTendency(dataframe) dataframe['min'] = talib.MIN(dataframe['close'], timeperiod=200) dataframe['min12'] = talib.MIN(dataframe['close'], timeperiod=12) dataframe['min50'] = talib.MIN(dataframe['close'], timeperiod=50) dataframe['min200'] = talib.MIN(dataframe['close'], timeperiod=200) dataframe['max200'] = talib.MAX(dataframe['close'], timeperiod=200) dataframe['max50'] = talib.MAX(dataframe['close'], timeperiod=50) dataframe['max144'] = talib.MAX(dataframe['close'], timeperiod=144) dataframe['min_max50'] = (dataframe['max50'] - dataframe['min50']) / dataframe['min50'] dataframe['min_max200'] = (dataframe['max200'] - dataframe['min200']) / dataframe['min200'] dataframe['max200_diff'] = (dataframe['max200'] - dataframe['close']) / dataframe['close'] dataframe['max50_diff'] = (dataframe['max50'] - dataframe['close']) / dataframe['close'] dataframe['sma5'] = talib.SMA(dataframe, timeperiod=5) dataframe['sma10'] = talib.SMA(dataframe, timeperiod=10) dataframe['sma20'] = talib.SMA(dataframe, timeperiod=20) dataframe["percent"] = (dataframe["close"] - dataframe["open"]) / dataframe["open"] dataframe["percent3"] = (dataframe["close"] - dataframe["open"].shift(3)) / dataframe["open"].shift(3) dataframe["percent5"] = (dataframe["close"] - dataframe["open"].shift(5)) / dataframe["open"].shift(5) dataframe["percent12"] = (dataframe["close"] - dataframe["open"].shift(12)) / dataframe["open"].shift(12) dataframe["percent24"] = (dataframe["close"] - dataframe["open"].shift(24)) / dataframe["open"].shift(24) dataframe["percent48"] = (dataframe["close"] - dataframe["open"].shift(48)) / dataframe["open"].shift(48) dataframe["percent_max_144"] = (dataframe["close"] - dataframe["max144"]) / dataframe["close"] # print(metadata['pair']) dataframe['rsi'] = talib.RSI(dataframe['close'], length=14) dataframe['rsi_diff'] = dataframe['rsi'].diff() dataframe['rsi_diff_2'] = dataframe['rsi_diff'].diff() # Bollinger Bands bollinger = qtpylib.bollinger_bands(qtpylib.typical_price(dataframe), window=20, stds=2) dataframe['bb_lowerband'] = bollinger['lower'] dataframe['bb_middleband'] = bollinger['mid'] dataframe['bb_upperband'] = bollinger['upper'] dataframe["bb_percent"] = ( (dataframe["close"] - dataframe["bb_lowerband"]) / (dataframe["bb_upperband"] - dataframe["bb_lowerband"]) ) dataframe["bb_width"] = ( (dataframe["bb_upperband"] - dataframe["bb_lowerband"]) / dataframe["bb_upperband"] ) # Normalization dataframe['average_line'] = dataframe['close'].mean() dataframe['average_line_50'] = talib.MIDPOINT(dataframe['close'], timeperiod=50) dataframe['average_line_288'] = talib.MIDPOINT(dataframe['close'], timeperiod=288) dataframe['average_line_288_098'] = dataframe['average_line_288'] * 0.98 dataframe['average_line_288_099'] = dataframe['average_line_288'] * 0.99 # Sort the close prices to find the 4 lowest values sorted_close_prices = dataframe['close'].tail(576).sort_values() lowest_4 = sorted_close_prices.head(20) dataframe['lowest_4_average'] = lowest_4.mean() # Propagate this mean value across the entire dataframe # dataframe['lowest_4_average'] = dataframe['lowest_4_average'].iloc[0] # # Sort the close prices to find the 4 highest values sorted_close_prices = dataframe['close'].tail(288).sort_values(ascending=False) highest_4 = sorted_close_prices.head(20) # # Calculate the mean of the 4 highest values dataframe['highest_4_average'] = highest_4.mean() # Compter les baisses consécutives dataframe['down'] = dataframe['hapercent'] <= 0.0001 dataframe['up'] = dataframe['hapercent'] >= 0.0001 dataframe['down_count'] = - dataframe['down'].astype(int) * ( dataframe['down'].groupby((dataframe['down'] != dataframe['down'].shift()).cumsum()).cumcount() + 1) dataframe['up_count'] = dataframe['up'].astype(int) * ( dataframe['up'].groupby((dataframe['up'] != dataframe['up'].shift()).cumsum()).cumcount() + 1) dataframe['down_tag'] = (dataframe['down_count'] < -7) dataframe['up_tag'] = (dataframe['up_count'] > 7) # Créer une colonne vide dataframe['down_pct'] = self.calculateUpDownPct(dataframe, 'down_count') dataframe['up_pct'] = self.calculateUpDownPct(dataframe, 'up_count') # Normaliser les données de 'close' # normalized_close = self.min_max_scaling(dataframe['close']) ################### INFORMATIVE 1h informative = self.dp.get_pair_dataframe(pair=metadata['pair'], timeframe="1h") informative = self.calculateTendency(informative, 3) informative['volatility'] = talib.STDDEV(informative['close'], timeperiod=14) / informative['close'] informative['atr'] = (talib.ATR(informative['high'], informative['low'], informative['close'], timeperiod=14)) / informative['close'] informative['rsi'] = talib.RSI(informative['close'], length=7) informative['rsi_diff'] = informative['rsi'].diff() informative['rsi_diff_2'] = informative['rsi_diff'].diff() informative['sma5'] = talib.SMA(informative, timeperiod=5) informative['sma5_pct'] = 100 * (informative['sma5'] - informative['sma5'].shift(1)) / informative['sma5'] dataframe = merge_informative_pair(dataframe, informative, self.timeframe, "1h", ffill=True) ################### INFORMATIVE 1d informative = self.dp.get_pair_dataframe(pair=metadata['pair'], timeframe="1d") informative = self.calculateTendency(informative, 3) informative['rsi'] = talib.RSI(informative['close'], length=7) informative['rsi_diff'] = informative['rsi'].diff() informative['rsi_diff_2'] = informative['rsi_diff'].diff() informative['sma5'] = talib.SMA(informative, timeperiod=5) informative['sma5_pct'] = 100 * (informative['sma5'] - informative['sma5'].shift(1)) / informative['sma5'] sorted_close_prices = informative['close'].tail(365).sort_values() lowest_4 = sorted_close_prices.head(4) informative['lowest_4'] = lowest_4.mean() sorted_close_prices = informative['close'].tail(365).sort_values(ascending=False) highest_4 = sorted_close_prices.head(4) informative['highest_4'] = highest_4.mean() last_14_days = informative.tail(14) # Récupérer le minimum et le maximum de la colonne 'close' des 14 derniers jours min_14_days = last_14_days['close'].min() max_14_days = last_14_days['close'].max() informative['lowest'] = min_14_days informative['highest'] = max_14_days informative['pct_min_max'] = (max_14_days - min_14_days) / min_14_days informative['mid_min_max'] = min_14_days + (max_14_days - min_14_days) / 2 informative['middle'] = informative['lowest_4'] + (informative['highest_4'] - informative['lowest_4']) / 2 informative['mid_min_max_0.98'] = informative['mid_min_max'] * 0.98 dataframe = merge_informative_pair(dataframe, informative, self.timeframe, "1d", ffill=True) dataframe['count_buys'] = 0 dataframe['last_price'] = dataframe['close'] dataframe['first_price'] = dataframe['close'] dataframe['mid_price'] = (dataframe['last_price'] + dataframe['first_price']) / 2 dataframe['close01'] = dataframe.iloc[-1]['close'] * 1.01 dataframe['amount'] = 0 dataframe['limit'] = dataframe['close'] count_buys = 0 if self.dp: if self.dp.runmode.value in ('live', 'dry_run'): self.getOpenTrades() for trade in self.trades: if trade.pair != pair: continue print(trade) filled_buys = trade.select_filled_orders('buy') dataframe['count_buys'] = len(filled_buys) count = 0 amount = 0 for buy in filled_buys: if count == 0: dataframe['first_price'] = buy.price dataframe['close01'] = buy.price * 1.01 # Order(id=2396, trade=1019, order_id=29870026652, side=buy, filled=0.00078, price=63921.01, # status=closed, date=2024-08-26 02:20:11) dataframe['last_price'] = buy.price print(buy) count = count + 1 amount += buy.price * buy.filled dataframe['mid_price'] = (dataframe['last_price'] + dataframe['first_price']) / 2 count_buys = count dataframe['limit'] = dataframe['last_price'] * (1 - self.baisse[count] / 100) dataframe['amount'] = amount print(f"amount= {amount}") # # trades = Trade.get_trades([Trade.is_open is False]).all() # trades = Trade.get_trades_proxy(is_open=False, pair=metadata['pair']) # if trades: # trade = trades[-1] # print('closed trade pair is : ') # print(trade) # dataframe['expected_profit'] = (1 + self.expectedProfit(pair, dataframe.iloc[-1])) * dataframe[ # 'last_price'] # dataframe['lbp'] = dataframe['last_price'] # dataframe['lbp_3'] = dataframe['lbp'] * 0.97 # 3 # dataframe['lbp_6'] = dataframe['lbp'] * 0.94 # 6 # dataframe['lbp_9'] = dataframe['lbp'] * 0.90 # 10 # dataframe['lbp_12'] = dataframe['lbp'] * 0.85 # 15 # dataframe['lbp_20'] = dataframe['lbp'] * 0.8 # 20 # dataframe['fbp'] = trade.open_rate # # else: # # last_trade = self.get_trades(pair=pair).order_by('-close_date').first() # # filled_buys = last_trade.select_filled_orders('buy') # # print(last_trade) # # for buy in filled_buys: # # print(filled_buys) # dataframe['buy_level'] = dataframe['lowest_4_average'] * (1 - self.levels[count_buys] / 100) # ---------------------------------------------------------- # Calcul de la variation entre deux bougies successives dataframe['price_change'] = dataframe['close'].diff() # Marquer les bougies en baisse dataframe['is_down'] = dataframe['price_change'] < 0 # Identifier les blocs consécutifs de baisses # dataframe['drop_id'] = (dataframe['is_down'] != dataframe['is_down'].shift(1)).cumsum() dataframe['drop_id'] = np.where(dataframe['is_down'], (dataframe['is_down'] != dataframe['is_down'].shift(12)).cumsum(), np.nan) # Identifier uniquement les blocs de baisse dataframe['drop_id'] = dataframe['drop_id'].where(dataframe['is_down']) # # Grouper par les chutes détectées # drop_info = dataframe.groupby('drop_id').agg( # start=('close', 'first'), # Prix au début de la chute # end=('close', 'last'), # Prix à la fin de la chute # start_index=('close', 'idxmin'), # Début de la chute (index) # end_index=('close', 'idxmax'), # Fin de la chute (index) # ) # # # Calcul de l'ampleur de la chute en % # drop_info['drop_amplitude_pct'] = ((drop_info['end'] - drop_info['start']) / drop_info['start']) * 100 # # Filtrer les chutes avec une amplitude supérieure à 3% # drop_info = drop_info[drop_info['drop_amplitude_pct'] < -3] # ************** # Identifier le prix de début et de fin de chaque chute drop_stats = dataframe.groupby('drop_id').agg( start_price=('close', 'first'), # Prix au début de la chute end_price=('close', 'last'), # Prix à la fin de la chute ) # Calculer l'amplitude en % drop_stats['amplitude_pct'] = ((drop_stats['end_price'] - drop_stats['start_price']) / drop_stats[ 'start_price']) * 100 # drop_stats = drop_stats[drop_stats['amplitude_pct'] < -1] # Associer les amplitudes calculées à chaque drop_id dans dataframe dataframe = dataframe.merge(drop_stats[['amplitude_pct']], on='drop_id', how='left') # Remplir les lignes sans drop_id par 0 dataframe['amplitude_pct'] = dataframe['amplitude_pct'].fillna(0) dataframe['amplitude_pct_60'] = dataframe['amplitude_pct'].rolling(60).sum() # ---------------------------------------------------------- # self.getBinanceOrderBook(pair, dataframe) return dataframe def calculateTendency(self, dataframe, window=12): dataframe['mid'] = dataframe['open'] + (dataframe['close'] - dataframe['open']) / 2 # 2. Calcul du lissage sur 200 bougies par moyenne mobile médiane dataframe['mid_smooth'] = dataframe['mid'].rolling(window=window, center=True, min_periods=1).median().rolling( 3).mean() dataframe['mid_smooth_tag_max'] = (dataframe['mid_smooth'].shift(1)) == 0 & (dataframe['mid_smooth'] < 0) dataframe['mid_smooth_tag_min'] = (dataframe['mid_smooth'].shift(1)) == 0 & (dataframe['mid_smooth'] > 0) # 2. Dérivée première = différence entre deux bougies successives dataframe['mid_smooth_deriv1'] = round(100000 * dataframe['mid_smooth'].pct_change(), 2) # 3. Dérivée seconde = différence de la dérivée première dataframe['mid_smooth_deriv2'] = round(100 * dataframe['mid_smooth_deriv1'].pct_change().rolling(3).mean(), 2) dataframe = self.add_tendency_column(dataframe) return dataframe def getOpenTrades(self): # if len(self.trades) == 0: print('search open trades') self.trades = Trade.get_open_trades() return self.trades # def getTrade(self, pair): # trades = self.getOpenTrades() # trade_for_pair = None # for trade in trades: # if trade.pair == pair: # trade_for_pair = trade # break # return trade_for_pair def populate_buy_trend(self, dataframe: DataFrame, metadata: dict) -> DataFrame: pair = metadata['pair'] # self.getOpenTrades() expected_profit = self.expectedProfit(pair, dataframe.iloc[-1]) # self.getBinanceOrderBook(pair, dataframe) last_candle = dataframe.iloc[-1].squeeze() print("---------------" + pair + "----------------") print('adjust stake amount ' + str(self.adjust_stake_amount(pair, dataframe.iloc[-1]))) # print('adjust exit price ' + str(self.adjust_exit_price(dataframe.iloc[-1]))) print('calcul expected_profit ' + str(expected_profit)) buy_level = dataframe['average_line_50'] #dataframe['buy_level'] # self.get_buy_level(pair, dataframe) dataframe.loc[ ( (dataframe['max200_diff'] >= 0.01) & (dataframe['percent12'] < -0.002) # & (dataframe['pct_change'] < 0) & (dataframe['open'] < dataframe['average_line_288_099']) & (dataframe['open'] < dataframe['average_line_50']) # & (dataframe['percent'] >= -0.0005) & (dataframe['min12'].shift(2) == dataframe['min12']) & (dataframe['up_count'] > 0) & (dataframe["bb_width"] > 0.01) ), ['enter_long', 'enter_tag']] = (1, 'mx200') dataframe.loc[ ( ( (dataframe['percent12'] < -0.015) | (dataframe['percent24'] < -0.022) | (dataframe['percent48'] < -0.030) ) & (dataframe['close'] <= dataframe['min50'] * 1.002) & (dataframe['open'] < dataframe['average_line_50']) & ( (dataframe['close'] < dataframe['min12'] * 1.002) | (dataframe['percent12'] < -0.022) | (dataframe['percent24'] < -0.022) ) & ( (dataframe['min50'].shift(2) == dataframe['min50']) | (dataframe['percent12'] < -0.022) | (dataframe['percent24'] < -0.022) ) & (dataframe['up_count'] > 0) & (dataframe["bb_width"] > 0.01) ), ['enter_long', 'enter_tag']] = (1, 'pct12') dataframe.loc[ ( (dataframe['close'] <= dataframe['min200'] * 1.002) & (dataframe["bb_width"] > 0.01) & (dataframe['min_max200'] > 0.015) # & (dataframe['pct_change'] < 0) & (dataframe['haopen'] < buy_level) & (dataframe['open'] < dataframe['average_line_288']) & (dataframe['up_count'] > 0) ), ['enter_long', 'enter_tag']] = (1, 'mnmx200') dataframe.loc[ ( (dataframe['close'].shift(2) <= dataframe['min200']) & (dataframe['pct_change'] < 0) & (dataframe['min200'].shift(2) == dataframe['min200']) & (dataframe['close'] < dataframe['lowest_4_average']) & (dataframe['up_count'] > 0) ), ['enter_long', 'enter_tag']] = (1, 'min200') dataframe.loc[ ( # (dataframe['rsi_1h'] < 70) # & (dataframe['rsi_diff_1h'] > -5) # (dataframe["bb_width"] > 0.01) (dataframe['down_count'].shift(1) < - 6) & (dataframe['down_count'] == 0) # & (dataframe['tendency'] != "B--") # & (dataframe['tendency'] != "B-") ), ['enter_long', 'enter_tag']] = (1, 'down') dataframe.loc[ ( (dataframe['low'] < dataframe['min200']) & (dataframe['min50'] == dataframe['min50'].shift(3)) # & (dataframe['tendency'] != "B--") # & (dataframe['tendency'] != "B-") ), ['enter_long', 'enter_tag']] = (1, 'low') dataframe['test'] = np.where(dataframe['enter_long'] == 1, dataframe['close'] * 1.01, np.nan) return dataframe def populate_sell_trend(self, dataframe: DataFrame, metadata: dict) -> DataFrame: return dataframe def adjust_trade_position(self, trade: Trade, current_time: datetime, current_rate: float, current_profit: float, min_stake: float, max_stake: float, **kwargs): # ne rien faire si ordre deja en cours if trade.has_open_orders: print("skip open orders") return None if (self.wallets.get_available_stake_amount() < 50): # or trade.stake_amount >= max_stake: return 0 dataframe, _ = self.dp.get_analyzed_dataframe(trade.pair, self.timeframe) last_candle = dataframe.iloc[-1].squeeze() last_candle_3 = dataframe.iloc[-4].squeeze() # prépare les données current_time = current_time.astimezone(timezone.utc) open_date = trade.open_date.astimezone(timezone.utc) dispo = round(self.wallets.get_available_stake_amount()) hours = (current_time - trade.date_last_filled_utc).total_seconds() / 3600.0 if (len(dataframe) < 1): print("skip dataframe") return None pair = trade.pair if pair not in ('BTC/USDT', 'BTC/USDC', 'XRP/USDT', 'XRP/USDC'): print(f"skip pair {pair}") return None count_of_buys = trade.nr_of_successful_entries # if 'buy' in last_candle: # condition = (last_candle['buy'] == 1) # else: # condition = False # self.protection_nb_buy_lost.value limit = last_candle['limit'] current_time_utc = current_time.astimezone(timezone.utc) open_date = trade.open_date.astimezone(timezone.utc) days_since_open = (current_time_utc - open_date).days pct_first = 0 if self.pairs[pair]['first_buy']: pct_first = round((last_candle['close'] - self.pairs[pair]['first_buy']) / self.pairs[pair]['first_buy'], 3) pct = 0.012 if count_of_buys == 1: pct_max = current_profit else: if self.pairs[trade.pair]['last_buy']: pct_max = round((last_candle['close'] - self.pairs[trade.pair]['last_buy']) / self.pairs[trade.pair]['last_buy'], 4) else: pct_max = - pct lim = - pct - (count_of_buys * 0.001) # print(f"{trade.pair} current_profit={current_profit} count_of_buys={count_of_buys} pct_max={pct_max:.3f} lim={lim:.3f} rsi_diff_1f={last_candle['rsi_diff_1h']}") # if (days_since_open > count_of_buys) & (0 < count_of_buys <= max_buys) & (current_rate <= limit) & (last_candle['enter_long'] == 1): limit_buy = 20 if (count_of_buys < limit_buy) \ and ((last_candle['enter_long'] == 1) or (last_candle['percent48'] < - 0.03) or ((last_candle['min50'] == last_candle_3['min50']) and (last_candle['low'] <= last_candle['min50'])) ) \ and (last_candle['rsi_diff_1h'] >= -5) \ and (last_candle['tendency'] in ('P', 'H++', 'DH', 'H+')) \ and ((pct_max < lim)): try: # print(self.adjust_stake_amount(pair, last_candle)) # print(pct_first) # print(pct) stake_amount = min(self.wallets.get_available_stake_amount(), self.adjust_stake_amount(pair, last_candle) - 10 * pct_first / pct) # min(200, self.adjust_stake_amount(pair, last_candle) * self.fibo[count_of_buys]) trade_type = last_candle['enter_tag'] if last_candle['enter_long'] == 1 else 'pct48' self.log_trade( last_candle=last_candle, date=current_time, action="Loss -", dispo=dispo, pair=trade.pair, rate=current_rate, trade_type=trade_type, profit=round(current_profit, 4), # round(current_profit * trade.stake_amount, 2), buys=trade.nr_of_successful_entries + 1, stake=round(stake_amount, 2) ) self.pairs[trade.pair]['last_buy'] = current_rate self.pairs[trade.pair]['max_touch'] = last_candle['close'] self.pairs[trade.pair]['last_candle'] = last_candle return stake_amount except Exception as exception: print(exception) return None return None def adjust_stake_amount(self, pair: str, last_candle: DataFrame): # Calculer le minimum des 14 derniers jours current_price = last_candle['close'] # trade = self.getTrade(pair) # if trade: # current_price = trade.open_rate base_stake_amount = self.config.get('stake_amount', 100) # Montant de base configuré # Calculer le max des 14 derniers jours min_14_days_4 = last_candle['lowest_4_1d'] max_14_days_4 = last_candle['highest_4_1d'] percent_4 = 1 - (current_price - min_14_days_4) / (max_14_days_4 - min_14_days_4) factor_4 = 1 / ((current_price - min_14_days_4) / (max_14_days_4 - min_14_days_4)) max_min_4 = max_14_days_4 / min_14_days_4 # min_14_days = dataframe['lowest_1d'] # max_14_days = dataframe['highest_1d'] # percent = 1 - (current_price - min_14_days) / (max_14_days - min_14_days) # factor = 1 / ((current_price - min_14_days) / (max_14_days - min_14_days)) # max_min = max_14_days / min_14_days # Stack amount ajusté price=2473.47 min_max=0.15058074985054215 percent=0.8379141364642171 amount=20.0 first_price = self.pairs[pair]['first_buy'] if (first_price == 0): first_price = last_candle['close'] last_max = last_candle['max200'] if self.pairs[pair]['last_max'] > 0: last_max = self.pairs[pair]['last_max'] last_count = self.pairs[pair]['last_count_of_buys'] # factor = 1 # # if last_max > 0: # pct = 100 * (last_max - first_price) / last_max # # if pct >= 20: # factor = 2 # else: # if pct >= 15: # factor = 1.5 pct = 5 if last_max > 0: pct = 100 * (last_max - first_price) / last_max thresholds = [2, 10, 20, 30] factors = [0.5, 1.0, 1.5, 2.0] factor = self.multi_step_interpolate(pct, thresholds, factors) # factor = self.interpolate_factor(pct, start_pct=5, end_pct=50, start_factor=0.8, end_factor=3.0) adjusted_stake_amount = base_stake_amount * factor #max(base_stake_amount, min(100, base_stake_amount * percent_4)) # if pair in ('BTC/USDT', 'ETH/USDT'): # if percent_4 > 0.5: # adjusted_stake_amount = 300 # adjusted_stake_amount_2 = max(base_stake_amount / 2.5, min(75, base_stake_amount * percent)) # print(f"Stack amount ajusté price={current_price} factor={factor} amount={adjusted_stake_amount:.4f}") # print(f"Stack amount ajusté price={current_price} max_min={max_min:.4f} min_14={min_14_days:.4f} max_14={max_14_days:.4f} factor={factor:.4f} percent={percent:.4f} amount={adjusted_stake_amount_2:.4f}") return adjusted_stake_amount # def adjust_exit_price(self, dataframe: DataFrame): # # Calculer le max des 14 derniers jours # min_14_days = dataframe['lowest_1d'] # max_14_days = dataframe['highest_1d'] # entry_price = dataframe['fbp'] # current_price = dataframe['close'] # percent = 0.5 * (max_14_days - min_14_days) / min_14_days # exit_price = (1 + percent) * entry_price # # print(f"Exit price ajusté price={current_price:.4f} max_14={max_14_days:.4f} exit_price={exit_price:.4f}") # # return exit_price # def adjust_stoploss(self, pair: str, trade: 'Trade', current_time: datetime, # current_rate: float, current_profit: float, **kwargs) -> float: # dataframe, _ = self.dp.get_analyzed_dataframe(trade.pair, self.timeframe) # # print(dataframe) # last_candle = dataframe.iloc[-1].squeeze() # # # Utiliser l'ATR pour ajuster le stoploss # atr_stoploss = current_rate - (last_candle['atr'] * 1.5) # Stoploss à 1.5x l'ATR # # # Retourner le stoploss dynamique en pourcentage du prix actuel # return (atr_stoploss / current_rate) - 1 def expectedProfit(self, pair: str, last_candle: DataFrame): first_price = last_candle['first_price'] first_max = (last_candle['max200'] - first_price) / first_price expected_profit = min(0.01, first_max * 0.5) # print( # f"Expected profit price={current_price:.4f} min_max={min_max:.4f} min_14={min_14_days:.4f} max_14={max_14_days:.4f} percent={percent:.4f} expected_profit={expected_profit:.4f}") # self.analyze_conditions(pair, dataframe) return expected_profit # def adjust_exit_price(self, dataframe: DataFrame): # # Calculer le max des 14 derniers jours # min_14_days = dataframe['lowest_1d'] # max_14_days = dataframe['highest_1d'] # entry_price = dataframe['fbp'] # current_price = dataframe['close'] # percent = 0.5 * (max_14_days - min_14_days) / min_14_days # exit_price = (1 + percent) * entry_price # # print(f"Exit price ajusté price={current_price} max_14={max_14_days} exit_price={exit_price}") # # return exit_price # def adjust_entry_price(self, dataframe: DataFrame): # # Calculer le max des 14 derniers jours # min_14_days = dataframe['lowest_1d'] # max_14_days = dataframe['highest_1d'] # current_price = dataframe['close'] # percent = 0.5 * (max_14_days - min_14_days) / min_14_days # entry_price = (1 + percent) * entry_price # # print(f"Entry price ajusté price={current_price} max_14={max_14_days} exit_price={entry_price}") # # return entry_price # def adjust_stake_amount(self, dataframe: DataFrame): # # Calculer le minimum des 14 derniers jours # middle = dataframe['middle_1d'] # # # Récupérer la dernière cotation actuelle (peut être le dernier point de la série) # current_price = dataframe['close'] # # # Calculer l'écart entre la cotation actuelle et le minimum des 14 derniers jours # difference = middle - current_price # # Ajuster la stake_amount en fonction de l'écart # # Par exemple, augmenter la stake_amount proportionnellement à l'écart # base_stake_amount = self.config.get('stake_amount', 100) # Montant de base configuré # # multiplier = 1 - (difference / current_price) # Exemple de logique d'ajustement # # adjusted_stake_amount = max(base_stake_amount / 2.5, base_stake_amount * multiplier) # # # difference = 346.07000000000016 # # price = 2641.75 # # min_14 = 2295.68 # # amount = 56.5500141951358 # # print(f"Stack amount ajusté difference={difference} price={current_price} middle={middle} multiplier={multiplier} amount={adjusted_stake_amount}") # # return adjusted_stake_amount def analyze_conditions(self, pair: str, row: DataFrame): dataframe, _ = self.dp.get_analyzed_dataframe(pair, self.timeframe) if dataframe is None or dataframe.empty: return if row is None or row.empty: return # Créer un tableau pour stocker les résultats de l'analyse results = [] # row = dataframe.iloc[-1].squeeze() # result = {'triggered': False, 'conditions_failed': []} try: buy_level = row['buy_level'] except Exception as exception: print(exception) return None # Première condition : 'buy_fractal' print('------------------------------------------------') print('Test buy fractal ' + pair + ' buy_level=' + str(buy_level)) if not ( (row['close'] <= (row['min200'] * 1.002)) and (row['percent_max_144'] <= -0.012) and (row['haopen'] < buy_level) and (row['open'] < row['average_line_288']) and (dataframe['min50'].shift(3).iloc[-1] == row['min50']) ): failed_conditions = [] if row['close'] > (row['min200'] * 1.002): print('close > min200 * 1.002') if row['percent_max_144'] > -0.012: print('percent_max_144 > -0.012') if row['haopen'] >= buy_level: print('haopen >= buy_level') if row['open'] >= row['average_line_288']: print('open >= average_line_288') if dataframe['min50'].shift(3).iloc[-1] != row['min50']: print('min50.shift(3) != min50') # result['conditions_failed'].append({'buy_fractal': failed_conditions}) print('------------------------------------------------') print('Test buy_max_diff_015 ' + pair + ' buy_level=' + str(buy_level)) # Deuxième condition : 'buy_max_diff_015' if not ( (dataframe['max200_diff'].shift(4).iloc[-1] >= 0.015) and (row['close'] <= row['lowest_4_average'] * 1.002) and (row['close'] <= row['min200'] * 1.002) and (dataframe['max50_diff'].shift(4).iloc[-1] >= 0.01) and (row['haclose'] < row['bb_middleband']) and (row['close'] < buy_level) and (row['open'] < row['average_line_288']) and (dataframe['min50'].shift(3).iloc[-1] == row['min50']) ): if dataframe['max200_diff'].shift(4).iloc[-1] < 0.015: print('max200_diff.shift(4) < 0.015') if row['close'] > row['lowest_4_average'] * 1.002: print('close > lowest_4_average * 1.002') if row['close'] > row['min200'] * 1.002: print('close > min200 * 1.002') if dataframe['max50_diff'].shift(4).iloc[-1] < 0.01: print('max50_diff.shift(4) < 0.01') if row['haclose'] >= row['bb_middleband']: print('haclose >= bb_middleband') if row['close'] >= buy_level: print('close >= buy_level') if row['open'] >= row['average_line_288']: print('open >= average_line_288') if dataframe['min50'].shift(3).iloc[-1] != row['min50']: print('min50.shift(3) != min50') print('------------------------------------------------') print('Test buy_min_max_200 ' + pair + ' buy_level=' + str(buy_level)) if not ( (row['close'] <= row['min200'] * 1.002) and (row['min_max200'] > 0.015) and (row['haopen'] < buy_level) and (row['open'] < row['average_line_288']) ): if row['close'] > row['min200'] * 1.002: print('close > row[min200] * 1.002') if row['min_max200'] <= 0.015: print('row[min_max200] <= 0.015') if row['haopen'] < buy_level: print('row[haopen] < buy_level') if row['open'] < row['average_line_288']: print('row[open] >= row[average_line_288]') print('------------------------------------------------') # Ajouter le résultat à la liste des résultats # results.append(result) # print(result) def getBinanceOrderBook(self, pair, dataframe: DataFrame): """Fetch the order book (depth) from Binance.""" # print(dataframe) last_candle = dataframe.iloc[-1].squeeze() symbol = pair.replace('/', '') try: url = f"https://api.binance.com/api/v3/depth?symbol={symbol}&limit=5000" response = requests.get(url) data = response.json() # Extract bids and asks from the order book asks, bids = self.calculateSMA(20, data['asks'], data['bids']) # Ventes List of [price, quantity] # bids = data['bids'] # asks = data['asks'] # Achats List of [price, quantity] # Process the depth data as you need it # bid_volume = sum([float(bid[1]) for bid in bids]) # Sum of all bid volumes # $ * nb / $ => nb bid_volume = sum([float(bid[0]) * float(bid[1]) / float(last_candle['close']) for bid in bids[:10]]) # ask_volume = sum([float(ask[1]) for ask in asks]) # Sum of all ask volumes ask_volume = sum([float(ask[0]) * float(ask[1]) / float(last_candle['close']) for ask in asks[:10]]) # Example: add the difference in volumes as an indicator if bid_volume and ask_volume: self.updateLastValue(dataframe, 'depth_bid_ask_diff', round(bid_volume - ask_volume, 2)) else: self.updateLastValue(dataframe, 'depth_bid_ask_diff', 0) # probabilité baisse hausse sur les n premiers élements for start in [0, 50, 100, 150]: self.updateLastValue(dataframe, 'prob_hausse_' + str(start + 50), self.calculateProbaNb(asks, bids, start, start + 50)) # dataframe['prob_hausse_' + str(nb)] = self.calculateProbaNb(asks, bids, nb) # Analyse des prix moyens pondérés par les volumes (VWAP) : # # Le VWAP (Volume Weighted Average Price) peut être utilisé pour comprendre la pression directionnelle. # Si le VWAP basé sur les ordres d'achat est plus élevé que celui des ordres de vente, # cela peut indiquer une probabilité de hausse. nb = 50 bid_vwap = sum([float(bid[0]) * float(bid[1]) for bid in bids[:nb]]) / sum( [float(bid[1]) for bid in bids[:nb]]) ask_vwap = sum([float(ask[0]) * float(ask[1]) for ask in asks[:nb]]) / sum( [float(ask[1]) for ask in asks[:nb]]) if bid_vwap > ask_vwap: self.updateLastValue(dataframe, 'vwap_hausse', round(100 * (bid_vwap - ask_vwap) / (bid_vwap + ask_vwap), 2)) else: self.updateLastValue(dataframe, 'vwap_hausse', - round(100 * (ask_vwap - bid_vwap) / (bid_vwap + ask_vwap), 2)) current_price = last_candle['close'] # le prix actuel du marché # Calcul du seuil de variation de 1% lower_threshold = current_price * 0.99 upper_threshold = current_price * 1.01 # Volumes d'achat (bids) sous 1% du prix actuel bid_volume_1percent = sum( [float(bid[1]) for bid in bids if current_price >= float(bid[0]) >= lower_threshold]) # Volumes de vente (asks) au-dessus de 1% du prix actuel ask_volume_1percent = sum( [float(ask[1]) for ask in asks if current_price <= float(ask[0]) <= upper_threshold]) # Estimation de la probabilité basée sur le déséquilibre des volumes total_volume = bid_volume_1percent + ask_volume_1percent if total_volume > 0: prob_hausse = bid_volume_1percent / total_volume prob_baisse = ask_volume_1percent / total_volume else: prob_hausse = prob_baisse = 0 self.updateLastValue(dataframe, 'proba_hausse_1%', round(prob_hausse * 100, 2)) self.updateLastValue(dataframe, 'proba_baisse_1%', round(prob_baisse * 100, 2)) print(f"Probabilité de hausse de 1%: {prob_hausse * 100:.2f}%") print(f"Probabilité de baisse de 1%: {prob_baisse * 100:.2f}%") self.calculateResistance(pair, asks, dataframe) self.calculateSupport(pair, bids, dataframe) dataframe['r_s'] = 100 * (dataframe['r_min'] - dataframe['s_min']) / dataframe['s_min'] except Exception as e: logger.error(f"Error fetching order book: {e}") return None, None def calculateProbaNb(self, asks, bids, start, nb): top_bids = sum([float(bid[1]) for bid in bids[start:nb]]) top_asks = sum([float(ask[1]) for ask in asks[start:nb]]) if top_bids > top_asks: proba = round(100 * (top_bids - top_asks) / (top_bids + top_asks), 2) else: proba = - round(100 * (top_asks - top_bids) / (top_bids + top_asks), 2) return proba def calculateResistance(self, pair, asks, dataframe: DataFrame): last_candle = dataframe.iloc[-1].squeeze() # Filtrage +-5% current_price = float(last_candle['close']) lower_bound = current_price * 0.95 upper_bound = current_price * 1.05 ask_prices = [float(ask[0]) for ask in asks] ask_volumes = [float(ask[1]) for ask in asks] ask_df = pd.DataFrame({'price': ask_prices, 'volume': ask_volumes}) filtered_ask_df = ask_df[(ask_df['price'] >= lower_bound) & (ask_df['price'] <= upper_bound)] # Trier le DataFrame sur la colonne 'volume' en ordre décroissant sorted_ask_df = filtered_ask_df.sort_values(by='volume', ascending=False) # Ne garder que les 3 premières lignes (les 3 plus gros volumes) top_3_asks = sorted_ask_df.head(3) print(top_3_asks) # Convertir les ordres de vente en numpy array pour faciliter le traitement asks_array = np.array(filtered_ask_df, dtype=float) # Détecter les résistances : on peut définir qu'une résistance est un niveau de prix où la quantité est élevée # Ex: seuil de résistance à partir des 10% des plus grosses quantités resistance_threshold = np.percentile(asks_array[:, 1], 90) resistances = asks_array[asks_array[:, 1] >= resistance_threshold] # Afficher les résistances trouvées # print(f"{pair} Niveaux de résistance détectés:") # for resistance in resistances: # print(f"{pair} Prix: {resistance[0]}, Quantité: {resistance[1]}") # Exemple : somme des quantités sur des plages de prix # Intervalles de 10 USDT step = last_candle['close'] / 100 price_intervals = np.arange(asks_array[:, 0].min(), asks_array[:, 0].max(), step=step) for start_price in price_intervals: end_price = start_price + step mask = (asks_array[:, 0] >= start_price) & (asks_array[:, 0] < end_price) volume_in_range = asks_array[mask, 1].sum() amount = volume_in_range * end_price print( f"Prix entre {start_price:.6f} et {end_price:.6f}: Volume total = {volume_in_range:.2f} amount={amount:.2f}") # Trier les asks par quantité en ordre décroissant asks_sorted = asks_array[asks_array[:, 1].argsort()][::-1] # Sélectionner les trois plus gros resistances top_3_resistances = asks_sorted[:3] # Afficher les trois plus gros resistances print("Les trois plus grosses resistances détectées : ") self.updateLastValue(dataframe, 'r3', top_3_resistances[0][0]) self.updateLastValue(dataframe, 'r2', top_3_resistances[1][0]) self.updateLastValue(dataframe, 'r1', top_3_resistances[2][0]) self.updateLastValue(dataframe, 'r_min', min(top_3_resistances[0][0], top_3_resistances[1][0], top_3_resistances[2][0])) for resistance in top_3_resistances: print(f"{pair} Prix: {resistance[0]}, Quantité: {resistance[1]}") def calculateSupport(self, pair, bids, dataframe: DataFrame): last_candle = dataframe.iloc[-1].squeeze() # Convert to pandas DataFrame to apply moving average current_price = float(last_candle['close']) lower_bound = current_price * 0.95 upper_bound = current_price * 1.05 bid_prices = [float(bid[0]) for bid in bids] bid_volumes = [float(bid[1]) for bid in bids] bid_df = pd.DataFrame({'price': bid_prices, 'volume': bid_volumes}) filtered_bid_df = bid_df[(bid_df['price'] >= lower_bound) & (bid_df['price'] <= upper_bound)] # Trier le DataFrame sur la colonne 'volume' en ordre décroissant sorted_bid_df = filtered_bid_df.sort_values(by='volume', ascending=False) # Ne garder que les 3 premières lignes (les 3 plus gros volumes) top_3_bids = sorted_bid_df.head(3) print(top_3_bids) # Convertir les ordres d'achat en numpy array pour faciliter le traitement bids_array = np.array(filtered_bid_df, dtype=float) # Détecter les supports : on peut définir qu'un support est un niveau de prix où la quantité est élevée # Ex: seuil de support à partir des 10% des plus grosses quantités support_threshold = np.percentile(bids_array[:, 1], 90) supports = bids_array[bids_array[:, 1] >= support_threshold] # Afficher les supports trouvés # print(f"{pair} Niveaux de support détectés:") # for support in supports: # print(f"{pair} Prix: {support[0]}, Quantité: {support[1]}") step = last_candle['close'] / 100 # Exemple : somme des quantités sur des plages de prix pour les bids price_intervals = np.arange(bids_array[:, 0].min(), bids_array[:, 0].max(), step=step) # Intervalles de 10 USDT for start_price in price_intervals: end_price = start_price + step mask = (bids_array[:, 0] >= start_price) & (bids_array[:, 0] < end_price) volume_in_range = bids_array[mask, 1].sum() amount = volume_in_range * end_price print( f"Prix entre {start_price:.6f} et {end_price:.6f}: Volume total = {volume_in_range:.2f} amount={amount:.2f}") # Trier les bids par quantité en ordre décroissant bids_sorted = bids_array[bids_array[:, 1].argsort()][::-1] # Sélectionner les trois plus gros supports top_3_supports = bids_sorted[:3] # Afficher les trois plus gros supports print("Les trois plus gros supports détectés:") self.updateLastValue(dataframe, 's1', top_3_supports[0][0]) self.updateLastValue(dataframe, 's2', top_3_supports[1][0]) self.updateLastValue(dataframe, 's3', top_3_supports[2][0]) self.updateLastValue(dataframe, 's_min', max(top_3_supports[0][0], top_3_supports[1][0], top_3_supports[2][0])) for support in top_3_supports: print(f"{pair} Prix: {support[0]}, Quantité: {support[1]}") def updateLastValue(self, df: DataFrame, col, value): if col in df.columns: print(f"update last col {col} {value}") df.iloc[-1, df.columns.get_loc(col)] = value else: print(f"update all col {col} {value}") df[col] = value # def update_last_record(self, dataframe: DataFrame, new_data): # # Vérifiez si de nouvelles données ont été reçues # if new_data is not None: # # Ne mettez à jour que la dernière ligne du dataframe # last_index = dataframe.index[-1] # Sélectionne le dernier enregistrement # dataframe.loc[last_index] = new_data # Met à jour le dernier enregistrement avec les nouvelles données # return dataframe def calculateSMA(self, nb, asks, bids): # Prepare data for plotting bid_prices = [float(bid[0]) for bid in bids] bid_volumes = [float(bid[1]) for bid in bids] ask_prices = [float(ask[0]) for ask in asks] ask_volumes = [float(ask[1]) for ask in asks] # Convert to pandas DataFrame to apply moving average bid_df = pd.DataFrame({'price': bid_prices, 'volume': bid_volumes}) ask_df = pd.DataFrame({'price': ask_prices, 'volume': ask_volumes}) # Apply a rolling window to calculate a 10-value simple moving average (SMA) bid_df['volume_sma'] = bid_df['volume'].rolling(window=nb).mean() ask_df['volume_sma'] = ask_df['volume'].rolling(window=nb).mean() # Pour bid_df bid_df = bid_df.dropna(subset=['volume_sma']) bids_with_sma = list(zip(bid_df['price'], bid_df['volume_sma'])) # Pour ask_df ask_df = ask_df.dropna(subset=['volume_sma']) asks_with_sma = list(zip(ask_df['price'], ask_df['volume_sma'])) # print(bids_with_sma) # print(asks_with_sma) return asks_with_sma, bids_with_sma def calculateUpDownPct(self, dataframe, key): down_pct_values = np.full(len(dataframe), np.nan) # Remplir la colonne avec les bons calculs for i in range(len(dataframe)): shift_value = abs(int(dataframe[key].iloc[i])) # Récupérer le shift actuel if i - shift_value > 1: # Vérifier que le shift ne dépasse pas l'index down_pct_values[i] = 100 * (dataframe['close'].iloc[i] - dataframe['close'].iloc[i - shift_value]) / \ dataframe['close'].iloc[i - shift_value] return down_pct_values