# Zeus Strategy: First Generation of GodStra Strategy with maximum # AVG/MID profit in USDT # Author: @Mablue (Masoud Azizi) # github: https://github.com/mablue/ # IMPORTANT: INSTALL TA BEFOUR RUN(pip install ta) # freqtrade hyperopt --hyperopt-loss SharpeHyperOptLoss --spaces buy sell roi --strategy Zeus # --- Do not remove these libs --- from datetime import timedelta, datetime from freqtrade.persistence import Trade from freqtrade.strategy import (BooleanParameter, CategoricalParameter, DecimalParameter, stoploss_from_open, IntParameter, IStrategy, merge_informative_pair, informative, stoploss_from_absolute) import pandas as pd import numpy as np from pandas import DataFrame from typing import Optional, Union, Tuple from scipy.special import binom import logging import configparser from technical import pivots_points # -------------------------------- # Add your lib to import here test git import ta import talib.abstract as talib import freqtrade.vendor.qtpylib.indicators as qtpylib import requests from datetime import timezone, timedelta import matplotlib.pyplot as plt from mpl_toolkits.mplot3d import Axes3D from sklearn.linear_model import LinearRegression from sklearn.preprocessing import PolynomialFeatures from sklearn.pipeline import make_pipeline logger = logging.getLogger(__name__) from tabulate import tabulate def pprint_df(dframe): print(tabulate(dframe, headers='keys', tablefmt='psql', showindex=False)) def normalize(df): df = (df - df.min()) / (df.max() - df.min()) return df class Zeus_11(IStrategy): levels = [1, 2, 2, 4, 6, 8, 10, 12, 14, 16, 18, 20] # ROI table: minimal_roi = { "0": 10 } # Stoploss: stoploss = -1 # 0.256 # Custom stoploss use_custom_stoploss = False # Buy hypers timeframe = '5m' columns_logged = False # DCA config position_adjustment_enable = True plot_config = { "main_plot": { "min200": { "color": "#86c932" }, "max50": { "color": "white" }, "max200": { "color": "yellow" }, "bb_lowerband": { "color": "#da59a6"}, "bb_upperband": { "color": "#da59a6", } }, "subplots": { "Rsi": { "rsi": { "color": "pink" } }, "Percent": { "max_min": { "color": "#74effc" } } } } # 20 20 40 60 100 160 260 420 # 50 50 100 300 500 # fibo = [1, 1, 2, 3, 5, 8, 13, 21] # my fibo # 50 50 50 100 100 150 200 250 350 450 600 1050 fibo = [1, 1, 1, 2, 2, 3, 4, 5, 7, 9, 12, 16, 21] baisse = [1, 2, 3, 5, 7, 10, 14, 19, 26, 35, 47, 63, 84] # Ma suite 1 1 1 2 2 3 4 5 7 9 12 16 21 # Mise 50 50 50 100 100 150 200 250 350 450 600 800 1050 # Somme Mises 50 100 150 250 350 500 700 950 1300 1750 2350 3150 4200 # baisse 1 2 3 5 7 10 14 19 26 35 47 63 84 trades = list() max_profit_pairs = {} pairs = { pair: { "first_buy": 0, "last_max": 0, "trade_info": {}, "max_touch": 0.0, "last_sell": 0.0, "last_buy": 0.0, 'count_of_buys': 0, 'current_profit': 0, 'expected_profit': 0, "last_candle": {}, "last_trade": None, "last_count_of_buys": 0, 'base_stake_amount': 0, 'stop_buy': False } for pair in ["BTC/USDC", "ETH/USDC", "DOGE/USDC", "XRP/USDC", "SOL/USDC", "BTC/USDT", "ETH/USDT", "DOGE/USDT", "XRP/USDT", "SOL/USDT"] } # def min_max_scaling(self, series: pd.Series) -> pd.Series: # """Normaliser les données en les ramenant entre 0 et 100.""" # return 100 * (series - series.min()) / (series.max() - series.min()) # # def z_score_scaling(self, series: pd.Series) -> pd.Series: # """Normaliser les données en utilisant Z-Score Scaling.""" # return (series - series.mean()) / series.std() def confirm_trade_entry(self, pair: str, order_type: str, amount: float, rate: float, time_in_force: str, current_time: datetime, entry_tag: Optional[str], **kwargs) -> bool: # count_buys = 0 # trade = self.getTrade(pair) # if trade: # filled_buys = trade.select_filled_orders('buy') # count_buys = len(filled_buys) dataframe, _ = self.dp.get_analyzed_dataframe(pair, self.timeframe) last_candle = dataframe.iloc[-1].squeeze() # last_candle_12 = dataframe.iloc[-13].squeeze() # allow_to_buy = True #(not self.stop_all) #& (not self.all_down) allow_to_buy = True # (rate <= float(limit)) | (entry_tag == 'force_entry') self.trades = list() dispo = round(self.wallets.get_available_stake_amount()) self.pairs[pair]['first_buy'] = rate self.pairs[pair]['last_buy'] = rate self.pairs[pair]['max_touch'] = last_candle['close'] self.pairs[pair]['last_candle'] = last_candle self.pairs[pair]['count_of_buys'] = 1 self.pairs[pair]['current_profit'] = 0 print( f"|{'-' * 18}+{'-' * 12}+{'-' * 5}+{'-' * 20}+{'-' * 14}+{'-' * 8}+{'-' * 10}+{'-' * 7}+{'-' * 13}+{'-' * 14}+{'-' * 14}+{'-' * 7}+{'-' * 12}|" ) stake_amount = self.adjust_stake_amount(pair, last_candle) self.log_trade( last_candle=last_candle, date=current_time, action="START BUY", pair=pair, rate=rate, dispo=dispo, profit=0, trade_type=entry_tag, buys=1, stake=round(stake_amount, 2) ) return allow_to_buy def confirm_trade_exit(self, pair: str, trade: Trade, order_type: str, amount: float, rate: float, time_in_force: str, exit_reason: str, current_time, **kwargs, ) -> bool: # allow_to_sell = (minutes > 30) dataframe, _ = self.dp.get_analyzed_dataframe(pair, self.timeframe) last_candle = dataframe.iloc[-1].squeeze() allow_to_sell = (last_candle['percent'] < 0) if allow_to_sell: self.pairs[pair]['last_count_of_buys'] = self.pairs[pair]['count_of_buys'] self.pairs[pair]['last_sell'] = rate self.pairs[pair]['last_trade'] = trade self.pairs[pair]['last_candle'] = last_candle self.trades = list() dispo= round(self.wallets.get_available_stake_amount()) # print(f"Sell {pair} {current_time} {exit_reason} dispo={dispo} amount={amount} rate={rate} open_rate={trade.open_rate}") self.log_trade( last_candle=last_candle, date=current_time, action="Sell", pair=pair, trade_type=exit_reason, rate=last_candle['close'], dispo=dispo, profit=round(trade.calc_profit(rate, amount), 2) ) self.pairs[pair]['max_touch'] = 0 self.pairs[pair]['last_buy'] = 0 # else: # print('Cancel Sell ' + exit_reason + ' ' + str(current_time) + ' ' + pair) return (allow_to_sell) | (exit_reason == 'force_exit') def custom_stake_amount(self, pair: str, current_time: datetime, current_rate: float, proposed_stake: float, min_stake: float, max_stake: float, **kwargs) -> float: dataframe, _ = self.dp.get_analyzed_dataframe(pair=pair, timeframe=self.timeframe) current_candle = dataframe.iloc[-1].squeeze() adjusted_stake_amount = self.adjust_stake_amount(pair, current_candle) # print(f"{pair} adjusted_stake_amount{adjusted_stake_amount}") # Use default stake amount. return adjusted_stake_amount def custom_exit(self, pair: str, trade: Trade, current_time, current_rate, current_profit, **kwargs): dataframe, _ = self.dp.get_analyzed_dataframe(pair, self.timeframe) last_candle = dataframe.iloc[-1].squeeze() before_last_candle = dataframe.iloc[-2].squeeze() #count_of_buys = trade.nr_of_successful_entries max_touch_before = self.pairs[pair]['max_touch'] self.pairs[pair]['last_max'] = max(last_candle['haclose'], self.pairs[pair]['last_max']) last_lost = (last_candle['close'] - max_touch_before) / max_touch_before count_of_buys = trade.nr_of_successful_entries self.pairs[pair]['count_of_buys'] = count_of_buys self.pairs[pair]['current_profit'] = current_profit expected_profit = self.expectedProfit(pair, last_candle) if (last_candle['rsi_1d'] > 50) & (last_candle['percent12'] < 0.0): if (last_candle['percent3'] < 0.0) & (current_profit > last_candle['min_max200'] / 3): self.trades = list() return 'mx_' + str(count_of_buys) if (last_candle['percent12'] <= -0.01) & (current_profit >= expected_profit): self.trades = list() return 'profit_' + str(count_of_buys) if (current_profit >= expected_profit) & (last_candle['percent'] < 0.0) \ and ((last_candle['rsi'] >= 75) or before_last_candle['rsi'] >= 75): self.trades = list() return 'rsi_' + str(count_of_buys) self.pairs[pair]['max_touch'] = max(last_candle['haclose'], self.pairs[pair]['max_touch']) def informative_pairs(self): # get access to all pairs available in whitelist. pairs = self.dp.current_whitelist() informative_pairs = [(pair, '1d') for pair in pairs] informative_pairs += [(pair, '1h') for pair in pairs] return informative_pairs def log_trade(self, action, pair, date, trade_type=None, rate=None, dispo=None, profit=None, buys=None, stake=None, last_candle=None): # Afficher les colonnes une seule fois if self.config.get('runmode') == 'hyperopt': return if self.columns_logged % 30 == 0: # print( # f"|{'-' * 18}+{'-' * 12}+{'-' * 12}+{'-' * 20}+{'-' * 14}+{'-' * 8}+{'-' * 10}+{'-' * 7}+{'-' * 13}+{'-' * 14}+{'-' * 14}+{'-' * 7}+{'-' * 12}|" # ) print( f"| {'Date':<16} | {'Action':<10} | {'Pair':<5} | {'Trade Type':<18} | {'Rate':>12} | {'Dispo':>6} | {'Profit':>8} | {'Pct':>5} | {'max_touch':>11} | {'last_lost':>12} | {'last_max':>12} | {'Buys':>5} | {'Stake':>10} |" ) print( f"|{'-' * 18}+{'-' * 12}+{'-' * 5}+{'-' * 20}+{'-' * 14}+{'-' * 8}+{'-' * 10}+{'-' * 7}+{'-' * 13}+{'-' * 14}+{'-' * 14}+{'-' * 7}+{'-' * 12}|" ) self.columns_logged += 1 date = str(date)[:16] if date else "-" limit = None # if buys is not None: # limit = round(last_rate * (1 - self.fibo[buys] / 100), 4) rsi = '' rsi_pct = '' # if last_candle is not None: # if (not np.isnan(last_candle['rsi_1d'])) and (not np.isnan(last_candle['rsi_1h'])): # rsi = str(int(last_candle['rsi_1d'])) + " " + str(int(last_candle['rsi_1h'])) # if (not np.isnan(last_candle['rsi_pct_1d'])) and (not np.isnan(last_candle['rsi_pct_1h'])): # rsi_pct = str(int(10000 * last_candle['bb_mid_pct_1d'])) + " " + str( # int(last_candle['rsi_pct_1d'])) + " " + str(int(last_candle['rsi_pct_1h'])) # first_rate = self.percent_threshold.value # last_rate = self.threshold.value # action = self.color_line(action, action) sma5_1d = '' sma5_1h = '' sma5 = str(sma5_1d) + ' ' + str(sma5_1h) last_lost = round((last_candle['haclose'] - self.pairs[pair]['max_touch']) / self.pairs[pair]['max_touch'], 3) max_touch = '' #round(last_candle['max12_1d'], 1) #round(self.pairs[pair]['max_touch'], 1) pct_max = round((last_candle['close'] - self.pairs[pair]['first_buy']) / self.pairs[pair]['first_buy'], 3) # round(100 * self.pairs[pair]['current_profit'], 1) if trade_type is not None: if np.isnan(last_candle['rsi_1d']): string = ' ' else: string = (str(int(last_candle['rsi_1d']))) + " " + str(int(last_candle['rsi_diff_1d'])) trade_type = trade_type \ + " " + string \ + " " + str(int(last_candle['rsi_1h'])) \ + " " + str(int(last_candle['rsi_diff_1h'])) print( f"| {date:<16} | {action:<10} | {pair[0:3]:<3} | {trade_type or '-':<18} | {rate or '-':>12} | {dispo or '-':>6} | {profit or '-':>8} | {pct_max or '-':>5} | {max_touch or '-':>11} | {last_lost or '-':>12} | {round(self.pairs[pair]['last_max'], 2) or '-':>12} | {buys or '-':>5} | {stake or '-':>10} |" ) def populate_indicators(self, dataframe: DataFrame, metadata: dict) -> DataFrame: # Add all ta features pair = metadata['pair'] heikinashi = qtpylib.heikinashi(dataframe) dataframe['haopen'] = heikinashi['open'] dataframe['haclose'] = heikinashi['close'] dataframe['hapercent'] = (dataframe['haclose'] - dataframe['haopen']) / dataframe['haclose'] dataframe['close_02'] = dataframe['haclose'] * 1.02 dataframe['pct_change'] = dataframe['close'].pct_change(5) dataframe['min'] = talib.MIN(dataframe['close'], timeperiod=200) dataframe['min12'] = talib.MIN(dataframe['close'], timeperiod=12) dataframe['min50'] = talib.MIN(dataframe['close'], timeperiod=50) dataframe['min200'] = talib.MIN(dataframe['close'], timeperiod=200) dataframe['max50'] = talib.MAX(dataframe['close'], timeperiod=50) dataframe['min_max50'] = (dataframe['max50'] - dataframe['min50']) / dataframe['min50'] dataframe['max200'] = talib.MAX(dataframe['close'], timeperiod=200) dataframe['min_max200'] = (dataframe['max200'] - dataframe['min200']) / dataframe['min200'] dataframe['max200_diff'] = (dataframe['max200'] - dataframe['close']) / dataframe['close'] dataframe['max50_diff'] = (dataframe['max50'] - dataframe['close']) / dataframe['close'] dataframe['sma5'] = talib.SMA(dataframe, timeperiod=5) dataframe['sma5_pct'] = (dataframe['sma5'] - dataframe['sma5']) / dataframe['sma5'] dataframe['sma10'] = talib.SMA(dataframe, timeperiod=10) dataframe['sma20'] = talib.SMA(dataframe, timeperiod=20) dataframe["percent"] = (dataframe["close"] - dataframe["open"]) / dataframe["open"] dataframe["percent3"] = (dataframe["close"] - dataframe["open"].shift(3)) / dataframe["open"].shift(3) dataframe["percent5"] = (dataframe["close"] - dataframe["open"].shift(5)) / dataframe["open"].shift(5) dataframe["percent12"] = (dataframe["close"] - dataframe["open"].shift(12)) / dataframe["open"].shift(12) dataframe["percent24"] = (dataframe["close"] - dataframe["open"].shift(24)) / dataframe["open"].shift(24) dataframe["percent48"] = (dataframe["close"] - dataframe["open"].shift(48)) / dataframe["open"].shift(48) # print(metadata['pair']) dataframe['rsi'] = talib.RSI(dataframe['close'], timeperiod=14) # Bollinger Bands bollinger = qtpylib.bollinger_bands(qtpylib.typical_price(dataframe), window=20, stds=2) dataframe['bb_lowerband'] = bollinger['lower'] dataframe['bb_middleband'] = bollinger['mid'] dataframe['bb_upperband'] = bollinger['upper'] dataframe["bb_percent"] = ( (dataframe["close"] - dataframe["bb_lowerband"]) / (dataframe["bb_upperband"] - dataframe["bb_lowerband"]) ) # Normalization dataframe['average_line'] = dataframe['close'].mean() dataframe['average_line_50'] = talib.MIDPOINT(dataframe['close'], timeperiod=50) dataframe['average_line_288'] = talib.MIDPOINT(dataframe['close'], timeperiod=288) # Sort the close prices to find the 4 lowest values sorted_close_prices = dataframe['close'].tail(576).sort_values() lowest_4 = sorted_close_prices.head(20) dataframe['lowest_4_average'] = lowest_4.mean() # Propagate this mean value across the entire dataframe # dataframe['lowest_4_average'] = dataframe['lowest_4_average'].iloc[0] # # Sort the close prices to find the 4 highest values sorted_close_prices = dataframe['close'].tail(288).sort_values(ascending=False) highest_4 = sorted_close_prices.head(20) # # Calculate the mean of the 4 highest values dataframe['highest_4_average'] = highest_4.mean() # # Propagate this mean value across the entire dataframe # dataframe['highest_4_average'] = dataframe['highest_4_average'].iloc[0] # dataframe['pct_average'] = (dataframe['highest_4_average'] - dataframe['close']) / dataframe['lowest_4_average'] # dataframe['highest_4_average_1'] = dataframe['highest_4_average'] * 0.99 # dataframe['highest_4_average_2'] = dataframe['highest_4_average'] * 0.98 # dataframe['highest_4_average_3'] = dataframe['highest_4_average'] * 0.97 # dataframe['highest_4_average_4'] = dataframe['highest_4_average'] * 0.96 # dataframe['highest_4_average_5'] = dataframe['highest_4_average'] * 0.95 # Compter les baisses consécutives dataframe['down'] = dataframe['hapercent'] <= 0.001 dataframe['up'] = dataframe['hapercent'] >= -0.001 dataframe['down_count'] = - dataframe['down'].astype(int) * ( dataframe['down'].groupby((dataframe['down'] != dataframe['down'].shift()).cumsum()).cumcount() + 1) dataframe['up_count'] = dataframe['up'].astype(int) * ( dataframe['up'].groupby((dataframe['up'] != dataframe['up'].shift()).cumsum()).cumcount() + 1) dataframe['down_tag'] = (dataframe['down_count'] < -7) dataframe['up_tag'] = (dataframe['up_count'] > 7) # Créer une colonne vide dataframe['down_pct'] = self.calculateUpDownPct(dataframe, 'down_count') dataframe['up_pct'] = self.calculateUpDownPct(dataframe, 'up_count') # Normaliser les données de 'close' # normalized_close = self.min_max_scaling(dataframe['close']) ################### INFORMATIVE 1h informative = self.dp.get_pair_dataframe(pair=metadata['pair'], timeframe="1h") # x_percent = 0.01 # n_hours = 6 # n_candles = n_hours * 60 # metadata["timeframe"] # Convertir en bougies # # informative["max_profit"] = dataframe["informative"].rolling(n_candles).max() # informative["profit_hit"] = dataframe["informative"] >= informative["close"] * (1 + x_percent) # informative['rsi'] = talib.RSI(informative['close'], timeperiod=7) informative['rsi_diff'] = informative['rsi'] - informative['rsi'].shift(1) informative['sma5'] = talib.SMA(informative, timeperiod=5) informative['sma5_pct'] = 100 * (informative['sma5'] - informative['sma5'].shift(1)) / informative['sma5'] dataframe = merge_informative_pair(dataframe, informative, self.timeframe, "1h", ffill=True) ################### INFORMATIVE 1d informative = self.dp.get_pair_dataframe(pair=metadata['pair'], timeframe="1d") informative['rsi'] = talib.RSI(informative['close'], timeperiod=7) informative['rsi_diff'] = informative['rsi'] - informative['rsi'].shift(1) informative['sma5'] = talib.SMA(informative, timeperiod=5) informative['sma5_pct'] = 100 * (informative['sma5'] - informative['sma5'].shift(1)) / informative['sma5'] sorted_close_prices = informative['close'].tail(365).sort_values() lowest_4 = sorted_close_prices.head(4) informative['lowest_4'] = lowest_4.mean() sorted_close_prices = informative['close'].tail(365).sort_values(ascending=False) highest_4 = sorted_close_prices.head(4) informative['highest_4'] = highest_4.mean() last_14_days = informative.tail(14) # Récupérer le minimum et le maximum de la colonne 'close' des 14 derniers jours min_14_days = last_14_days['close'].min() max_14_days = last_14_days['close'].max() informative['lowest'] = min_14_days informative['highest'] = max_14_days informative['pct_min_max'] = (max_14_days - min_14_days) / min_14_days informative['mid_min_max'] = min_14_days + (max_14_days - min_14_days) / 2 informative['middle'] = informative['lowest_4'] + (informative['highest_4'] - informative['lowest_4']) / 2 informative['mid_min_max_0.98'] = informative['mid_min_max'] * 0.98 dataframe = merge_informative_pair(dataframe, informative, self.timeframe, "1d", ffill=True) dataframe['count_buys'] = 0 dataframe['last_price'] = dataframe['close'] dataframe['first_price'] = dataframe['close'] dataframe['mid_price'] = (dataframe['last_price'] + dataframe['first_price']) / 2 dataframe['close01'] = dataframe.iloc[-1]['close'] * 1.01 dataframe['amount'] = 0 dataframe['limit'] = dataframe['close'] count_buys = 0 if self.dp: if self.dp.runmode.value in ('live', 'dry_run'): self.getOpenTrades() for trade in self.trades: if trade.pair != pair: continue print(trade) filled_buys = trade.select_filled_orders('buy') dataframe['count_buys'] = len(filled_buys) count = 0 amount = 0 for buy in filled_buys: if count == 0: dataframe['first_price'] = buy.price dataframe['close01'] = buy.price * 1.01 # Order(id=2396, trade=1019, order_id=29870026652, side=buy, filled=0.00078, price=63921.01, # status=closed, date=2024-08-26 02:20:11) dataframe['last_price'] = buy.price self.pairs[trade.pair]['last_buy'] = buy.price print(buy) count = count + 1 amount += buy.price * buy.filled dataframe['mid_price'] = (dataframe['last_price'] + dataframe['first_price']) / 2 count_buys = count dataframe['limit'] = dataframe['last_price'] * (1 - self.baisse[count] / 100) dataframe['amount'] = amount print(f"amount= {amount}") # # trades = Trade.get_trades([Trade.is_open is False]).all() # trades = Trade.get_trades_proxy(is_open=False, pair=metadata['pair']) # if trades: # trade = trades[-1] # print('closed trade pair is : ') # print(trade) # dataframe['expected_profit'] = (1 + self.expectedProfit(pair, dataframe.iloc[-1])) * dataframe[ # 'last_price'] # dataframe['lbp'] = dataframe['last_price'] # dataframe['lbp_3'] = dataframe['lbp'] * 0.97 # 3 # dataframe['lbp_6'] = dataframe['lbp'] * 0.94 # 6 # dataframe['lbp_9'] = dataframe['lbp'] * 0.90 # 10 # dataframe['lbp_12'] = dataframe['lbp'] * 0.85 # 15 # dataframe['lbp_20'] = dataframe['lbp'] * 0.8 # 20 # dataframe['fbp'] = trade.open_rate # # else: # # last_trade = self.get_trades(pair=pair).order_by('-close_date').first() # # filled_buys = last_trade.select_filled_orders('buy') # # print(last_trade) # # for buy in filled_buys: # # print(filled_buys) #dataframe['buy_level'] = dataframe['lowest_4_average'] * (1 - self.levels[count_buys] / 100) dataframe['buy_level'] = dataframe['max50'] * 0.99 #(1 - self.levels[count_buys] / 100) # ---------------------------------------------------------- # Calcul de la variation entre deux bougies successives dataframe['price_change'] = dataframe['close'].diff() # Marquer les bougies en baisse dataframe['is_down'] = dataframe['price_change'] < 0 # Identifier les blocs consécutifs de baisses # dataframe['drop_id'] = (dataframe['is_down'] != dataframe['is_down'].shift(1)).cumsum() dataframe['drop_id'] = np.where(dataframe['is_down'], (dataframe['is_down'] != dataframe['is_down'].shift(12)).cumsum(), np.nan) # Identifier uniquement les blocs de baisse dataframe['drop_id'] = dataframe['drop_id'].where(dataframe['is_down']) # # Grouper par les chutes détectées # drop_info = dataframe.groupby('drop_id').agg( # start=('close', 'first'), # Prix au début de la chute # end=('close', 'last'), # Prix à la fin de la chute # start_index=('close', 'idxmin'), # Début de la chute (index) # end_index=('close', 'idxmax'), # Fin de la chute (index) # ) # # # Calcul de l'ampleur de la chute en % # drop_info['drop_amplitude_pct'] = ((drop_info['end'] - drop_info['start']) / drop_info['start']) * 100 # # Filtrer les chutes avec une amplitude supérieure à 3% # drop_info = drop_info[drop_info['drop_amplitude_pct'] < -3] # ************** # # Identifier le prix de début et de fin de chaque chute # drop_stats = dataframe.groupby('drop_id').agg( # start_price=('close', 'first'), # Prix au début de la chute # end_price=('close', 'last'), # Prix à la fin de la chute # ) return dataframe def getOpenTrades(self): # if len(self.trades) == 0: print('search open trades') self.trades = Trade.get_open_trades() return self.trades def populate_buy_trend(self, dataframe: DataFrame, metadata: dict) -> DataFrame: pair = metadata['pair'] #expected_profit = self.expectedProfit(pair, dataframe.iloc[-1]) #last_candle = dataframe.iloc[-1].squeeze() # print("---------------" + pair + "----------------") # print('adjust stake amount ' + str(self.adjust_stake_amount(pair, dataframe.iloc[-1]))) # # print('adjust exit price ' + str(self.adjust_exit_price(dataframe.iloc[-1]))) # print('calcul expected_profit ' + str(expected_profit)) # buy_level = dataframe['buy_level'] # self.get_buy_level(pair, dataframe) dataframe.loc[ ( (dataframe['rsi_1h'] < 70) & (dataframe['rsi_diff_1h'] > -5) # (dataframe['down_count'].shift(1) < - 6) # & (dataframe['down_count'] == 0) # & (dataframe['down_pct'].shift(1) <= -0.5) ), ['enter_long', 'enter_tag']] = (1, 'down') dataframe['test'] = np.where(dataframe['enter_long'] == 1, dataframe['close'] * 1.01, np.nan) # for i in range(len(dataframe) - 48): # last_candle = dataframe.iloc[i] # if last_candle['enter_long'] is not None: # if last_candle['enter_long'] == 1: # futur_candle = dataframe.iloc[i + 48] # sma5pct_1h = last_candle['sma5_pct_1h'] # sma5pct_1d = last_candle['sma5_pct_1d'] # i = i + 48 # print(f"{i} ===> ;{sma5pct_1d:.2f};{sma5pct_1h:.2f};{100 * futur_candle['percent48']:.1f}") # print(dataframe.columns) # # colonnes = [ # 'hapercent', 'close_02', 'pct_change', 'max200_diff', # 'max50_diff', 'sma5_pct', 'percent', 'percent3', # 'percent5', 'percent12', 'percent24', 'percent48', 'rsi', # 'bb_percent', 'down_count', # 'up_count', 'down_pct', 'up_pct', 'volume_1h', 'rsi_1h', # 'sma5_pct_1h', 'volume_1d', 'rsi_1d', 'sma5_pct_1d', # 'pct_min_max_1d'] # # exclude_cols = ['date', 'enter_tag', 'close', 'open', 'low', 'high', 'haclose', 'haopen', 'halow', 'hahigh' # , 'date_1h', 'close_1h', 'open_1h', 'low_1h', 'high_1h', 'haclose_1h', 'haopen_1h', 'halow_1h', 'hahigh_1h' # , 'date_1d', 'close_1d', 'open_1d', 'low_1d', 'high_1d', 'haclose_1d', 'haopen_1d', 'halow_1d', 'hahigh_1d'] # for column in colonnes: # for column2 in colonnes: # print('===============================================') # print(f"Colonne 1: {column} Colonne 2: {column2}") # list_1 = [] # list_2 = [] # data = [] # key_1 = column # key_2 = column2 # futur = 'percent48' # # for i in range(200, len(dataframe) - 48): # last_candle = dataframe.iloc[i] # if last_candle['enter_long'] is not None and last_candle['enter_long'] == 1: # futur_candle = dataframe.iloc[i + 48] # val_1 = last_candle[key_1] # val_2 = last_candle[key_2] # if not np.isnan(val_1) and not np.isnan(val_2): # value = 100 * futur_candle[futur] # list_1.append(val_2) # list_2.append(val_1) # data.append(value) # i += 48 # skip to avoid overlapping trades # # # Tes données sous forme de listes # x = np.array(list_1) # axe X # y = np.array(list_2) # axe Y # z = np.array(data) # valeur à afficher (performance future) # # print(len(list_2), len(list_2), len(data)) # # print(f"Min/max H1: {min(list_1):.5f}, {max(list_1):.5f}") # # print(f"Min/max 1D: {min(list_2):.5f}, {max(list_2):.5f}") # # print(f"Min/max Data: {min(data):.5f}, {max(data):.5f}") # # Fusionner X et Y comme variables indépendantes # XY = np.column_stack((x, y)) # # Modèle # model = LinearRegression() # model.fit(XY, z) # # Coefficients # a, b = model.coef_ # c = model.intercept_ # r_squared = model.score(XY, z) # print(f"Coefficient de détermination R² : {r_squared:.4f}") # print(f"Équation estimée : Z = {a:.4f} * X + {b:.4f} * Y + {c:.4f}") # degree = 2 # Pour inclure X², Y², XY # poly_model = make_pipeline(PolynomialFeatures(degree), LinearRegression()) # poly_model.fit(XY, z) # # # Pour afficher les coefficients : # linreg = poly_model.named_steps['linearregression'] # print("Coefficients:", linreg.coef_) # print("Intercept:", linreg.intercept_) # # # # Données factices # # x = np.random.uniform(-2, 2, 500) # # y = np.random.uniform(-2, 2, 500) # # z = np.sin(x) * np.cos(y) * 10 # variation factice # # # Discrétisation (binning) # xbins = np.linspace(min(x), max(x), 20) # ybins = np.linspace(min(y), max(y), 20) # # # Création des bins 2D # H, xedges, yedges = np.histogram2d(x, y, bins=[xbins, ybins], weights=z) # counts, _, _ = np.histogram2d(x, y, bins=[xbins, ybins]) # pour normaliser # # # Moyenne dans chaque bin (évite division par 0) # H_avg = np.divide(H, counts, out=np.zeros_like(H), where=counts != 0) # # # Préparer coordonnées pour le graphique # xpos, ypos = np.meshgrid(xedges[:-1], yedges[:-1], indexing="ij") # xpos = xpos.ravel() # ypos = ypos.ravel() # zpos = np.zeros_like(xpos) # # dx = dy = (xedges[1] - xedges[0]) * 0.9 # dz = H_avg.ravel() # # # Affichage # fig = plt.figure(figsize=(12, 8)) # ax = fig.add_subplot(111, projection='3d') # colors = plt.cm.RdYlGn((dz - dz.min()) / (dz.max() - dz.min() + 1e-5)) # Normalisation # # ax.bar3d(xpos, ypos, zpos, dx, dy, dz, color=colors, shade=True) # # ax.set_xlabel(f"{key_1}") # ax.set_ylabel(f"{key_2}") # ax.set_zlabel('Perf. moyenne sur 48 bougies') # ax.set_title('Performance 48 bougies (%)') # plt.show() # plt.figure(figsize=(10, 8)) # scatter = plt.scatter( # list_1, # list_2, # c=data, # La couleur selon la performance future # cmap='RdYlGn', # Dégradé rouge -> jaune -> vert # alpha=0.8, # edgecolors='k' # ) # plt.xlabel(f"{key_1}") # plt.ylabel(f"{key_2}") # plt.title(f"Performance future") # plt.colorbar(scatter, label="Performance 48 bougies (%)") # plt.grid(True) # plt.show() # plt.figure(figsize=(10, 6)) # plt.scatter(list_1, data, c='blue', alpha=0.6) # plt.xlabel("SMA5 % sur 1 jour") # plt.ylabel("Variation du prix après 48 bougies (%)") # plt.title("Lien entre variation SMA5 1j et performance 48h") # plt.grid(True) # plt.show() return dataframe def populate_sell_trend(self, dataframe: DataFrame, metadata: dict) -> DataFrame: return dataframe def adjust_trade_position(self, trade: Trade, current_time: datetime, current_rate: float, current_profit: float, min_stake: float, max_stake: float, **kwargs): # ne rien faire si ordre deja en cours if trade.has_open_orders: #print("has open orders : true") return None if (self.wallets.get_available_stake_amount() < 50): # or trade.stake_amount >= max_stake: #print("wallet too low") return 0 dataframe, _ = self.dp.get_analyzed_dataframe(trade.pair, self.timeframe) last_candle = dataframe.iloc[-1].squeeze() last_candle_3 = dataframe.iloc[-4].squeeze() # prépare les données count_of_buys = trade.nr_of_successful_entries current_time = current_time.astimezone(timezone.utc) open_date = trade.open_date.astimezone(timezone.utc) dispo = round(self.wallets.get_available_stake_amount()) hours = (current_time - trade.date_last_filled_utc).total_seconds() / 3600.0 if (len(dataframe) < 1): #print("dataframe empty") return None pair = trade.pair if pair not in ('BTC/USDC', 'XRP/USDC', 'BTC/USDT', 'XRP/USDT'): print(f"{pair} not in allowed pairs list") return None max_buys = 20 # filled_buys = trade.select_filled_orders('buy') # count_of_buys = len(filled_buys) if count_of_buys >= max_buys: #print(f"count_of_buys {count_of_buys} > {max_buys} max buys") return None # if 'buy' in last_candle: # condition = (last_candle['buy'] == 1) # else: # condition = False # self.protection_nb_buy_lost.value # limit = last_candle['limit'] stake_amount = self.config['stake_amount'] + 50 * self.fibo[count_of_buys] # current_time_utc = current_time.astimezone(timezone.utc) # open_date = trade.open_date.astimezone(timezone.utc) # days_since_open = (current_time_utc - open_date).days pct_max = round((last_candle['close'] - self.pairs[trade.pair]['last_buy']) / self.pairs[trade.pair]['last_buy'], 4) # if (days_since_open > count_of_buys) & (0 < count_of_buys <= max_buys) & (current_rate <= limit) & (last_candle['enter_long'] == 1): if ( ( last_candle['enter_long'] == 1) or (last_candle['percent48'] < - 0.03 and last_candle['rsi_diff_1h'] > -5) ) \ and (pct_max < -0.012 - (count_of_buys * 0.001)): try: # This then calculates current safety order size # stake_amount = stake_amount * pow(1.5, count_of_buys) # print( # f"Adjust {current_time} price={trade.pair} rate={current_rate:.4f} buys={count_of_buys} limit={limit:.4f} stake={stake_amount:.4f}") trade_type = last_candle['enter_tag'] if last_candle['enter_long'] == 1 else 'pct48' self.log_trade( last_candle=last_candle, date=current_time, action="Loss -", dispo=dispo, pair=trade.pair, rate=current_rate, trade_type=trade_type, profit=round(current_profit, 4), # round(current_profit * trade.stake_amount, 2), buys=trade.nr_of_successful_entries + 1, stake=round(stake_amount, 2) ) self.pairs[trade.pair]['last_buy'] = current_rate self.pairs[trade.pair]['max_touch'] = last_candle['close'] self.pairs[trade.pair]['last_candle'] = last_candle return stake_amount except Exception as exception: print(exception) return None pcte=-0.012 - (count_of_buys * 0.001) if not self.dp.runmode.value in ('backtest', 'hyperopt'): logger.error(f"adjust_trade_position {trade.pair} tag={last_candle['enter_long']} pct48={last_candle['percent48']:.1f} pctmax={pct_max:.4f} pcte={pcte:.4f}") return None def adjust_stake_amount(self, pair: str, dataframe: DataFrame): # Calculer le minimum des 14 derniers jours current_price = dataframe['close'] # trade = self.getTrade(pair) # if trade: # current_price = trade.open_rate base_stake_amount = self.config['stake_amount'] #.get('stake_amount', 50) # Montant de base configuré # Calculer le max des 14 derniers jours min_14_days_4 = dataframe['lowest_4_1d'] max_14_days_4 = dataframe['highest_4_1d'] percent_4 = 1 - (current_price - min_14_days_4) / (max_14_days_4 - min_14_days_4) factor_4 = 1 / ((current_price - min_14_days_4) / (max_14_days_4 - min_14_days_4)) max_min_4 = max_14_days_4 / min_14_days_4 # min_14_days = dataframe['lowest_1d'] # max_14_days = dataframe['highest_1d'] # percent = 1 - (current_price - min_14_days) / (max_14_days - min_14_days) # factor = 1 / ((current_price - min_14_days) / (max_14_days - min_14_days)) # max_min = max_14_days / min_14_days # Stack amount ajusté price=2473.47 min_max=0.15058074985054215 percent=0.8379141364642171 amount=20.0 adjusted_stake_amount = max(base_stake_amount, min(100, base_stake_amount * percent_4)) # if pair in ('BTC/USDT', 'ETH/USDT'): # if percent_4 > 0.5: # adjusted_stake_amount = 300 # adjusted_stake_amount_2 = max(base_stake_amount / 2.5, min(75, base_stake_amount * percent)) # print( # f"Stack amount ajusté price={current_price} max_min={max_min_4:.4f} min_14={min_14_days_4:.4f} max_14={max_14_days_4:.4f} factor={factor_4:.4f} percent={percent_4:.4f} amount={adjusted_stake_amount:.4f}") # print(f"Stack amount ajusté price={current_price} max_min={max_min:.4f} min_14={min_14_days:.4f} max_14={max_14_days:.4f} factor={factor:.4f} percent={percent:.4f} amount={adjusted_stake_amount_2:.4f}") return adjusted_stake_amount # def adjust_exit_price(self, dataframe: DataFrame): # # Calculer le max des 14 derniers jours # min_14_days = dataframe['lowest_1d'] # max_14_days = dataframe['highest_1d'] # entry_price = dataframe['fbp'] # current_price = dataframe['close'] # percent = 0.5 * (max_14_days - min_14_days) / min_14_days # exit_price = (1 + percent) * entry_price # # print(f"Exit price ajusté price={current_price:.4f} max_14={max_14_days:.4f} exit_price={exit_price:.4f}") # # return exit_price # def adjust_stoploss(self, pair: str, trade: 'Trade', current_time: datetime, # current_rate: float, current_profit: float, **kwargs) -> float: # dataframe, _ = self.dp.get_analyzed_dataframe(trade.pair, self.timeframe) # # print(dataframe) # last_candle = dataframe.iloc[-1].squeeze() # # # Utiliser l'ATR pour ajuster le stoploss # atr_stoploss = current_rate - (last_candle['atr'] * 1.5) # Stoploss à 1.5x l'ATR # # # Retourner le stoploss dynamique en pourcentage du prix actuel # return (atr_stoploss / current_rate) - 1 def expectedProfit(self, pair: str, last_candle): current_price = last_candle['last_price'] # dataframe['close'] # trade = self.getTrade(pair) # if trade: # current_price = trade.open_rate # Calculer le max des 14 derniers jours min_14_days = last_candle['lowest_1d'] max_14_days = last_candle['highest_1d'] percent = (max_14_days - current_price) / (min_14_days) min_max = last_candle['pct_min_max_1d'] # (max_14_days - min_14_days) / min_14_days expected_profit = min(0.1, max(0.01, last_candle['min_max200'] * 0.5 + self.pairs[pair]['count_of_buys'] * 0.0005)) return expected_profit # def adjust_exit_price(self, dataframe: DataFrame): # # Calculer le max des 14 derniers jours # min_14_days = dataframe['lowest_1d'] # max_14_days = dataframe['highest_1d'] # entry_price = dataframe['fbp'] # current_price = dataframe['close'] # percent = 0.5 * (max_14_days - min_14_days) / min_14_days # exit_price = (1 + percent) * entry_price # # print(f"Exit price ajusté price={current_price} max_14={max_14_days} exit_price={exit_price}") # # return exit_price # def adjust_entry_price(self, dataframe: DataFrame): # # Calculer le max des 14 derniers jours # min_14_days = dataframe['lowest_1d'] # max_14_days = dataframe['highest_1d'] # current_price = dataframe['close'] # percent = 0.5 * (max_14_days - min_14_days) / min_14_days # entry_price = (1 + percent) * entry_price # # print(f"Entry price ajusté price={current_price} max_14={max_14_days} exit_price={entry_price}") # # return entry_price # def adjust_stake_amount(self, dataframe: DataFrame): # # Calculer le minimum des 14 derniers jours # middle = dataframe['middle_1d'] # # # Récupérer la dernière cotation actuelle (peut être le dernier point de la série) # current_price = dataframe['close'] # # # Calculer l'écart entre la cotation actuelle et le minimum des 14 derniers jours # difference = middle - current_price # # Ajuster la stake_amount en fonction de l'écart # # Par exemple, augmenter la stake_amount proportionnellement à l'écart # base_stake_amount = self.config.get('stake_amount', 100) # Montant de base configuré # # multiplier = 1 - (difference / current_price) # Exemple de logique d'ajustement # # adjusted_stake_amount = max(base_stake_amount / 2.5, base_stake_amount * multiplier) # # # difference = 346.07000000000016 # # price = 2641.75 # # min_14 = 2295.68 # # amount = 56.5500141951358 # # print(f"Stack amount ajusté difference={difference} price={current_price} middle={middle} multiplier={multiplier} amount={adjusted_stake_amount}") # # return adjusted_stake_amount def calculateUpDownPct(self, dataframe, key): down_pct_values = np.full(len(dataframe), np.nan) # Remplir la colonne avec les bons calculs for i in range(len(dataframe)): shift_value = abs(int(dataframe[key].iloc[i])) # Récupérer le shift actuel if i - shift_value > 1: # Vérifier que le shift ne dépasse pas l'index down_pct_values[i] = 100 * (dataframe['close'].iloc[i] - dataframe['close'].iloc[i - shift_value]) / \ dataframe['close'].iloc[i - shift_value] return down_pct_values