# pragma pylint: disable=missing-docstring, invalid-name, pointless-string-statement
# flake8: noqa: F401
# isort: skip_file
# --- Do not remove these libs ---
# noinspection PyUnresolvedReferences
import numpy as np
# noinspection PyUnresolvedReferences
import pandas as pd
# from future.backports.xmlrpc.client import DateTime
from pandas import DataFrame
from datetime import datetime
# noinspection PyUnresolvedReferences
from freqtrade.persistence import Trade
from typing import Optional, Tuple, Union
from datetime import timezone
import logging

# noinspection PyUnresolvedReferences
from freqtrade.strategy import (IStrategy, informative)

# --------------------------------
# Add your lib to import here
# noinspection PyUnresolvedReferences
import talib.abstract as ta
# noinspection PyUnresolvedReferences
import pandas_ta as pta
# noinspection PyUnresolvedReferences
from technical import qtpylib

logger = logging.getLogger(__name__)


class BTC_Staking(IStrategy):
    """Long-only accumulation strategy on 5m candles.

    A trade is opened on any red candle with volume, then topped up through
    position adjustment: immediately on a large red candle, or after
    ``staking_delay`` hours on a red candle closing below its SMA20. The whole
    position is closed once the profit ratio reaches ``exit_profit``.
    """

    INTERFACE_VERSION = 3
    timeframe = '5m'
    can_short: bool = False
    stoploss = -0.99
    trailing_stop = False
    process_only_new_candles = True
    use_exit_signal = True
    exit_profit_only = False
    ignore_roi_if_entry_signal = False
    startup_candle_count: int = 50

    # Position adjustment
    position_adjustment_enable = True
    max_entry_position_adjustment = 99

    minimal_roi = {"0": 0.99}

    # Strategy parameters
    staking_delay = 23  # hours to wait after the last fill before a regular top-up
    exit_profit = 0.011  # profit ratio at which custom_exit closes the trade
    stakes = 20  # the total stake amount is divided by this to size each order
    red_candle_pct = 1.10  # candle drop (open -> close), in percent, that triggers a top-up

    # ------------------------------------------------------------------------------------------------------------------
    def version(self) -> str:
        """Return the strategy version string."""
        return "250311"

    # ------------------------------------------------------------------------------------------------------------------
    def log(self, action='', stake=0.0, ctime: Optional[datetime] = None, count_of_entries=0,
            price: float = 0, msg=''):
        """Print one table row describing a trading action.

        :param action: Short label of the action (first column).
        :param stake: Stake amount committed by the action.
        :param ctime: Timestamp of the action; an empty date is printed when None.
        :param count_of_entries: Number of entries before this action (printed as +1).
        :param price: Rate at which the action happens.
        :param msg: Free-form trailing message.
        """
        free = self.wallets.get_free(self.stake_currency)
        remain = free - stake
        full = self.wallets.get_total_stake_amount()
        # ctime defaults to None, so do not dereference it blindly.
        formatted_date = ctime.strftime('%Y-%m-%d %H:%M') if ctime is not None else ''
        print(
            f"| {action:8} | {formatted_date:16} | {count_of_entries + 1:2} | {stake:6.2f} | {remain:7.2f} | {full:7.2f} | {price:9.2f} | {msg}")

    # ------------------------------------------------------------------------------------------------------------------
    @staticmethod
    def _stake_in_bounds(stake: float, min_stake: Optional[float], max_stake: float) -> bool:
        """Return True when ``stake`` lies strictly between the two limits.

        ``min_stake`` is annotated as Optional in the callbacks that use this
        helper; None is treated as a lower bound of 0 instead of raising
        TypeError on the comparison.
        """
        lower = 0.0 if min_stake is None else min_stake
        return lower < stake < max_stake

    # ------------------------------------------------------------------------------------------------------------------
    def custom_exit(self, pair: str, trade: Trade, current_time: datetime, current_rate: float,
                    current_profit: float, **kwargs):
        """Request an exit once ``current_profit`` reaches ``exit_profit``.

        :return: The exit tag ``take_profit_<number of successful entries>``,
            or None when no exit is requested or the trade has open orders.
        """
        if trade.has_open_orders:
            return None

        count_of_entries = trade.nr_of_successful_entries
        if current_profit >= self.exit_profit:
            # Build the tag once so the logged message matches the returned exit reason.
            exit_tag = 'take_profit_' + str(count_of_entries)
            self.log(
                action="🟥 Sell",
                stake=0,
                ctime=current_time,
                count_of_entries=count_of_entries,
                price=current_rate,
                msg=exit_tag
            )
            return exit_tag

        return None

    # ------------------------------------------------------------------------------------------------------------------
    def custom_stake_amount(self, pair: str, current_time: datetime, current_rate: float,
                            proposed_stake: float, min_stake: Optional[float], max_stake: float,
                            leverage: float, entry_tag: Optional[str], side: str,
                            **kwargs) -> float:
        """Size the initial order of a trade.

        :return: The stake from :meth:`calculate_stake` when it lies strictly
            between ``min_stake`` and ``max_stake``, otherwise 0.
        """
        # This is called when placing the initial order (opening trade)
        stake = self.calculate_stake()
        if self._stake_in_bounds(stake, min_stake, max_stake):
            self.log(
                action="🟩 Buy",
                stake=stake,
                ctime=current_time,
                count_of_entries=0,
                price=current_rate,
                msg=''
            )
            return stake

        return 0

    # ------------------------------------------------------------------------------------------------------------------
    def adjust_trade_position(self, trade: Trade, current_time: datetime,
                              current_rate: float, current_profit: float,
                              min_stake: Optional[float], max_stake: float,
                              current_entry_rate: float, current_exit_rate: float,
                              current_entry_profit: float, current_exit_profit: float,
                              **kwargs
                              ) -> Union[Optional[float], Tuple[Optional[float], Optional[str]]]:
        """Decide whether to add to an open trade.

        Two triggers, both requiring a stake within the allowed limits:

        * the last candle dropped by at least ``red_candle_pct`` percent and
          more than 5 minutes passed since the last fill;
        * the last candle is red and closes below its SMA20, and more than
          ``staking_delay`` hours passed since the last fill.

        :return: The additional stake amount, or None to leave the trade untouched.
        """
        # Do nothing while an order is already open.
        if trade.has_open_orders:
            return None

        # Prepare the data.
        df, _ = self.dp.get_analyzed_dataframe(trade.pair, self.timeframe)
        last_fill = trade.date_last_filled_utc
        # Without candles or a fill timestamp nothing below can be evaluated.
        if df is None or df.empty or last_fill is None:
            return None

        last = df.iloc[-1].squeeze()
        count_of_entries = trade.nr_of_successful_entries
        current_time = current_time.astimezone(timezone.utc)
        seconds_since_filled = (current_time - last_fill).total_seconds()

        # Compute the new stake.
        stake = self.calculate_stake()
        if not stake or not self._stake_in_bounds(stake, min_stake, max_stake):
            return None

        # Trigger a buy on a large red candle.
        pct = (last['close'] - last['open']) / (last['open']) * 100
        if pct <= -self.red_candle_pct and seconds_since_filled > (60 * 5):
            msg = f"🔻 {trade.pair} Price drop"
            self.log(
                action="🟧 Adjust",
                stake=stake,
                ctime=current_time,
                count_of_entries=count_of_entries,
                price=current_rate,
                msg=msg
            )
            self.dp.send_msg(msg)
            return stake

        # Trigger a buy under the standard buying conditions.
        if (
                last['close'] < last['sma20']
                and last['close'] < last['open']
                and seconds_since_filled > self.staking_delay * 3600
        ):
            self.log(
                action="🟨 Adjust",
                stake=stake,
                ctime=current_time,
                count_of_entries=count_of_entries,
                price=current_rate,
                msg=''
            )
            return stake

        return None

    # ------------------------------------------------------------------------------------------------------------------
    def calculate_stake(self) -> float:
        """Return the size of one order: total stake amount divided by ``stakes``."""
        full = self.wallets.get_total_stake_amount()
        stake = full / self.stakes
        return stake

    # ------------------------------------------------------------------------------------------------------------------
    def populate_indicators(self, dataframe: DataFrame, metadata: dict) -> DataFrame:
        """Add the ``sma20`` and ``percent`` columns.

        ``percent`` is the open-to-close change of each candle as a ratio
        (not multiplied by 100).
        """
        dataframe['sma20'] = ta.SMA(dataframe, timeperiod=20)
        dataframe["percent"] = (dataframe["close"] - dataframe["open"]) / dataframe["open"]
        return dataframe

    # ------------------------------------------------------------------------------------------------------------------
    def populate_entry_trend(self, dataframe: DataFrame, metadata: dict) -> DataFrame:
        """Flag every red candle with volume as a long entry.

        When the run mode value is ``backtest`` the analyzed dataframe is also
        written to a timestamped feather file under ``user_data/data/binance``.
        """
        dataframe.loc[
            (dataframe['volume'] > 0)
            # a small gain with this
            & (dataframe['percent'] < 0),
            'enter_long'] = 1

        # Compare for equality: `in ('backtest')` is a substring test on a
        # string, not membership in a tuple.
        if self.dp.runmode.value == 'backtest':
            today = datetime.now().strftime("%Y-%m-%d-%H:%M:%S")
            dataframe.to_feather(f"user_data/data/binance/{today}-{metadata['pair'].replace('/', '_')}_df.feather")

        return dataframe

    # ------------------------------------------------------------------------------------------------------------------
    def populate_exit_trend(self, dataframe: DataFrame, metadata: dict) -> DataFrame:
        """No signal-based exits; exits are driven by :meth:`custom_exit`."""
        return dataframe