# pragma pylint: disable=missing-docstring, invalid-name, pointless-string-statement
# isort: skip_file
# --- Do not remove these libs ---
from datetime import datetime, timezone, timedelta
from typing import Optional
import numpy
import numpy as np  # noqa
import pandas as pd  # noqa
from freqtrade.strategy.parameters import DecimalParameter, BooleanParameter, IntParameter
from pandas import DataFrame
import math
from functools import reduce
from freqtrade.strategy.interface import IStrategy

# --------------------------------
# Add your lib to import here
import talib.abstract as ta
import freqtrade.vendor.qtpylib.indicators as qtpylib


# This class is a sample. Feel free to customize it.
class StrategyPierrick41220(IStrategy):
    """
    Strategy on 5m candles built around Bollinger-band width, short/medium SMAs, RSI and the
    rolling 200-candle min/max of the close.

    Entries are produced by ``populate_buy_trend`` and exits by ``populate_sell_trend`` plus the
    ``custom_sell`` callback; ``custom_stake_amount`` scales the stake with the band width.
    """

    # Strategy interface version - allow new iterations of the strategy interface.
    # Check the documentation or the Sample strategy to get the latest version.
    INTERFACE_VERSION = 2

    # bb_width value required to start buying
    buy_bollinger = DecimalParameter(0.00, 0.18, decimals=2, default=0.065, space="buy")
    # Value of the second bollinger condition, combined with the sma200 condition
    # buy_bollinger_2 = DecimalParameter(0.0, 0.08, decimals=2, default=0.04, space="buy")
    # buy_min = DecimalParameter(1, 1.1, decimals=2, default=1.01, space="buy")
    # buy_percent = DecimalParameter(1, 1.1, decimals=2, default=1.01, space="buy")
    # volume to reach
    buy_volume = DecimalParameter(0, 50, decimals=1, default=18, space="buy")

    buy_step = IntParameter(1, 8, default=3, space="buy")
    buy_rolling = IntParameter(-20, 0, default=-6, space="buy")
    # buy_rsi = IntParameter(20, 40, default=30, space="buy")
    # buy_adx_enabled = BooleanParameter(default=True, space="buy")
    # buy_rsi_enabled = CategoricalParameter([True, False], default=False, space="buy")
    # buy_trigger = CategoricalParameter(["bb_lower", "macd_cross_signal"], default="bb_lower", space="buy")

    # ROI table:
    minimal_roi = {
        # "0": 0.015
        "0": 5
    }

    # Stoploss:
    stoploss = -1
    trailing_stop = False
    trailing_stop_positive = 0.02
    trailing_stop_positive_offset = 0.0275  # 0.015
    trailing_only_offset_is_reached = True

    # max_open_trades = 3

    # Optimal ticker interval for the strategy.
    timeframe = '5m'

    # Run "populate_indicators()" only for new candle.
    process_only_new_candles = False

    # These values can be overridden in the "ask_strategy" section in the config.
    use_sell_signal = True
    sell_profit_only = False
    ignore_roi_if_buy_signal = False

    # Number of candles the strategy requires before producing valid signals
    # NOTE(review): populate_indicators uses windows of up to 200 candles (min/max) and 100
    # (sma100); 30 looks too small for those columns to be warmed up - confirm and raise.
    startup_candle_count: int = 30

    # Optional order type mapping.
    order_types = {
        'buy': 'limit',
        'sell': 'limit',
        'stoploss': 'market',
        'stoploss_on_exchange': False
    }

    # Optional order time in force.
    order_time_in_force = {
        'buy': 'gtc',
        'sell': 'gtc'
    }

    plot_config = {
        # Main plot indicators (Moving averages, ...)
        'main_plot': {
            'bb_lowerband': {'color': 'red'},
            'bb_upperband': {'color': 'green'},
            'sma100': {'color': 'blue'},
            'sma10': {'color': 'yellow'},
            'min': {'color': 'white'},
            'max': {'color': 'white'},
            'min20': {'color': 'pink'},
            'sma20': {'color': 'cyan'}
        },
        'subplots': {
            # Subplots - each dict defines one additional plot
            "BB": {
                'bb_width': {'color': 'white'},
                'bb_min': {'color': 'red'},
            },
            # "ADX": {
            #     'adx': {'color': 'white'},
            #     'minus_dm': {'color': 'blue'},
            #     'plus_dm': {'color': 'red'}
            # },
            "Rsi": {
                'rsi': {'color': 'pink'},
                'rsi3': {'color': 'blue'},
            },
            "rolling": {
                'bb_rolling': {'color': '#87e470'},
                "bb_rolling_min": {'color': '#ac3e2a'}
            },
            "percent": {
                "percent": {'color': 'green'},
                "percent5": {'color': 'red'},
                "percent20": {'color': 'blue'},
                "pente": {'color': 'yellow'}
            }
        }
    }

    def custom_stake_amount(self, pair: str, current_time: datetime, current_rate: float,
                            proposed_stake: float, min_stake: float, max_stake: float,
                            **kwargs) -> float:
        """
        Scale the stake with the Bollinger-band width of the latest analyzed candle.

        :param pair: pair the stake is requested for
        :param current_time: unused here
        :param current_rate: unused here
        :param proposed_stake: stake suggested by the caller; returned unchanged by default
        :param min_stake: unused here
        :param max_stake: upper bound applied to the scaled stake
        :return: ``proposed_stake * 2`` when bb_width > 0.065, ``proposed_stake * 1.5`` when
                 bb_width > 0.045 (both capped at ``max_stake``), otherwise ``proposed_stake``
        """
        dataframe, _ = self.dp.get_analyzed_dataframe(pair=pair, timeframe=self.timeframe)
        if dataframe.empty:
            # No analyzed candle yet: nothing to base a scaling decision on.
            return proposed_stake

        current_candle = dataframe.iloc[-1].squeeze()
        bb_width = current_candle['bb_width']
        # print("proposed_stake=", proposed_stake, " max_stake=", max_stake)
        if bb_width > 0.065:
            print("use more stake", pair, " ", proposed_stake * 2)
            return min(max_stake, proposed_stake * 2)

        if bb_width > 0.045:
            print("use more stake", pair, " ", proposed_stake * 1.5)
            return min(max_stake, proposed_stake * 1.5)

        # if current_candle['bb_width'] < 0.020:
        #     print("use less stake", pair, " ", proposed_stake / 2)
        #     return min(max_stake, proposed_stake / 2)

        # if self.config['stake_amount'] == 'unlimited':
        #     # Use entire available wallet during favorable conditions when in compounding mode.
        #     return max_stake
        # else:
        #     # Compound profits during favorable conditions instead of using a static stake.
        #     return self.wallets.get_total_stake_amount() / self.config['max_open_trades']

        # Use default stake amount.
        return proposed_stake

    def custom_sell(self, pair: str, trade: 'Trade', current_time: 'datetime', current_rate: float,
                    current_profit: float, **kwargs) -> Optional[str]:
        """
        Decide whether an open trade should be closed, based on profit, trade age and the
        latest analyzed candles.

        :param pair: pair of the open trade
        :param trade: the open trade; only ``open_date_utc`` is read (and the object is printed)
        :param current_time: time of the evaluation, compared against ``trade.open_date_utc``
        :param current_rate: only used in the printed message
        :param current_profit: current profit ratio of the trade
        :return: a sell-reason string when one of the rules matches, otherwise ``None``
        """
        dataframe, _ = self.dp.get_analyzed_dataframe(pair, self.timeframe)
        if len(dataframe) < 5:
            # The rules below look back up to 5 candles; without them no rule can be evaluated.
            return None

        last_candle = dataframe.iloc[-1].squeeze()
        previous_last_candle = dataframe.iloc[-2].squeeze()
        previous_5_candle = dataframe.iloc[-5].squeeze()

        # Full age of the trade. timedelta.seconds only holds the sub-day remainder
        # (0..86399), which made trades older than 24h look young again.
        open_seconds = (current_time - trade.open_date_utc).total_seconds()

        # (last_candle['percent5'] < -0.005) \
        # if (0 < current_profit < 0.005) \
        #         & ((current_time - trade.open_date_utc).seconds >= 3600 * 2):
        #     # & (previous_last_candle['sma10'] > last_candle['sma10']):
        #     print("too_small_gain", pair, trade, " profit=", current_profit, " rate=", current_rate, " percent5=",
        #           last_candle['percent5'])
        #     return 'too_small_gain'

        # if (current_profit < -0.05) \
        #         & ((current_time - trade.open_date_utc).days >= 3):
        #     print("lost_half_profit", pair, trade, " profit=", current_profit, " rate=", current_rate)
        #     return 'stop_loss_profit'

        # if (current_profit > 0.02) \
        #         & (last_candle['percent'] < 0.01) \
        #         & ((current_time - trade.open_date_utc).seconds >= 3600):
        #     print("lost_half_profit", pair, trade, " profit=", current_profit, " rate=", current_rate)
        #     return 'lost_half_profit'

        # In profit for at least 2h, sma20 lower than 5 candles ago, and both the last candle
        # and the 5-candle percent sum are negative.
        if (current_profit > 0
                and open_seconds >= 3600 * 2
                and previous_5_candle['sma20'] > last_candle['sma20']
                and last_candle['percent'] < 0
                and last_candle['percent5'] < 0):
            # self.lock_pair(pair, until=current_time + timedelta(hours=3))
            print("over_bb_band_sma20_desc", pair, trade, " profit=", current_profit, " rate=", current_rate)
            return 'over_bb_band_sma20_desc'

        if current_profit > 0 and last_candle['rsi'] > 75:
            print("over_rsi", pair, trade, " profit=", current_profit, " rate=", current_rate)
            return 'over_rsi'

        # trade description
        # Trade(id=0, pair=CAKE/USDT, amount=4.19815281, open_rate=11.91000000, open_since=2021-12-22 17:55:00)
        # print(last_candle)
        if 0.05 < current_profit < 1:
            # The age comparison used to be written `cond & td.seconds >= 3600 * 3`, which
            # Python parses as `(cond & td.seconds) >= 10800` and is therefore never true.
            if (
                    previous_last_candle['sma10'] > last_candle['sma10'] * 1.005
                    and open_seconds >= 3600 * 3
                    # ) | (
                    #     (current_time - trade.open_date_utc).seconds >= 3600 * 6
            ):
                # self.lock_pair(pair, until=current_time + timedelta(hours=3))
                print("profit_3h_sma10_desc", pair, trade, " profit=", current_profit, " rate=", current_rate)
                return 'profit_3h_sma10_desc'

        if (0 < current_profit < 0.1
                and previous_last_candle['sma20'] > last_candle['sma20']
                and open_seconds >= 3600 * 5):
            print("profit_5h_sma20_desc", pair, trade, " profit=", current_profit, " rate=", current_rate)
            return 'profit_5h_sma20_desc'

        return None

    def informative_pairs(self):
        """Return the additional pairs to load; this strategy declares none."""
        return []

    def populate_indicators(self, dataframe: DataFrame, metadata: dict) -> DataFrame:
        """
        Add the indicator columns used by the buy/sell rules, the callbacks and the plot config.

        :param dataframe: candle dataframe; the code reads the open, close and volume columns
                          directly and hands the whole frame to talib / qtpylib
        :param metadata: unused here
        :return: the same dataframe with the indicator columns added
        """
        # MACD
        # macd = ta.MACD(dataframe)
        # dataframe['macd'] = macd['macd']
        # dataframe['macdsignal'] = macd['macdsignal']
        # dataframe['macdhist'] = macd['macdhist']
        #
        # # Plus Directional Indicator / Movement
        # dataframe['plus_dm'] = ta.PLUS_DM(dataframe)
        # dataframe['plus_di'] = ta.PLUS_DI(dataframe)
        #
        # # Minus Directional Indicator / Movement
        # dataframe['adx'] = ta.ADX(dataframe)
        # dataframe['minus_dm'] = ta.MINUS_DM(dataframe)
        # dataframe['minus_di'] = ta.MINUS_DI(dataframe)
        # dataframe['min'] = ta.MIN(dataframe)
        # dataframe['max'] = ta.MAX(dataframe)

        # # Aroon, Aroon Oscillator
        # aroon = ta.AROON(dataframe)
        # dataframe['aroonup'] = aroon['aroonup']
        # dataframe['aroondown'] = aroon['aroondown']
        # dataframe['aroonosc'] = ta.AROONOSC(dataframe)

        # RSI and its 3-candle rolling mean
        dataframe['rsi'] = ta.RSI(dataframe)
        dataframe["rsi3"] = (dataframe['rsi']).rolling(3).mean()

        # # EMA - Exponential Moving Average
        # dataframe['ema3'] = ta.EMA(dataframe, timeperiod=3)
        # dataframe['ema5'] = ta.EMA(dataframe, timeperiod=5)
        # dataframe['ema10'] = ta.EMA(dataframe, timeperiod=10)
        # dataframe['ema21'] = ta.EMA(dataframe, timeperiod=21)
        # dataframe['ema50'] = ta.EMA(dataframe, timeperiod=50)
        # dataframe['ema100'] = ta.EMA(dataframe, timeperiod=100)

        # # SMA - Simple Moving Average
        # dataframe['sma3'] = ta.SMA(dataframe, timeperiod=3)
        # dataframe['sma5'] = ta.SMA(dataframe, timeperiod=5)
        dataframe['sma10'] = ta.SMA(dataframe, timeperiod=10)
        dataframe['sma20'] = ta.SMA(dataframe, timeperiod=20)
        dataframe['sma50'] = ta.SMA(dataframe, timeperiod=50)
        dataframe['sma100'] = ta.SMA(dataframe, timeperiod=100)
        # dataframe['sma200'] = ta.SMA(dataframe, timeperiod=200)
        # dataframe['sma200_95'] = ta.SMA(dataframe, timeperiod=200) * 0.95
        # dataframe['sma200_98'] = ta.SMA(dataframe, timeperiod=200) * 0.98
        # dataframe['sma500'] = ta.SMA(dataframe, timeperiod=500)
        # dataframe['sma500_90'] = ta.SMA(dataframe, timeperiod=500) * 0.9
        # dataframe['sma500_95'] = ta.SMA(dataframe, timeperiod=500) * 0.95
        # dataframe['sma500_20'] = ta.SMA(dataframe, timeperiod=500) * 0.2

        # Relative open->close move of each candle and its 5 / 20 candle rolling sums
        dataframe["percent"] = (dataframe["close"] - dataframe["open"]) / dataframe["open"]
        dataframe["percent5"] = dataframe["percent"].rolling(5).sum()
        dataframe["percent20"] = dataframe["percent"].rolling(20).sum()

        # Rolling extremes of the close
        dataframe['min'] = ta.MIN(dataframe['close'], timeperiod=200)
        dataframe['min20'] = ta.MIN(dataframe['close'], timeperiod=20)
        dataframe['max'] = ta.MAX(dataframe['close'], timeperiod=200)
        dataframe['max_min'] = dataframe['max'] / dataframe['min']

        # "pente" (slope): relative change of sma20 versus the previous candle
        dataframe['pente'] = ((dataframe['sma20'] - dataframe['sma20'].shift(1)) / dataframe['sma20'].shift(1))

        # Bollinger Bands
        bollinger = qtpylib.bollinger_bands(qtpylib.typical_price(dataframe), window=20, stds=2)
        dataframe['bb_lowerband'] = bollinger['lower']
        dataframe['bb_middleband'] = bollinger['mid']
        dataframe['bb_upperband'] = bollinger['upper']
        dataframe["bb_percent"] = (
            (dataframe["close"] - dataframe["bb_lowerband"]) /
            (dataframe["bb_upperband"] - dataframe["bb_lowerband"])
        )
        dataframe["bb_width"] = (
            (dataframe["bb_upperband"] - dataframe["bb_lowerband"]) / dataframe["bb_middleband"]
        )
        dataframe['bb_min'] = ta.MIN(dataframe['bb_lowerband'], timeperiod=36)

        # 3-candle mean of the close's distance above the lower band, in percent of the band
        dataframe["rolling"] = (
            100 * (dataframe["close"] - dataframe["bb_lowerband"]) / dataframe["bb_lowerband"]).rolling(
            3).mean()
        dataframe["bb_rolling"] = dataframe["rolling"] / dataframe["bb_width"]
        dataframe['bb_rolling_min'] = ta.MIN(dataframe['bb_rolling'], timeperiod=10)

        # Lower third of the 200-candle min/max range
        dataframe['bb_buy'] = (dataframe['min'] + (dataframe['max'] - dataframe['min']) / 3)

        # # Stoch
        # stoch = ta.STOCH(dataframe)
        # dataframe['slowk'] = stoch['slowk']
        #
        # # Inverse Fisher transform on RSI, values [-1.0, 1.0] (https://goo.gl/2JGGoy)
        # rsi = 0.1 * (dataframe['rsi'] - 50)
        # dataframe['fisher_rsi'] = (numpy.exp(2 * rsi) - 1) / (numpy.exp(2 * rsi) + 1)
        #
        # # SAR Parabol
        # dataframe['sar'] = ta.SAR(dataframe)
        #
        # # Hammer: values [0, 100]
        # dataframe['CDLHAMMER'] = ta.CDLHAMMER(dataframe)

        return dataframe

    def populate_buy_trend(self, dataframe: DataFrame, metadata: dict) -> DataFrame:
        """
        Set ``buy = 1`` on rows matching either of two rule sets.

        Both sets require a minimum bb_width, a close below sma10, a close ``step`` candles ago
        in the lower third of the 200-candle min/max range, an unchanged 200-candle minimum over
        those ``step`` candles, and ``volume * close / 1000 > 100``. The first set uses
        ``buy_step`` and bb_width >= 0.035; the second uses a fixed step of 2, bb_width >= 0.04,
        a candle that is green or lost less than 1%, and rsi <= 30.

        :param dataframe: dataframe carrying the columns added by ``populate_indicators``
        :param metadata: unused here
        :return: the same dataframe with the ``buy`` column set on matching rows
        """
        step = self.buy_step.value
        # if reduce(lambda x, y: x & y, dataframe['bb_width'] < 0.015):
        #     step = 5
        # else:
        #     if reduce(lambda x, y: x & y, dataframe['bb_width'] < 0.03):
        #         step = 4
        bb_rolling_max = self.buy_rolling.value  # only referenced by the disabled conditions below

        condition_bb_rolling_1 = [
            (dataframe['bb_width'] >= 0.035),
            # (dataframe['close'] > dataframe['open']),
            (dataframe['close'] < dataframe['sma10']),
            # (dataframe['bb_rolling_min'].shift(step) <= bb_rolling_max),
            # (dataframe['bb_rolling_min'].shift(step) >= dataframe['bb_rolling'].shift(step)),
            (dataframe['close'].shift(step) < dataframe['min'].shift(step) + (
                    dataframe['max'].shift(step) - dataframe['min'].shift(step)) / 3),
            (dataframe['min'].shift(step) == dataframe['min']),
            (dataframe['volume'] * dataframe['close'] / 1000 > 100),
            # (dataframe['rsi'] <= 30)
        ]
        condition_bb_rolling1 = reduce(lambda x, y: x & y, condition_bb_rolling_1)

        step = 2
        condition_bb_rolling_2 = [
            (dataframe['bb_width'] >= 0.04),
            (dataframe['close'] < dataframe['sma10']),
            ((dataframe['close'] > dataframe['open']) | (dataframe['percent'] > -0.01)),
            # (dataframe['bb_rolling_min'].shift(step) <= bb_rolling_max),
            # (dataframe['bb_rolling_min'].shift(step) >= dataframe['bb_rolling'].shift(step)),
            (dataframe['close'].shift(step) < dataframe['min'].shift(step) + (
                    dataframe['max'].shift(step) - dataframe['min'].shift(step)) / 3),
            (dataframe['min'].shift(step) == dataframe['min']),
            (dataframe['volume'] * dataframe['close'] / 1000 > 100),
            (dataframe['rsi'] <= 30)
        ]
        condition_bb_rolling2 = reduce(lambda x, y: x & y, condition_bb_rolling_2)

        dataframe.loc[
            condition_bb_rolling1 | condition_bb_rolling2,
            'buy'] = 1

        return dataframe

    def populate_sell_trend(self, dataframe: DataFrame, metadata: dict) -> DataFrame:
        """
        Set ``sell = 1`` on rows where the last three candles are red, the close is below the
        lower Bollinger band, the lower band dropped by at least 2% over two candles and
        rsi <= 40.

        :param dataframe: dataframe carrying the columns added by ``populate_indicators``
        :param metadata: unused here
        :return: the same dataframe with the ``sell`` column set on matching rows
        """
        condition_sell_1 = [
            (dataframe['close'] < dataframe['open']),
            (dataframe['close'].shift(1) < dataframe['open'].shift(1)),
            (dataframe['close'].shift(2) < dataframe['open'].shift(2)),
            (dataframe['close'] < dataframe['bb_lowerband']),
            (((dataframe['bb_lowerband'].shift(2) - dataframe['bb_lowerband']) / dataframe[
                'bb_lowerband']) >= 0.02),
            # (((dataframe['close'].shift(1) - dataframe['close']) / dataframe['close']) >= 0.025)
            (dataframe['rsi'] <= 40)
        ]
        condition_sell1 = reduce(lambda x, y: x & y, condition_sell_1)

        # condition = np.where(condition_sell1, 'True', 'False')
        # if bool(condition_sell_1):
        #     # print('condition_sell1=', condition_sell1)
        #     self.lock_pair(metadata['pair'], until=datetime.now(timezone.utc) + timedelta(hours=3))

        dataframe.loc[
            (
                condition_sell1
                # ) | (
                #     (dataframe['close'] < dataframe['bb_lowerband']) &
                #     StrategyHelperLocal.red_candles(dataframe) &
                #     (dataframe['percent5'] < -0.04)
            ),
            'sell'] = 1
        return dataframe


class StrategyHelperLocal:
    """Candle-pattern helpers for the strategy above."""

    @staticmethod
    def red_candles(dataframe, shift=0, count=8):
        """
        Evaluate, row by row, whether ``count`` consecutive candles are red (open > close).

        :param dataframe: dataframe with ``open`` and ``close`` columns
        :param shift: shift the pattern back by n candles
        :param count: number of consecutive red candles required; defaults to 8
        :return: boolean Series, True where the candle ``shift`` rows back and the
                 ``count - 1`` candles before it all have open > close
        :raises ValueError: if ``count`` is smaller than 1
        """
        if count < 1:
            raise ValueError("count must be at least 1")
        return reduce(
            lambda acc, cond: acc & cond,
            (
                dataframe['open'].shift(offset + shift) > dataframe['close'].shift(offset + shift)
                for offset in range(count)
            ),
        )