# pr#agma pylint: disable=missing-docstring, invalid-name, pointless-string-statement # isort: skip_file # --- Do not remove these libs --- from datetime import datetime from freqtrade.persistence import Trade import numpy as np # noqa import pandas as pd # noqa from pandas import DataFrame from datetime import timezone, timedelta from datetime import timedelta, datetime from freqtrade.strategy.interface import IStrategy from freqtrade.persistence import Trade from typing import Optional, Union, Tuple from freqtrade.strategy import (BooleanParameter, CategoricalParameter, DecimalParameter, stoploss_from_open, IntParameter, IStrategy, merge_informative_pair, informative, stoploss_from_absolute) # -------------------------------- # Add your lib to import here import ta import talib.abstract as talib import freqtrade.vendor.qtpylib.indicators as qtpylib from functools import reduce from random import shuffle import logging logger = logging.getLogger(__name__) timeperiods = [3, 5, 12, 24, 36, 48, 60] # Couleurs ANSI de base RED = "\033[31m" GREEN = "\033[32m" YELLOW = "\033[33m" BLUE = "\033[34m" MAGENTA = "\033[35m" CYAN = "\033[36m" RESET = "\033[0m" timeperiods = [3, 5, 12, 24, 36, 48, 60] long_timeperiods = [80, 100, 120, 140, 160, 180, 200] sma_indicators = list() sma_indicators_h = list() sma_indicators_d = list() score_indicators = list() stop_buying_indicators = list() sma_deriv1_indicators = list() for timeperiod in timeperiods: sma_indicators.append(f"sma{timeperiod}") sma_indicators_h.append(f"sma{timeperiod}") sma_indicators_d.append(f"sma{timeperiod}") # sma_indicators.append(f"sma{timeperiod}_1h") # god_genes_with_timeperiod.append(f'max{timeperiod}') # god_genes_with_timeperiod.append(f'min{timeperiod}') # god_genes_with_timeperiod.append(f"percent{timeperiod}") # god_genes_with_timeperiod.append(f"sma{timeperiod}") # sma_deriv1_indicators.append(f"sma{timeperiod}_deriv1") sma_deriv1_indicators.append(f"sma{timeperiod}_deriv1") # sma_deriv1_indicators.append(f"sma{timeperiod}_deriv2") # sma_deriv1_indicators.append(f"sma{timeperiod}_score") # stoploss_indicators.append(f"stop_buying{timeperiod}") stop_buying_indicators.append(f"stop_buying{timeperiod}_1h") score_indicators.append(f"sma{timeperiod}_score") # score_indicators.append(f"sma{timeperiod}_score_1h") # god_genes_with_timeperiod.append(f"sma{timeperiod}_trend_up") # god_genes_with_timeperiod.append(f"sma{timeperiod}_trend_down") # god_genes_with_timeperiod.append(f"sma{timeperiod}_trend_change_up") # god_genes_with_timeperiod.append(f"sma{timeperiod}_trend_change_down") for timeperiod in long_timeperiods: sma_deriv1_indicators.append(f"sma{timeperiod}_deriv1_1h") sma_indicators_d.append(f"sma{timeperiod}") # number of candles to check up,don,off trend. TREND_CHECK_CANDLES = 4 DECIMALS = 1 def normalize(df): df = (df-df.min())/(df.max()-df.min()) return df # This class is a sample. Feel free to customize it. class Empty5m(IStrategy): # Strategy interface version - allow new iterations of the strategy interface. # Check the documentation or the Sample strategy to get the latest version. INTERFACE_VERSION = 2 # ROI table: minimal_roi = { #"0": 0.015 "0": 0.5 } # Stoploss: stoploss = -1 trailing_stop = False # trailing_stop_positive = 0.15 # trailing_stop_positive_offset = 0.20 # trailing_only_offset_is_reached = False position_adjustment_enable = True use_custom_stoploss = True #max_open_trades = 3 # Optimal ticker interval for the strategy. timeframe = '1m' # Run "populate_indicators()" only for new candle. process_only_new_candles = False # These values can be overridden in the "ask_strategy" section in the config. use_sell_signal = True sell_profit_only = False ignore_roi_if_buy_signal = False # Number of candles the strategy requires before producing valid signals startup_candle_count: int = 30 columns_logged = False pairs = { pair: { 'first_amount': 0, "first_buy": 0, "last_buy": 0.0, "last_min": 999999999999999.5, "last_max": 0, "trade_info": {}, "max_touch": 0.0, "last_sell": 0.0, 'count_of_buys': 0, 'current_profit': 0, 'expected_profit': 0, 'previous_profit': 0, "last_candle": {}, "last_count_of_buys": 0, 'base_stake_amount': 0, 'stop_buy': False, 'last_date': 0, 'stop': False, 'max_profit': 0, 'last_profit': 0, 'total_amount': 0, 'has_gain': 0, 'force_sell': False, 'force_buy': False, 'current_trade': None, 'last_trade': None, 'force_stop': False, 'count_of_lost': 0 } for pair in ["BTC/USDC", "BTC/USDT"] } plot_config = { "main_plot": { "sma5": { "color": "#7aa90b" }, "min24": { "color": "#121acd" } }, "subplots": { "Inv": { "sma5_inv": { "color": "#aef878", "type": "line" }, "zero": { "color": "#fdba52" }, "sma24_score": { "color": "#f1f5b0" } }, "drv": { "sma5_deriv1": { "color": "#96eebb" } } }, "options": { "markAreaZIndex": 1 } } # start_bull_indicator = CategoricalParameter(sma_indicators_d, default='sma100', space='buy', optimize=True, load=True) # start_bull_deriv1 = DecimalParameter(-0.005, 0.005, decimals=4, default=0, space='buy', optimize=True, load=True) # start_bull_deriv2 = DecimalParameter(-0.005, 0.005, decimals=4, default=0, space='buy', optimize=True, load=True) # start_bear_indicator = CategoricalParameter(sma_indicators_d, default='sma100', space='buy', optimize=True, load=True) # start_bear_deriv1 = DecimalParameter(-0.005, 0.005, decimals=4, default=0, space='buy', optimize=True, load=True) # start_bear_deriv2 = DecimalParameter(-0.005, 0.005, decimals=4, default=0, space='buy', optimize=True, load=True) # buy_deriv1_sma60 = DecimalParameter(-0.005, 0.005, decimals=4, default=0, space='buy', optimize=True, load=True) # buy_deriv1_sma5d = DecimalParameter(-0.007, 0.007, decimals=4, default=0, space='buy', optimize=True, load=True) # buy_deriv1_sma12d = DecimalParameter(-0.007, 0.007, decimals=4, default=0, space='buy', optimize=True, load=True) # buy_deriv2_sma60 = DecimalParameter(-0.0005, 0.0005, decimals=4, default=0, space='buy', optimize=True, load=True) # buy_deriv2_sma5d = DecimalParameter(-0.0007, 0.0007, decimals=4, default=0, space='buy', optimize=True, load=True) # buy_deriv2_sma12d = DecimalParameter(-0.0007, 0.0007, decimals=4, default=0, space='buy', optimize=True, load=True) # buy_longue = CategoricalParameter(long_timeperiods, default=120, space='buy', optimize=True, load=True) # buy_longue_derive = CategoricalParameter(sma_deriv1_indicators, default="sma60_deriv1_1h", space='buy', optimize=False, load=True) # sell_score_indicator = CategoricalParameter(score_indicators, default="sma24_score", space='sell') # sell_sma_indicators = CategoricalParameter(sma_indicators, default="sma24_score", space='sell') # sell_crossed_sma_indicators = CategoricalParameter(sma_indicators, default="sma24_score", space='sell') drop_from_last_entry = DecimalParameter(-0.1, 0, decimals=2, default=-0.025, space='protection', optimize=True, load=True) # baisses = list() # for i in range(0, 0.5, 0.05): # baisses.append(i) # mises_bear = IntParameter(1, 10, default=1, space='protection') mises_bull = IntParameter(1, 10, default=1, space='protection') sell_force_sell = DecimalParameter(-0.2, 0, decimals=3, default=-0.02, space='sell') sell_indicator = CategoricalParameter(sma_indicators, default="sma36", space='sell', optimize=True, load=True) baisse = DecimalParameter(0.1, 0.5, decimals=2, default=0.3, space='sell', optimize=True, load=True) b30_indicateur = CategoricalParameter(sma_indicators, default="sma36", space='sell', optimize=True, load=True) # lost_indicator = CategoricalParameter(sma_deriv1_indicators, default="sma5_deriv1", space='protection') # range_pos_stoploss = DecimalParameter(0, 0.1, decimals=2, default=0.05, space='protection') # stoploss_force = DecimalParameter(-0.2, 0, decimals=2, default=-0.05, space='protection') # stoploss_timeperiod = CategoricalParameter(timeperiods, default="12", space='protection') def custom_stake_amount(self, pair: str, current_time: datetime, current_rate: float, proposed_stake: float, min_stake: float, max_stake: float, **kwargs) -> float: dataframe, _ = self.dp.get_analyzed_dataframe(pair, self.timeframe) last_candle = dataframe.iloc[-1].squeeze() return self.adjust_stake_amount(pair, last_candle) def adjust_stake_amount(self, pair: str, last_candle: DataFrame): if (self.pairs[pair]['first_amount'] > 0): amount = min(self.wallets.get_available_stake_amount(), self.pairs[pair]['first_amount']) else: # range_min = last_candle[f"min{self.stoploss_timeperiod.value}_1h"] # range_max = last_candle[f"max{self.stoploss_timeperiod.value}_1h"] # # if range_max == range_min: # return -0.1 # sécurité # # range_pos = (last_candle['close'] - range_min) / (range_max - range_min) # range_pos = last_candle[f"range_pos"] # sl_min = self.wallets.get_available_stake_amount() / 6 # sl_max = self.wallets.get_available_stake_amount() / 2 # # amount = sl_min + (1 - range_pos) * (sl_max - sl_min) # if last_candle['enter_tag'] in ['fall', 'bear', 'Force', 'Range-']: # amount = self.wallets.get_available_stake_amount() / self.mises_bear.value # else: amount = self.wallets.get_available_stake_amount() / self.mises_bull.value # / (2 * self.pairs[pair]['count_of_lost'] + 1) # factor = 1 # # if self.pairs[pair]['last_trade'] is not None \ # and self.pairs[pair]['last_trade'].close_profit is not None \ # and self.pairs[pair]['last_trade'].close_profit > 0.01: # factor = self.pairs[pair]['last_trade'].close_profit / 0.01 # amount = amount / factor return min(amount, self.wallets.get_available_stake_amount()) def adjust_trade_position(self, trade: Trade, current_time: datetime, current_rate: float, current_profit: float, min_stake: float, max_stake: float, **kwargs): # ne rien faire si ordre deja en cours if trade.has_open_orders: # print("skip open orders") return None if (self.wallets.get_available_stake_amount() < 10): # or trade.stake_amount >= max_stake: return 0 # dataframe, _ = self.dp.get_analyzed_dataframe(trade.pair, self.timeframe) last_candle = dataframe.iloc[-1].squeeze() before_last_candle = dataframe.iloc[-2].squeeze() before_last_candle_12 = dataframe.iloc[-13].squeeze() before_last_candle_24 = dataframe.iloc[-25].squeeze() last_candle_3 = dataframe.iloc[-4].squeeze() last_candle_previous_1h = dataframe.iloc[-13].squeeze() # prépare les données current_time = current_time.astimezone(timezone.utc) open_date = trade.open_date.astimezone(timezone.utc) dispo = round(self.wallets.get_available_stake_amount()) hours_since_first_buy = (current_time - trade.open_date_utc).seconds / 3600.0 days_since_first_buy = (current_time - trade.open_date_utc).days hours = (current_time - trade.date_last_filled_utc).total_seconds() / 3600.0 count_of_buys = trade.nr_of_successful_entries current_time_utc = current_time.astimezone(timezone.utc) open_date = trade.open_date.astimezone(timezone.utc) days_since_open = (current_time_utc - open_date).days pair = trade.pair profit = trade.calc_profit(current_rate) #round(current_profit * trade.stake_amount, 1) # last_lost = self.getLastLost(last_candle, pair) # pct_first = 0 stake_amount = min(self.wallets.get_available_stake_amount(), self.adjust_stake_amount(pair, last_candle)) # if (last_candle['enter_long'] == 1 and current_profit < -0.05 and stake_amount > 10) : # # print(f"adjust {current_time} {stake_amount}") # print(f"adjust {pair} {current_time} dispo={dispo} amount={stake_amount} rate={current_rate}") # return stake_amount if last_candle['enter_long'] != 1: return None filled_buys = [ o for o in trade.orders if o.status == "closed" and o.ft_order_side == "buy" ] if not filled_buys: return None last_buy = max(filled_buys, key=lambda o: o.order_date) last_entry_price = last_buy.price if last_entry_price: drop_from_last_entry = (current_rate - last_entry_price) / last_entry_price if drop_from_last_entry <= self.drop_from_last_entry.value and last_candle['min60'] == last_candle_3['min60'] \ and last_candle['range_pos'] < 0.03 and last_candle['hapercent'] > 0: # stake_amount = trade.stake_amount self.pairs[trade.pair]['last_buy'] = current_rate self.pairs[trade.pair]['total_amount'] += stake_amount # print(f"adjust {pair} drop={drop_from_last_entry} {current_time} dispo={dispo} amount={stake_amount} rate={current_rate}") # print(f"adjust {pair} {current_time} dispo={dispo} amount={stake_amount} rate={current_rate}") trade_type = last_candle['enter_tag'] if last_candle['enter_long'] == 1 else 'pct48' self.pairs[trade.pair]['count_of_buys'] += 1 self.pairs[pair]['total_amount'] += stake_amount self.log_trade( last_candle=last_candle, date=current_time, action="🟧 Loss -", dispo=round(dispo,0), pair=trade.pair, rate=current_rate, trade_type=trade_type, profit=round(profit, 1), buys=trade.nr_of_successful_entries + 1, stake=round(stake_amount, 2) ) self.pairs[trade.pair]['last_buy'] = current_rate self.pairs[trade.pair]['max_touch'] = last_candle['close'] self.pairs[trade.pair]['last_candle'] = last_candle return stake_amount return None def confirm_trade_entry(self, pair: str, order_type: str, amount: float, rate: float, time_in_force: str, current_time: datetime, entry_tag: Optional[str], **kwargs) -> bool: minutes = 0 if self.pairs[pair]['last_date'] != 0: minutes = round(int((current_time - self.pairs[pair]['last_date']).total_seconds() / 60)) dataframe, _ = self.dp.get_analyzed_dataframe(pair, self.timeframe) last_candle = dataframe.iloc[-1].squeeze() last_candle_2 = dataframe.iloc[-2].squeeze() last_candle_3 = dataframe.iloc[-3].squeeze() if entry_tag == 'Range-': self.pairs[pair]['count_of_lost'] = 0 if entry_tag == 'Force': if self.pairs[pair]['count_of_lost'] >= 1: self.pairs[pair]['count_of_lost'] = 0 condition = False else: condition = False if self.pairs[pair]['count_of_lost'] >= 1 \ and (last_candle['sma12_deriv1_1h'] < 0.00) \ and entry_tag != 'dist' else True reason = '' if not condition: reason = 'lost' if condition: if last_candle['range_pos'] > 0.05:# and self.pairs[pair]['force_stop']: condition = last_candle['sma5'] > last_candle['sma60'] if not condition: reason = 'range' # if self.pairs[pair]['force_stop'] and last_candle['range_pos'] < 0.02: # self.pairs[pair]['force_stop'] = False if False and self.pairs[pair]['last_trade'].close_date is not None: # base cooldown = n bougies / cooldown proportionnel au profit # bougies de plus par % cooldown_candles = 12 + 6 * (int(self.pairs[pair]['last_profit'] / 0.01)) # réglable # temps depuis la fermeture candles_since_close = ((current_time - self.pairs[pair]['last_trade'].close_date).total_seconds() / 3600) # seconds / heure condition = (candles_since_close >= cooldown_candles) print(f"Cool close date = trade={self.pairs[pair]['last_trade'].close_date} down {round(self.pairs[pair]['last_profit'], 3)} {cooldown_candles} {candles_since_close}") # self.should_enter_trade(pair, last_candle, current_time) if self.pairs[pair]['stop']: reason = 'stop' allow_to_buy = (condition and not self.pairs[pair]['stop']) | (entry_tag == 'force_entry') # force = self.pairs[pair]['force_buy'] # if self.pairs[pair]['force_buy']: # self.pairs[pair]['force_buy'] = False # allow_to_buy = True # else: # if not self.should_enter_trade(pair, last_candle, current_time): # allow_to_buy = False dispo = round(self.wallets.get_available_stake_amount()) if allow_to_buy: self.pairs[pair]['first_buy'] = rate self.pairs[pair]['last_buy'] = rate self.pairs[pair]['max_touch'] = last_candle['close'] self.pairs[pair]['last_candle'] = last_candle self.pairs[pair]['count_of_buys'] = 1 self.pairs[pair]['current_profit'] = 0 self.pairs[pair]['last_profit'] = 0 self.pairs[pair]['last_trade'] = None self.pairs[pair]['last_max'] = max(last_candle['close'], self.pairs[pair]['last_max']) self.pairs[pair]['last_min'] = min(last_candle['close'], self.pairs[pair]['last_min']) self.pairs[pair]['last_date'] = current_time # self.printLineLog() stake_amount = self.adjust_stake_amount(pair, last_candle) self.pairs[pair]['first_amount'] = stake_amount self.pairs[pair]['total_amount'] = stake_amount # print(f"Buy {pair} {current_time} {entry_tag} dispo={dispo} amount={stake_amount} rate={rate} rate={rate}") self.log_trade( last_candle=last_candle, date=current_time, action=("🟩Buy" if allow_to_buy else "Canceled") + " " + str(minutes), pair=pair, rate=rate, dispo=dispo, profit=0, trade_type=entry_tag, buys=1, stake=round(stake_amount, 2) ) else: self.log_trade( last_candle=last_candle, date=current_time, action=("🟩Buy" if allow_to_buy else "Canceled") + " " + reason, pair=pair, rate=rate, dispo=dispo, profit=0, trade_type=entry_tag, buys=1, stake=0 ) return allow_to_buy def confirm_trade_exit(self, pair: str, trade: Trade, order_type: str, amount: float, rate: float, time_in_force: str, exit_reason: str, current_time, **kwargs, ) -> bool: # allow_to_sell = (minutes > 30) dataframe, _ = self.dp.get_analyzed_dataframe(pair, self.timeframe) last_candle = dataframe.iloc[-1].squeeze() profit = trade.calc_profit(rate) force = self.pairs[pair]['force_sell'] allow_to_sell = (last_candle['hapercent'] < 0 and profit > 0) or force or (exit_reason == 'god') or (exit_reason == 'force_exit') or (exit_reason == 'stop_loss') or (exit_reason == 'trailing_stop_loss') minutes = int(round((current_time - trade.date_last_filled_utc).total_seconds() / 60, 0)) if allow_to_sell: self.pairs[pair]['last_sell'] = rate self.pairs[pair]['last_candle'] = last_candle profit = trade.calc_profit(rate) self.pairs[pair]['previous_profit'] = profit dispo = round(self.wallets.get_available_stake_amount()) # print(f"Sell {pair} {current_time} {exit_reason} dispo={dispo} rate={rate} open_rate={trade.open_rate} profit={profit}") self.log_trade( last_candle=last_candle, date=current_time, action="🟥Sell " + str(minutes), pair=pair, trade_type=exit_reason, rate=last_candle['close'], dispo=dispo, profit=round(profit, 2) ) self.pairs[pair]['first_amount'] = 0 self.pairs[pair]['force_sell'] = False self.pairs[pair]['has_gain'] = 0 self.pairs[pair]['current_profit'] = 0 self.pairs[pair]['total_amount'] = 0 self.pairs[pair]['count_of_buys'] = 0 self.pairs[pair]['max_touch'] = 0 self.pairs[pair]['last_buy'] = 0 self.pairs[pair]['last_date'] = current_time self.pairs[pair]['last_trade'] = trade self.pairs[pair]['current_trade'] = None self.pairs[pair]['max_profit'] = 0 if profit < 0: self.pairs[pair]['count_of_lost'] += 1 else: self.pairs[pair]['count_of_lost'] = 0 else: print(f"{current_time} STOP triggered for {pair} ({exit_reason}) but condition blocked", "warning") return (allow_to_sell) | (exit_reason == 'force_exit') | (exit_reason == 'stop_loss') | force def custom_exit(self, pair, trade, current_time, current_rate, current_profit, **kwargs): dataframe, _ = self.dp.get_analyzed_dataframe(pair, self.timeframe) last_candle = dataframe.iloc[-1] before_last_candle = dataframe.iloc[-2] self.pairs[pair]['current_trade'] = trade # momentum = last_candle[self.sell_score_indicator.value] profit = trade.calc_profit(current_rate) #round(current_profit * trade.stake_amount, 1) count_of_buys = trade.nr_of_successful_entries expected_profit = self.expectedProfit(pair, last_candle) self.pairs[pair]['max_profit'] = max(self.pairs[pair]['max_profit'], profit) max_profit = self.pairs[pair]['max_profit'] # baisse_min = last_candle['close'] - last_candle['min12'] baisse = 0 if profit > 0: baisse = 1 - (profit / max_profit) mx = max_profit / 5 self.pairs[pair]['count_of_buys'] = count_of_buys self.pairs[pair]['current_profit'] = profit dispo = round(self.wallets.get_available_stake_amount() + self.pairs[pair]['total_amount']) # self.log_trade( # last_candle=last_candle, # date=current_time, # action=("🔴 NOW" if self.pairs[pair]['stop'] else "🟢 NOW "), # dispo=dispo, # pair=pair, # rate=last_candle['close'], # trade_type='momentum' + str(round(momentum, 2)), # profit=round(profit, 2), # buys=count_of_buys, # stake=0 # ) if self.pairs[pair]['current_trade'].enter_tag in ['bear', 'Force', 'Range-']: if current_profit < - 0.02 and last_candle[f"close"] <= last_candle['sma60'] and self.wallets.get_available_stake_amount() < 50: self.pairs[pair]['force_sell'] = True return 'smaBF' else: if current_profit < self.sell_force_sell.value \ and last_candle[f"close"] <= last_candle[self.sell_indicator.value]: self.pairs[pair]['force_sell'] = True return 'sma' if current_profit > 0.00 and \ (baisse > self.baisse.value and last_candle[f"close"] <= last_candle[self.b30_indicateur.value]) \ and last_candle['hapercent'] <0 : self.pairs[pair]['force_sell'] = True return 'B30' # if profit > 0 and baisse > 0.5 and last_candle['hapercent'] <0 and last_candle[f"close"] <= last_candle['sma12']: # self.pairs[pair]['force_sell'] = True # return 'B50' if current_profit > - self.sell_force_sell.value and last_candle['has_cross_sma3_1h'] == 1: self.pairs[pair]['force_sell'] = True return 'Cross' # if profit > max(5, expected_profit) and last_candle['sma5_deriv1_1h'] < 0 and baisse > 0.15: # self.pairs[pair]['force_sell'] = True # return 'Drv5d' # if last_candle['range_pos'] > 0.05 and current_profit < - last_candle['range_pos'] /4 : #last_candle['cross_sma60']: # self.pairs[pair]['force_sell'] = True # return 'Range' # if last_candle['range_pos'] > 0.05 and current_profit < - last_candle['range_pos'] /4 \ # and last_candle['sma5_deriv1_1h'] < 0 and last_candle['sma5_deriv2_1h'] < 0 \ # and last_candle['sma60_deriv1'] < 0 and last_candle['sma60_deriv2'] < 0: # self.pairs[pair]['force_sell'] = True # self.pairs[pair]['force_stop'] = True # return 'Deriv'; if profit < - (dispo * 0.2): print(f'dispo={dispo} wallet={round(self.wallets.get_available_stake_amount())} limit={-dispo * 0.1}') self.pairs[pair]['force_sell'] = True self.pairs[pair]['force_stop'] = True return 'Force'; # if self.pairs[pair]['force_sell'] and self.pairs[pair]['last_trade'].close_profit > 0.02: # return "Force" # if last_candle['stop_buying']: # self.pairs[pair]['force_sell'] = True # return 'Stop' # Si momentum fort → on laisse courir # if momentum > 1: # return None # Si momentum faiblit → on prend profit plus tôt # if momentum < 0 and current_profit > 0.02 and last_candle[self.sell_score_indicator.value] < before_last_candle[self.sell_score_indicator.value]\ # and last_candle['close'] < last_candle['sma5']: # self.pairs[pair]['last_profit'] = current_profit # return "momentum_drop" return None def custom_stoploss(self, pair, trade, current_time, current_rate, current_profit, **kwargs): dataframe, _ = self.dp.get_analyzed_dataframe(pair, self.timeframe) last_candle = dataframe.iloc[-1] profit = trade.calc_profit(current_rate) candle_at_buy = self.pairs[pair]['last_candle'] # if candle_at_buy['range_pos'] > self.range_pos_stoploss.value and candle_at_buy[self.stoploss_indicator.value] < 0: # return self.stoploss_force.value # # print(f'test stop loss {self.stoploss} {last_candle["stop_buying12_1h"]}') # if last_candle[self.stoploss_indicator.value] and (trade.nr_of_successful_entries >= 4 or self.wallets.get_available_stake_amount() < 300): # and profit < - 30 : # range_min = last_candle[f"min{self.stoploss_timeperiod.value}_1h"] # range_max = last_candle[f"max{self.stoploss_timeperiod.value}_1h"] # # if range_max == range_min: # print(f'ranges={range_min}') # return -0.1 # sécurité # # range_pos = (current_rate - range_min) / (range_max - range_min) # # if (range_pos > 1): # return -1 # # sl_min = -0.02 # sl_max = -0.1 #self.stoploss # # dynamic_sl = (sl_min + (1 - range_pos) * (sl_max - sl_min)) # # print(f'{current_time} Loss ranges={round(range_min,0)} {round(range_max, 0)} range_pos={round(range_pos, 3)} dynamic_sl={round(dynamic_sl, 3)} ' # f'profit={profit} current={current_profit} {self.stoploss_indicator.value} {self.stoploss_timeperiod.value} {last_candle[self.stoploss_indicator.value]}') # # return dynamic_sl return -1 def informative_pairs(self): # get access to all pairs available in whitelist. pairs = self.dp.current_whitelist() informative_pairs = [(pair, '1h') for pair in pairs] informative_pairs += [(pair, '1d') for pair in pairs] return informative_pairs def populate_indicators(self, dataframe: DataFrame, metadata: dict) -> DataFrame: pair = metadata['pair'] dataframe = dataframe.copy() heikinashi = qtpylib.heikinashi(dataframe) dataframe['haopen'] = heikinashi['open'] dataframe['haclose'] = heikinashi['close'] dataframe['hapercent'] = (dataframe['haclose'] - dataframe['haopen']) / dataframe['haclose'] dataframe['mid'] = dataframe['haopen'] + (dataframe['haclose'] - dataframe['haopen']) / 2 dataframe['zero'] = 0 dataframe[f"percent"] = dataframe['close'].pct_change() for timeperiod in timeperiods: dataframe[f"mid_smooth{timeperiod}"] = dataframe['mid'].rolling(timeperiod).mean() dataframe[f'max{timeperiod}'] = talib.MAX(dataframe['close'], timeperiod=timeperiod) dataframe[f'min{timeperiod}'] = talib.MIN(dataframe['close'], timeperiod=timeperiod) dataframe[f"percent{timeperiod}"] = dataframe['close'].pct_change(timeperiod) dataframe[f"sma{timeperiod}"] = dataframe['mid'].ewm(span=timeperiod, adjust=False).mean() # dataframe[f"high{timeperiod}"] = dataframe['high'].ewm(span=timeperiod, adjust=False).mean() # dataframe[f"low{timeperiod}"] = dataframe['low'].ewm(span=timeperiod, adjust=False).mean() # dataframe = self.calculateRegression(dataframe, column=f"high{timeperiod}", window=10, degree=1, future_offset=12) # dataframe = self.calculateRegression(dataframe, column=f"low{timeperiod}", window=10, degree=1, future_offset=12) self.calculeDerivees(dataframe, f"sma{timeperiod}", timeframe=self.timeframe, ema_period=timeperiod) dataframe = self.calculateRegression(dataframe, column='mid', window=30, degree=1, future_offset=12) dataframe = self.calculateRegression(dataframe, column='sma24', window=30, degree=1, future_offset=12) dataframe["percent"] = dataframe["mid"].pct_change(1) dataframe["percent3"] = dataframe["mid"].pct_change(3) dataframe["volume_mean"] = dataframe["volume"].rolling(20).mean() dataframe["volume_ratio"] = dataframe["volume"] / dataframe["volume_mean"] dataframe["market_state"] = 0 dataframe.loc[dataframe["percent"] < -0.005, "market_state"] = -1 dataframe.loc[(dataframe["percent3"] < -0.015) & (dataframe["volume_ratio"] > 2), "market_state"] = -2 dataframe.loc[(dataframe["percent"] > 0.003) & (dataframe["volume_ratio"] > 1.5), "market_state"] = 1 dataframe["velocity"] = dataframe["percent"] - dataframe["percent3"] # ###################################################################################################### ################### INFORMATIVE 1h informative = self.dp.get_pair_dataframe(pair=metadata['pair'], timeframe="1h") # informative = self.populateDataframe(informative, timeframe='1d') heikinashi = qtpylib.heikinashi(informative) informative['haopen'] = heikinashi['open'] informative['haclose'] = heikinashi['close'] informative['hapercent'] = (informative['haclose'] - informative['haopen']) / informative['haclose'] informative['mid'] = informative['open'] + (informative['close'] - informative['open']) / 2 for timeperiod in timeperiods: informative[f'max{timeperiod}'] = talib.MAX(informative['close'], timeperiod=timeperiod) informative[f'min{timeperiod}'] = talib.MIN(informative['close'], timeperiod=timeperiod) # informative[f"range{timeperiod}"] = ((informative["close"] - informative[f'min{timeperiod}']) / (informative[f'max{timeperiod}'] - informative[f'min{timeperiod}'])) # informative[f"percent{timeperiod}"] = informative['close'].pct_change(timeperiod) informative[f"sma{timeperiod}"] = informative['mid'].ewm(span=timeperiod, adjust=False).mean() self.calculeDerivees(informative, f"sma{timeperiod}", timeframe=self.timeframe, ema_period=timeperiod) informative = self.calculateRegression(informative, column='mid', window=10, degree=1, future_offset=12) informative = self.calculateRegression(informative, column='sma3', window=10, degree=1, future_offset=12) informative = self.calculateRegression(informative, column='low', window=10, degree=1, future_offset=12) for timeperiod in long_timeperiods: informative[f"sma{timeperiod}"] = informative['mid'].ewm(span=timeperiod, adjust=False).mean() self.calculeDerivees(informative, f"sma{timeperiod}", timeframe=self.timeframe, ema_period=timeperiod) informative['rsi'] = talib.RSI(informative['close'], timeperiod=14) self.calculeDerivees(informative, f"rsi", timeframe=self.timeframe, ema_period=14) informative['max_rsi_12'] = talib.MAX(informative['rsi'], timeperiod=12) informative['max_rsi_24'] = talib.MAX(informative['rsi'], timeperiod=24) informative[f'stop_buying_deb'] = qtpylib.crossed_below(informative[f"sma12"], informative['sma36']) & (informative['close'] < informative['sma100']) informative[f'stop_buying_end'] = qtpylib.crossed_above(informative[f"sma12"], informative['sma36']) & (informative['close'] > informative['sma100']) latched = np.zeros(len(informative), dtype=bool) for i in range(1, len(informative)): if informative['stop_buying_deb'].iloc[i]: latched[i] = True elif informative['stop_buying_end'].iloc[i]: latched[i] = False else: latched[i] = latched[i - 1] informative['stop_buying'] = latched informative = self.calculateDownAndUp(informative, limit=0.0001) dataframe = merge_informative_pair(dataframe, informative, self.timeframe, "1h", ffill=True) # ###################################################################################################### # ###################################################################################################### ################### INFORMATIVE 1d informative = self.dp.get_pair_dataframe(pair=metadata['pair'], timeframe="1d") # informative = self.populateDataframe(informative, timeframe='1d') # heikinashi = qtpylib.heikinashi(informative) # informative['haopen'] = heikinashi['open'] # informative['haclose'] = heikinashi['close'] # informative['hapercent'] = (informative['haclose'] - informative['haopen']) / informative['haclose'] informative['mid'] = informative['open'] + (informative['close'] - informative['open']) / 2 for timeperiod in timeperiods: informative[f'max{timeperiod}'] = talib.MAX(informative['close'], timeperiod=timeperiod) informative[f'min{timeperiod}'] = talib.MIN(informative['close'], timeperiod=timeperiod) # informative[f"range{timeperiod}"] = ((informative["close"] - informative[f'min{timeperiod}']) / (informative[f'max{timeperiod}'] - informative[f'min{timeperiod}'])) # informative[f"percent{timeperiod}"] = informative['close'].pct_change(timeperiod) informative[f"sma{timeperiod}"] = informative['mid'].ewm(span=timeperiod, adjust=False).mean() self.calculeDerivees(informative, f"sma{timeperiod}", timeframe=self.timeframe, ema_period=timeperiod) for timeperiod in long_timeperiods: informative[f"sma{timeperiod}"] = informative['mid'].ewm(span=timeperiod, adjust=False).mean() self.calculeDerivees(informative, f"sma{timeperiod}", timeframe=self.timeframe, ema_period=timeperiod) informative['rsi'] = talib.RSI(informative['close'], timeperiod=14) self.calculeDerivees(informative, f"rsi", timeframe=self.timeframe, ema_period=14) informative['max_rsi_12'] = talib.MAX(informative['rsi'], timeperiod=12) informative['max_rsi_24'] = talib.MAX(informative['rsi'], timeperiod=24) informative[f'stop_buying_deb'] = qtpylib.crossed_below(informative[f"sma12"], informative['sma36']) & (informative['close'] < informative['sma100']) informative[f'stop_buying_end'] = qtpylib.crossed_above(informative[f"sma12"], informative['sma36']) & (informative['close'] > informative['sma100']) latched = np.zeros(len(informative), dtype=bool) for i in range(1, len(informative)): if informative['stop_buying_deb'].iloc[i]: latched[i] = True elif informative['stop_buying_end'].iloc[i]: latched[i] = False else: latched[i] = latched[i - 1] informative['stop_buying'] = latched dataframe = merge_informative_pair(dataframe, informative, self.timeframe, "1d", ffill=True) # ###################################################################################################### range_min = dataframe[f"min12_1h"] range_max = dataframe[f"max48"] dataframe[f"range_pos"] = ((dataframe['mid'] - range_min) / (range_max)).rolling(5).mean() # dataframe['cross_sma60'] = qtpylib.crossed_below(dataframe[self.sell_sma_indicators.value], dataframe[self.sell_crossed_sma_indicators.value]) dataframe[f'has_cross_sma3_1h'] = qtpylib.crossed_above(dataframe[f"sma60"], dataframe['sma3_regression_1h']) dataframe[f'has_cross_min'] = qtpylib.crossed_above(dataframe[f"close"], dataframe['min60']) dataframe[f'has_cross_min_6'] = (dataframe['has_cross_min'].rolling(15).max() == 1) dataframe['atr'] = talib.ATR(dataframe) dataframe['rsi'] = talib.RSI(dataframe['close'], timeperiod=14) dataframe['max_rsi_12'] = talib.MAX(dataframe['rsi'], timeperiod=12) dataframe['max_rsi_24'] = talib.MAX(dataframe['rsi'], timeperiod=24) dataframe["dist_sma200_1h"] = ( (dataframe["close_1h"] - dataframe["sma200_1h"]) / dataframe["sma200_1h"] ) # Compter les baisses / hausses consécutives dataframe = self.calculateDownAndUp(dataframe, limit=0.0001) # récupérer le dernier trade fermé trades = Trade.get_trades_proxy(pair=pair,is_open=False) if trades: last_trade = trades[-1] self.pairs[pair]['last_profit'] = last_trade.close_profit # ex: 0.12 = +12% self.pairs[pair]['last_trade'] = last_trade return dataframe def populate_buy_trend(self, dataframe: DataFrame, metadata: dict) -> DataFrame: conditions = list() # ##################################################################################### # CA MONTE !! # ##################################################################################### # conditions.append(dataframe[f"{self.start_bull_indicator.value}_deriv1_1h" ] > self.start_bull_deriv1.value) # conditions.append(dataframe[f"{self.start_bull_indicator.value}_deriv2_1h"] > self.start_bull_deriv2.value) # conditions.append(dataframe['sma12_deriv1'] > self.buy_deriv1_sma60.value) # conditions.append(dataframe['sma5_deriv1_1h'] > self.buy_deriv1_sma5d.value) # conditions.append(dataframe['sma12_deriv1_1h'] > self.buy_deriv1_sma12d.value) # conditions.append(dataframe['sma36_deriv2'] > self.buy_deriv2_sma60.value) # conditions.append(dataframe['sma5_deriv2_1h'] > self.buy_deriv2_sma5d.value) # conditions.append(dataframe['sma12_deriv2_1h'] > self.buy_deriv2_sma12d.value) conditions.append(dataframe['hapercent'] > 0) # conditions.append(dataframe['percent12'] < 0.01) # conditions.append(dataframe['percent5'] < 0.01) # conditions.append(dataframe['max_rsi_24'] < 80) # dynamic_rsi_threshold = 70 + 15 * np.tanh(dataframe["dist_sma200_1h"] * 5) # conditions.append((dataframe['max_rsi_12_1h'] < dynamic_rsi_threshold)) # conditions.append(dataframe[f"close"] > dataframe['sma60']) # conditions.append(((dataframe[f"range_pos"] < 0.05) ) | ((dataframe['sma12_deriv1'] > 0) & (dataframe['sma12_deriv2'] > 0))) # # conditions.append( # (dataframe['close_1h'] > dataframe[f'sma{self.buy_longue.value}_1h']) # | (dataframe['sma60_inv_1h'] == -1) # ) # if conditions: # dataframe.loc[ # reduce(lambda x, y: x & y, conditions), # ['enter_long', 'enter_tag'] # ] = (1, 'bull') # ##################################################################################### # conditions = list() # conditions.append(dataframe['dist_sma200_1h'] < -0.05) # conditions.append(dataframe['sma12_inv'] == 1) # # conditions.append(dataframe['sma100_deriv1_1h'] > 0) # if conditions: # dataframe.loc[ # reduce(lambda x, y: x & y, conditions), # ['enter_long', 'enter_tag'] # ] = (1, 'dist') # ##################################################################################### # conditions = list() # conditions.append(dataframe['close'] < dataframe['sma100_1h']) # conditions.append(dataframe['mid_smooth12'] > dataframe['mid_smooth12'].shift(1)) # conditions.append(dataframe['sma100_deriv1_1h'] > 0) # conditions.append(dataframe[f"range_pos"] < 0.01) # if conditions: # dataframe.loc[ # reduce(lambda x, y: x & y, conditions), # ['enter_long', 'enter_tag'] # ] = (1, 'dist') # ##################################################################################### # CA BAISSE !! # "buy": { # "buy_deriv1_sma12d": 0.0, # "buy_deriv1_sma5d": 0.0, # "buy_deriv1_sma60": 0.003, # "buy_deriv2_sma12d": -0.03, # "buy_deriv2_sma5d": 0.0, # "buy_deriv2_sma60": -0.002, # "buy_longue": 200, # "buy_longue_derive": "sma160_deriv1_1h" # }, # "protection": { # "drop_from_last_entry": -0.03, # "b30_indicateur": "sma3", # "baisse": 0.31 # }, # ##################################################################################### conditions = list() # conditions.append(dataframe[f"{self.start_bull_indicator.value}_deriv1_1h" ] < self.start_bull_deriv1.value) # conditions.append(dataframe[f"{self.start_bull_indicator.value}_deriv2_1h"] < self.start_bull_deriv2.value) # conditions.append(dataframe[f"{self.start_bear_indicator.value}_deriv1_1h" ] > self.start_bear_deriv1.value) # conditions.append(dataframe[f"{self.start_bear_indicator.value}_deriv2_1h"] > self.start_bear_deriv2.value) # conditions.append(dataframe['sma12_deriv1'] > self.buy_deriv1_sma60.value) # conditions.append(dataframe['sma5_deriv1_1h'] > self.buy_deriv1_sma5d.value) # conditions.append(dataframe['sma12_deriv1_1h'] > self.buy_deriv1_sma12d.value) # # # conditions.append(dataframe['sma12_deriv2'] > -0.002) # # conditions.append(dataframe['sma5_deriv2_1h'] > 0) # # conditions.append(dataframe['sma12_deriv2_1h'] > -0.03) # # conditions.append(dataframe['hapercent'] > 0) # # conditions.append(dataframe['percent12'] < 0.01) # # conditions.append(dataframe['percent5'] < 0.01) # conditions.append(dataframe['max_rsi_24'] < 80) # # dynamic_rsi_threshold = 70 + 15 * np.tanh(dataframe["dist_sma200_1h"] * 5) # conditions.append((dataframe['max_rsi_12_1h'] < dynamic_rsi_threshold)) # conditions.append(dataframe[f"close"] > dataframe['sma60']) # conditions.append(((dataframe[f"range_pos"] < 0.05) ) | ((dataframe['sma12_deriv1'] > 0) & (dataframe['sma12_deriv2'] > 0))) # # conditions.append( # (dataframe['close_1h'] > dataframe[f'sma{self.buy_longue.value}_1h']) # | (dataframe['sma60_inv_1h'] == -1) # ) # if conditions: # dataframe.loc[ # reduce(lambda x, y: x & y, conditions), # ['enter_long', 'enter_tag'] # ] = (1, 'bear') # conditions = list() # conditions.append(dataframe['close_1h'] < dataframe[f'sma{self.buy_longue.value}_1h']) # conditions.append(dataframe['has_cross_min'].rolling(6).max() == 1) # conditions.append(dataframe['mid_smooth5'] > dataframe['mid_smooth5'].shift(1)) # conditions.append(dataframe['hapercent'] > 0) # conditions.append(dataframe['percent5'] < 0.01) # dataframe.loc[ # reduce(lambda x, y: x & y, conditions), # ['enter_long', 'enter_tag'] # ] = (1, 'cross_min') conditions = list() conditions.append(dataframe['sma12_deriv1'] > 0.00) conditions.append(dataframe['sma60_deriv1'] > 0.0) conditions.append(dataframe['sma5_deriv1_1h'] > 0.0) conditions.append(dataframe['sma12_deriv1_1h'] > 0.0) conditions.append(dataframe['sma24_deriv1_1h'] > 0.0) conditions.append(dataframe['sma100_deriv1_1h'] > 0.0) conditions.append(dataframe[f"range_pos"] < 0.025) # conditions.append(dataframe['sma12_deriv1_1h'] > 0.0) # # conditions.append(dataframe['close_1h'] < dataframe[f'sma{self.buy_longue.value}_1h']) # # conditions.append(dataframe['has_cross_min'].rolling(6).max() == 1) # # conditions.append(dataframe['mid_smooth5'] > dataframe['mid_smooth5'].shift(1)) # conditions.append(dataframe['min12'] == dataframe['min12'].shift(3)) # conditions.append((dataframe['percent24'] < -0.025) | (dataframe['percent12'] < -0.025)) # dataframe.loc[ # reduce(lambda x, y: x & y, conditions), # ['enter_long', 'enter_tag'] # ] = (1, 'Rise') # conditions = list() # conditions.append(dataframe['has_cross_min_6'] == True) # conditions.append(dataframe['min36'] == dataframe['min36'].shift(3)) # dataframe.loc[ # reduce(lambda x, y: x & y, conditions), # ['enter_long', 'enter_tag'] # ] = (1, 'Force') conditions = list() # conditions.append(dataframe['mid_regression'].shift(2) > dataframe['mid_regression'].shift(1)) # conditions.append(dataframe['mid_regression'].shift(1) < dataframe['mid_regression']) conditions.append(dataframe['close'] <= dataframe['min12_1h']) conditions.append(dataframe['min60'] == dataframe['min60'].shift(5)) conditions.append(dataframe['has_cross_min_6'] == 1) conditions.append(dataframe['down_count'] <= 5) conditions.append(dataframe['down_count'] <= 5) # conditions.append(dataframe['sma12_deriv1'] >= 0) dataframe.loc[ reduce(lambda x, y: x & y, conditions), ['enter_long', 'enter_tag'] ] = (1, 'Mid') return dataframe def populate_sell_trend(self, dataframe: DataFrame, metadata: dict) -> DataFrame: conditions = list() # # TODO: Its not dry code! # sell_indicator = self.sell_indicator0.value # sell_crossed_indicator = self.sell_crossed_indicator0.value # sell_operator = self.sell_operator0.value # sell_real_num = self.sell_real_num0.value # condition, dataframe = condition_generator( # dataframe, # sell_operator, # sell_indicator, # sell_crossed_indicator, # sell_real_num # ) # conditions.append(condition) # # sell_indicator = self.sell_indicator1.value # sell_crossed_indicator = self.sell_crossed_indicator1.value # sell_operator = self.sell_operator1.value # sell_real_num = self.sell_real_num1.value # condition, dataframe = condition_generator( # dataframe, # sell_operator, # sell_indicator, # sell_crossed_indicator, # sell_real_num # ) # conditions.append(condition) # # sell_indicator = self.sell_indicator2.value # sell_crossed_indicator = self.sell_crossed_indicator2.value # sell_operator = self.sell_operator2.value # sell_real_num = self.sell_real_num2.value # condition, dataframe = condition_generator( # dataframe, # sell_operator, # sell_indicator, # sell_crossed_indicator, # sell_real_num # ) # conditions.append(condition) # # # print(f"SELL indicators tested \n" # f"{self.sell_indicator0.value} {self.sell_crossed_indicator0.value} {self.sell_operator0.value} {self.sell_real_num0.value} \n" # f"{self.sell_indicator1.value} {self.sell_crossed_indicator1.value} {self.sell_operator1.value} {self.sell_real_num1.value} \n" # f"{self.sell_indicator2.value} {self.sell_crossed_indicator2.value} {self.sell_operator2.value} {self.sell_real_num2.value} \n" # ) # # # conditions.append(qtpylib.crossed_below(dataframe['mid'], dataframe['sma24'])) # conditions.append((dataframe['range_pos'] > 0.04)) # # if conditions: # dataframe.loc[reduce(lambda x, y: x & y, conditions), ['exit_long', 'exit_tag']] = (1, 'god') return dataframe def calculeDerivees( self, dataframe: pd.DataFrame, name: str, suffixe: str = '', window: int = 100, coef: float = 0.15, ema_period: int = 10, verbose: bool = True, timeframe: str = '1m' ) -> pd.DataFrame: """ Calcule deriv1/deriv2 (relative simple), applique EMA, calcule tendency avec epsilon adaptatif basé sur rolling percentiles. """ d1_col = f"{name}{suffixe}_deriv1" d2_col = f"{name}{suffixe}_deriv2" tendency_col = f"{name}{suffixe}_state" series = dataframe[f"{name}{suffixe}"] d1 = series.diff() d2 = d1.diff() pmin = int(ema_period / 3) cond_bas = (d1.rolling(pmin).mean() > d1.rolling(ema_period).mean()) cond_haut = (d1.rolling(pmin).mean() < d1.rolling(ema_period).mean()) dataframe[d1_col] = (dataframe[name] - dataframe[name].shift(3)) / dataframe[name].shift(3) dataframe[d2_col] = (dataframe[d1_col] - dataframe[d1_col].shift(1)) dataframe[f"{name}{suffixe}_inv"] = np.where(cond_bas, -1, np.where(cond_haut, 1, 0)) short = d1.rolling(pmin).mean() long = d1.rolling(ema_period).mean() spread = short - long zscore = (spread - spread.rolling(ema_period).mean()) / spread.rolling(ema_period).std() dataframe[f"{name}{suffixe}_score"] = zscore # #################################################################### # Calcul de la pente lissée # d1 = series.diff() # d1_smooth = d1.rolling(5).mean() # # Normalisation # z = (d1_smooth - d1_smooth.rolling(ema_period).mean()) / d1_smooth.rolling(ema_period).std() # dataframe[f"{name}{suffixe}_trend_up"] = ( # (d1_smooth.shift(1) < 0) & # (d1_smooth > 0) & # (z > 1.0) # ) # # dataframe[f"{name}{suffixe}_trend_down"] = ( # (d1_smooth.shift(1) > 0) & # (d1_smooth < 0) & # (z < -1.0) # ) # # momentum_short = d1.rolling(int(ema_period / 2)).mean() # momentum_long = d1.rolling(ema_period * 2).mean() # # dataframe[f"{name}{suffixe}_trend_change_up"] = ( # (momentum_short.shift(1) < momentum_long.shift(1)) & # (momentum_short > momentum_long) # ) # # dataframe[f"{name}{suffixe}_trend_change_down"] = ( # (momentum_short.shift(1) > momentum_long.shift(1)) & # (momentum_short < momentum_long) # ) return dataframe @property def protections(self): return [ { "method": "CooldownPeriod", "stop_duration_candles": 6 }, # { # "method": "MaxDrawdown", # "lookback_period_candles": 96, # "trade_limit": 4, # "max_allowed_drawdown": 0.1, # "stop_duration_candles": 24 # }, # { # "method": "StoplossGuard", # "lookback_period_candles": 96, # "trade_limit": 2, # "stop_duration_candles": 48, # "only_per_pair": False # }, # { # "method": "LowProfitPairs", # "lookback_period_candles": 6, # "trade_limit": 2, # "stop_duration_candles": 60, # "required_profit": 0.02 # }, # { # "method": "LowProfitPairs", # "lookback_period_candles": 24, # "trade_limit": 4, # "stop_duration_candles": 2, # "required_profit": 0.01 # } ] def log_trade(self, action, pair, date, trade_type=None, rate=None, dispo=None, profit=None, buys=None, stake=None, last_candle=None): # Afficher les colonnes une seule fois if self.config.get('runmode') == 'hyperopt' or self.dp.runmode.value in ('hyperopt'): return if self.columns_logged % 10 == 0: self.printLog( f"| {'Date':<16} | {'Action':<10} |{'Pair':<5}| {'Trade Type':<18} |{'Rate':>8} | {'Dispo':>6} | {'Profit':>8} " f"| {'Pct':>6} | {'max_touch':>11} | {'last_lost':>12} | {'last_max':>7}| {'last_max':>7}|{'Buys':>5}| {'Stake':>5} |" f"{'rsi':>6}|" ) self.printLineLog() # df = pd.DataFrame.from_dict(self.pairs, orient='index') # colonnes_a_exclure = ['last_candle', # 'trade_info', 'last_date', 'last_count_of_buys', 'base_stake_amount', 'stop_buy'] # df_filtered = df[df['count_of_buys'] > 0].drop(columns=colonnes_a_exclure) # # df_filtered = df_filtered["first_buy", "last_max", "max_touch", "last_sell","last_buy", 'count_of_buys', 'current_profit'] # # print(df_filtered) self.columns_logged += 1 date = str(date)[:16] if date else "-" limit = None # if buys is not None: # limit = round(last_rate * (1 - self.fibo[buys] / 100), 4) rsi = '' rsi_pct = '' # if last_candle is not None: # if (not np.isnan(last_candle['rsi_1h'])) and (not np.isnan(last_candle['rsi_1h'])): # rsi = str(int(last_candle['rsi_1h'])) + " " + str(int(last_candle['rsi_1h'])) # if (not np.isnan(last_candle['rsi_pct_1h'])) and (not np.isnan(last_candle['rsi_pct_1h'])): # rsi_pct = str(int(10000 * last_candle['bb_mid_pct_1h'])) + " " + str( # int(last_candle['rsi_pct_1h'])) + " " + str(int(last_candle['rsi_pct_1h'])) # first_rate = self.percent_threshold.value # last_rate = self.threshold.value # action = self.color_line(action, action) sma5_1h = '' sma5_1h = '' sma5 = str(sma5_1h) + ' ' + str(sma5_1h) last_lost = self.getLastLost(last_candle, pair) if buys is None: buys = '' max_touch = '' # round(last_candle['max12_1h'], 1) #round(self.pairs[pair]['max_touch'], 1) pct_max = self.getPctFirstBuy(pair, last_candle) total_counts = str(buys) + '/' + str(sum(pair_data['count_of_buys'] for pair_data in self.pairs.values())) dist_max = self.getDistMax(last_candle, pair) # if trade_type is not None: # if np.isnan(last_candle['rsi_1h']): # string = ' ' # else: # string = (str(int(last_candle['rsi_1h']))) + " " + str(int(last_candle['rsi_deriv1_1h'])) # trade_type = trade_type \ # + " " + string \ # + " " + str(int(last_candle['rsi_1h'])) \ # + " " + str(int(last_candle['rsi_deriv1_1h'])) # val144 = self.getProbaHausse144(last_candle) # val1h = self.getProbaHausse1h(last_candle) color = GREEN if profit > 0 else RED # color_sma24 = GREEN if last_candle['sma24_deriv1_1h'] > 0 else RED # color_sma24_2 = GREEN if last_candle['sma24_deriv2_1h'] > 0 else RED # color_sma5 = GREEN if last_candle['mid_smooth_5_deriv1_1h'] > 0 else RED # color_sma5_2 = GREEN if last_candle['mid_smooth_5_deriv2_1h'] > 0 else RED # color_sma5_1h = GREEN if last_candle['sma60_deriv1'] > 0 else RED # color_sma5_2h = GREEN if last_candle['sma60_deriv2'] > 0 else RED # color_smooth_1h = GREEN if last_candle['mid_smooth_1h_deriv1'] > 0 else RED # color_smooth2_1h = GREEN if last_candle['mid_smooth_1h_deriv2'] > 0 else RED last_max = int(self.pairs[pair]['last_max']) if self.pairs[pair]['last_max'] > 1 else round( self.pairs[pair]['last_max'], 3) last_min = int(self.pairs[pair]['last_min']) if self.pairs[pair]['last_min'] > 1 else round( self.pairs[pair]['last_min'], 3) profit = str(round(profit, 2)) + '/' + str(round(self.pairs[pair]['max_profit'], 2)) # 🟢 Dérivée 1 > 0 et dérivée 2 > 0: tendance haussière qui s’accélère. # 🟡 Dérivée 1 > 0 et dérivée 2 < 0: tendance haussière qui ralentit → essoufflement potentiel. # 🔴 Dérivée 1 < 0 et dérivée 2 < 0: tendance baissière qui s’accélère. # 🟠 Dérivée 1 < 0 et dérivée 2 > 0: tendance baissière qui ralentit → possible bottom. self.printLog( f"| {date:<16} |{action:<10} | {pair[0:3]:<3} | {trade_type or '-':<18} |{rate or '-':>9}| {dispo or '-':>6} " f"|{color}{profit or '-':>10}{RESET}| {pct_max or '-':>6} | {round(self.pairs[pair]['max_touch'], 2) or '-':>11} | {last_lost or '-':>12} " f"| {last_max or '-':>7} | {last_min or '-':>7} |{total_counts or '-':>5}|{stake or '-':>7}" ) def printLineLog(self): # f"sum1h|sum1d|Tdc|Tdh|Tdd| drv1 |drv_1h|drv_1h|" self.printLog( f"+{'-' * 18}+{'-' * 12}+{'-' * 5}+{'-' * 20}+{'-' * 9}+{'-' * 8}+{'-' * 12}+{'-' * 8}+{'-' * 13}+{'-' * 14}+{'-' * 9}{'-' * 9}+{'-' * 5}+{'-' * 7}+" f"+{'-' * 6}+{'-' * 7}+{'-' * 5}+{'-' * 5}+{'-' * 5}+{'-' * 5}+{'-' * 5}+{'-' * 5}+" ) def printLog(self, str): if self.config.get('runmode') == 'hyperopt' or self.dp.runmode.value in ('hyperopt'): return; if not self.dp.runmode.value in ('backtest', 'hyperopt', 'lookahead-analysis'): logger.info(str) else: if not self.dp.runmode.value in ('hyperopt'): print(str) def getLastLost(self, last_candle, pair): last_lost = round((last_candle['close'] - self.pairs[pair]['max_touch']) / self.pairs[pair]['max_touch'], 3) return last_lost def getDistMax(self, last_candle, pair): mx = last_candle['max12_1h'] dist_max = round(100 * (mx - last_candle['close']) / mx, 0) return dist_max def getPctFirstBuy(self, pair, last_candle): return round((last_candle['close'] - self.pairs[pair]['first_buy']) / self.pairs[pair]['first_buy'], 3) def getPctLastBuy(self, pair, last_candle): return round((last_candle['close'] - self.pairs[pair]['last_buy']) / self.pairs[pair]['last_buy'], 4) def expectedProfit(self, pair: str, last_candle: DataFrame): lim = 0.005 pct = 0.001 pct_to_max = lim + pct * self.pairs[pair]['count_of_buys'] expected_profit = lim * self.pairs[pair]['total_amount'] # min(3 * lim, max(lim, pct_to_max)) # 0.004 + 0.002 * self.pairs[pair]['count_of_buys'] #min(0.01, first_max) self.pairs[pair]['expected_profit'] = expected_profit return expected_profit def getLastClosedTrade(self, pair): # récupérer le dernier trade fermé trades = Trade.get_trades_proxy(pair=pair,is_open=False) if trades: last_trade = trades[-1] self.pairs[pair]['last_profit'] = last_trade.close_profit # ex: 0.12 = +12% self.pairs[pair]['last_trade'] = last_trade # ✅ Première dérivée(variation ou pente) # Positive: la courbe est croissante → tendance haussière. # Négative: la courbe est décroissante → tendance baissière. # Proche de 0: la courbe est plate → marché stable ou en transition. # # Applications: # Détecter les points d’inflexion(changement de tendance) quand elle s’annule.\ # Analyser la vitesse d’un mouvement(plus elle est forte, plus le mouvement est impulsif). # # ✅ Seconde dérivée(accélération ou concavité) # Positive: la pente augmente → accélération de la hausse ou ralentissement de la baisse. # Négative: la pente diminue → accélération de la baisse ou ralentissement de la hausse. # Changement de signe: indique souvent un changement de courbure, utile pour prévoir des retournements. # # Exemples: # 🟢 Dérivée 1 > 0 et dérivée 2 > 0: tendance haussière qui s’accélère. # 🟡 Dérivée 1 > 0 et dérivée 2 < 0: tendance haussière qui ralentit → essoufflement potentiel. # 🔴 Dérivée 1 < 0 et dérivée 2 < 0: tendance baissière qui s’accélère. # 🟠 Dérivée 1 < 0 et dérivée 2 > 0: tendance baissière qui ralentit → possible bottom. # # Filtrer les signaux: ne prendre un signal haussier que si dérivée1 > 0 et dérivée2 > 0. # Détecter les zones de retournement: quand dérivée1 ≈ 0 et que dérivée2 change de signe. def calculateRegression(self, dataframe: DataFrame, column='close', window=50, degree=3, future_offset: int = 10 # projection à n bougies après ) -> DataFrame: df = dataframe.copy() regression_fit = [] regression_future_fit = [] regression_fit = [] regression_future_fit = [] for i in range(len(df)): if i < window: regression_fit.append(np.nan) regression_future_fit.append(np.nan) continue # Fin de la fenêtre d’apprentissage end_index = i start_index = i - window y = df[column].iloc[start_index:end_index].values # Si les données sont insuffisantes (juste par précaution) if len(y) < window: regression_fit.append(np.nan) regression_future_fit.append(np.nan) continue # x centré pour meilleure stabilité numérique x = np.linspace(-1, 1, window) coeffs = np.polyfit(x, y, degree) poly = np.poly1d(coeffs) # Calcul point présent (dernier de la fenêtre) x_now = x[-1] regression_fit.append(poly(x_now)) # Calcul point futur, en ajustant si on dépasse la fin remaining = len(df) - i - 1 effective_offset = min(future_offset, remaining) x_future = x_now + (effective_offset / window) * 2 # respect du même pas regression_future_fit.append(poly(x_future)) df[f"{column}_regression"] = regression_fit # # 2. Dérivée première = différence entre deux bougies successives # df[f"{column}_regression_deriv1"] = round(100 * df[f"{column}_regression"].diff() / df[f"{column}_regression"], # 4) # # # 3. Dérivée seconde = différence de la dérivée première # df[f"{column}_regression_deriv2"] = round( # 10 * df[f"{column}_regression_deriv1"].rolling(int(window / 4)).mean().diff(), 4) # # df[f"{column}_future_{future_offset}"] = regression_future_fit # # 2. Dérivée première = différence entre deux bougies successives # df[f"{column}_future_{future_offset}_deriv1"] = round(100 * df[f"{column}_future_{future_offset}"].diff() / df[f"{column}_future_{future_offset}"], 4) # # # 3. Dérivée seconde = différence de la dérivée première # df[f"{column}_future_{future_offset}_deriv2"] = round(10 * df[f"{column}_future_{future_offset}_deriv1"].rolling(int(window / 4)).mean().diff(), 4) return df def calculateDownAndUp(self, dataframe, limit=0.0001): dataframe['down'] = dataframe['mid_regression'] <= dataframe['mid_regression'].shift(1) dataframe['up'] = dataframe['mid_regression'] >= dataframe['mid_regression'].shift(1) dataframe['down_count'] = - dataframe['down'].astype(int) * ( dataframe['down'].groupby((dataframe['down'] != dataframe['down'].shift()).cumsum()).cumcount() + 1) dataframe['up_count'] = dataframe['up'].astype(int) * ( dataframe['up'].groupby((dataframe['up'] != dataframe['up'].shift()).cumsum()).cumcount() + 1) # Créer une colonne vide dataframe['down_pct'] = self.calculateUpDownPct(dataframe, 'down_count') dataframe['up_pct'] = self.calculateUpDownPct(dataframe, 'up_count') return dataframe def calculateUpDownPct(self, dataframe, key): down_pct_values = np.full(len(dataframe), np.nan) # Remplir la colonne avec les bons calculs for i in range(len(dataframe)): shift_value = abs(int(dataframe[key].iloc[i])) # Récupérer le shift actuel if i - shift_value > 1: # Vérifier que le shift ne dépasse pas l'index down_pct_values[i] = 100 * (dataframe['close'].iloc[i] - dataframe['close'].iloc[i - shift_value]) / \ dataframe['close'].iloc[i - shift_value] return down_pct_values