Files
Freqtrade/Empty5m.py
2026-03-27 20:55:00 +01:00

1452 lines
68 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
# pr#agma pylint: disable=missing-docstring, invalid-name, pointless-string-statement
# isort: skip_file
# --- Do not remove these libs ---
from datetime import datetime
from freqtrade.persistence import Trade
import numpy as np # noqa
import pandas as pd # noqa
from pandas import DataFrame
from datetime import timezone, timedelta
from datetime import timedelta, datetime
from freqtrade.strategy.interface import IStrategy
from freqtrade.persistence import Trade
from typing import Optional, Union, Tuple
from freqtrade.strategy import (BooleanParameter, CategoricalParameter, DecimalParameter, stoploss_from_open,
IntParameter, IStrategy, merge_informative_pair, informative, stoploss_from_absolute)
# --------------------------------
# Add your lib to import here
import ta
import talib.abstract as talib
import freqtrade.vendor.qtpylib.indicators as qtpylib
from ta.trend import SMAIndicator, EMAIndicator, MACD, ADXIndicator
from ta.momentum import RSIIndicator, StochasticOscillator
from functools import reduce
from random import shuffle
import logging
logger = logging.getLogger(__name__)
timeperiods = [3, 5, 12, 24, 36, 48, 60]
# Couleurs ANSI de base
RED = "\033[31m"
GREEN = "\033[32m"
YELLOW = "\033[33m"
BLUE = "\033[34m"
MAGENTA = "\033[35m"
CYAN = "\033[36m"
RESET = "\033[0m"
timeperiods = [3, 5, 12, 24, 36, 48, 60]
long_timeperiods = [80, 100, 120, 140, 160, 180, 200]
sma_indicators = list()
sma_indicators_h = list()
sma_indicators_d = list()
score_indicators = list()
stop_buying_indicators = list()
sma_deriv1_indicators = list()
for timeperiod in timeperiods:
sma_indicators.append(f"sma{timeperiod}")
sma_indicators_h.append(f"sma{timeperiod}")
sma_indicators_d.append(f"sma{timeperiod}")
# sma_indicators.append(f"sma{timeperiod}_1h")
# god_genes_with_timeperiod.append(f'max{timeperiod}')
# god_genes_with_timeperiod.append(f'min{timeperiod}')
# god_genes_with_timeperiod.append(f"percent{timeperiod}")
# god_genes_with_timeperiod.append(f"sma{timeperiod}")
# sma_deriv1_indicators.append(f"sma{timeperiod}_deriv1")
sma_deriv1_indicators.append(f"sma{timeperiod}_deriv1")
# sma_deriv1_indicators.append(f"sma{timeperiod}_deriv2")
# sma_deriv1_indicators.append(f"sma{timeperiod}_score")
# stoploss_indicators.append(f"stop_buying{timeperiod}")
stop_buying_indicators.append(f"stop_buying{timeperiod}_1h")
score_indicators.append(f"sma{timeperiod}_score")
# score_indicators.append(f"sma{timeperiod}_score_1h")
# god_genes_with_timeperiod.append(f"sma{timeperiod}_trend_up")
# god_genes_with_timeperiod.append(f"sma{timeperiod}_trend_down")
# god_genes_with_timeperiod.append(f"sma{timeperiod}_trend_change_up")
# god_genes_with_timeperiod.append(f"sma{timeperiod}_trend_change_down")
for timeperiod in long_timeperiods:
sma_deriv1_indicators.append(f"sma{timeperiod}_deriv1_1h")
sma_indicators_d.append(f"sma{timeperiod}")
# number of candles to check up,don,off trend.
TREND_CHECK_CANDLES = 4
DECIMALS = 1
def normalize(df):
df = (df-df.min())/(df.max()-df.min())
return df
# This class is a sample. Feel free to customize it.
class Empty5m(IStrategy):
# Strategy interface version - allow new iterations of the strategy interface.
# Check the documentation or the Sample strategy to get the latest version.
INTERFACE_VERSION = 2
# ROI table:
minimal_roi = {
#"0": 0.015
"0": 0.5
}
# Stoploss:
stoploss = -1
trailing_stop = False
# trailing_stop_positive = 0.15
# trailing_stop_positive_offset = 0.20
# trailing_only_offset_is_reached = False
position_adjustment_enable = True
use_custom_stoploss = True
can_short = True
#max_open_trades = 3
# Optimal ticker interval for the strategy.
timeframe = '1m'
# Run "populate_indicators()" only for new candle.
process_only_new_candles = False
# These values can be overridden in the "ask_strategy" section in the config.
use_sell_signal = True
sell_profit_only = False
ignore_roi_if_buy_signal = False
# Number of candles the strategy requires before producing valid signals
startup_candle_count: int = 30
columns_logged = False
pairs = {
pair: {
'first_amount': 0,
"first_buy": 0,
"last_buy": 0.0,
"last_min": 999999999999999.5,
"last_max": 0,
"trade_info": {},
"max_touch": 0.0,
"last_sell": 0.0,
'count_of_buys': 0,
'current_profit': 0,
'expected_profit': 0,
'previous_profit': 0,
"last_candle": {},
"last_count_of_buys": 0,
'base_stake_amount': 0,
'stop_buy': False,
'last_date': 0,
'stop': False,
'max_profit': 0,
'last_profit': 0,
'total_amount': 0,
'has_gain': 0,
'force_sell': False,
'force_buy': False,
'current_trade': None,
'last_trade': None,
'force_stop': False,
'count_of_lost': 0
}
for pair in ["BTC/USDC", "BTC/USDT"]
}
plot_config = {
"main_plot": {
"sma5": {
"color": "#7aa90b"
},
"min24": {
"color": "#121acd"
}
},
"subplots": {
"Inv": {
"sma5_inv": {
"color": "#aef878",
"type": "line"
},
"zero": {
"color": "#fdba52"
},
"sma24_score": {
"color": "#f1f5b0"
}
},
"drv": {
"sma5_deriv1": {
"color": "#96eebb"
}
}
},
"options": {
"markAreaZIndex": 1
}
}
# start_bull_indicator = CategoricalParameter(sma_indicators_d, default='sma100', space='buy', optimize=True, load=True)
# start_bull_deriv1 = DecimalParameter(-0.005, 0.005, decimals=4, default=0, space='buy', optimize=True, load=True)
# start_bull_deriv2 = DecimalParameter(-0.005, 0.005, decimals=4, default=0, space='buy', optimize=True, load=True)
# start_bear_indicator = CategoricalParameter(sma_indicators_d, default='sma100', space='buy', optimize=True, load=True)
# start_bear_deriv1 = DecimalParameter(-0.005, 0.005, decimals=4, default=0, space='buy', optimize=True, load=True)
# start_bear_deriv2 = DecimalParameter(-0.005, 0.005, decimals=4, default=0, space='buy', optimize=True, load=True)
# buy_deriv1_sma60 = DecimalParameter(-0.005, 0.005, decimals=4, default=0, space='buy', optimize=True, load=True)
# buy_deriv1_sma5d = DecimalParameter(-0.007, 0.007, decimals=4, default=0, space='buy', optimize=True, load=True)
# buy_deriv1_sma12d = DecimalParameter(-0.007, 0.007, decimals=4, default=0, space='buy', optimize=True, load=True)
# buy_deriv2_sma60 = DecimalParameter(-0.0005, 0.0005, decimals=4, default=0, space='buy', optimize=True, load=True)
# buy_deriv2_sma5d = DecimalParameter(-0.0007, 0.0007, decimals=4, default=0, space='buy', optimize=True, load=True)
# buy_deriv2_sma12d = DecimalParameter(-0.0007, 0.0007, decimals=4, default=0, space='buy', optimize=True, load=True)
# buy_longue = CategoricalParameter(long_timeperiods, default=120, space='buy', optimize=True, load=True)
# buy_longue_derive = CategoricalParameter(sma_deriv1_indicators, default="sma60_deriv1_1h", space='buy', optimize=False, load=True)
# sell_score_indicator = CategoricalParameter(score_indicators, default="sma24_score", space='sell')
# sell_sma_indicators = CategoricalParameter(sma_indicators, default="sma24_score", space='sell')
# sell_crossed_sma_indicators = CategoricalParameter(sma_indicators, default="sma24_score", space='sell')
drop_from_last_entry = DecimalParameter(-0.1, 0, decimals=2, default=-0.025, space='protection', optimize=True, load=True)
# baisses = list()
# for i in range(0, 0.5, 0.05):
# baisses.append(i)
# mises_bear = IntParameter(1, 10, default=1, space='protection')
mises_bull = IntParameter(1, 10, default=1, space='protection')
sell_force_sell = DecimalParameter(-0.2, 0, decimals=3, default=-0.02, space='sell')
sell_indicator = CategoricalParameter(sma_indicators, default="sma36", space='sell', optimize=True, load=True)
baisse = DecimalParameter(0.1, 0.5, decimals=2, default=0.3, space='sell', optimize=True, load=True)
b30_indicateur = CategoricalParameter(sma_indicators, default="sma36", space='sell', optimize=True, load=True)
# lost_indicator = CategoricalParameter(sma_deriv1_indicators, default="sma5_deriv1", space='protection')
# range_pos_stoploss = DecimalParameter(0, 0.1, decimals=2, default=0.05, space='protection')
# stoploss_force = DecimalParameter(-0.2, 0, decimals=2, default=-0.05, space='protection')
# stoploss_timeperiod = CategoricalParameter(timeperiods, default="12", space='protection')
def custom_stake_amount(self, pair: str, current_time: datetime, current_rate: float,
proposed_stake: float, min_stake: float, max_stake: float,
**kwargs) -> float:
dataframe, _ = self.dp.get_analyzed_dataframe(pair, self.timeframe)
last_candle = dataframe.iloc[-1].squeeze()
return self.adjust_stake_amount(pair, last_candle)
def adjust_stake_amount(self, pair: str, last_candle: DataFrame):
if (self.pairs[pair]['first_amount'] > 0):
amount = min(self.wallets.get_available_stake_amount(), self.pairs[pair]['first_amount'])
else:
# range_min = last_candle[f"min{self.stoploss_timeperiod.value}_1h"]
# range_max = last_candle[f"max{self.stoploss_timeperiod.value}_1h"]
#
# if range_max == range_min:
# return -0.1 # sécurité
#
# range_pos = (last_candle['close'] - range_min) / (range_max - range_min)
# range_pos = last_candle[f"range_pos"]
# sl_min = self.wallets.get_available_stake_amount() / 6
# sl_max = self.wallets.get_available_stake_amount() / 2
#
# amount = sl_min + (1 - range_pos) * (sl_max - sl_min)
# if last_candle['enter_tag'] in ['fall', 'bear', 'Force', 'Range-']:
# amount = self.wallets.get_available_stake_amount() / self.mises_bear.value
# else:
amount = self.wallets.get_available_stake_amount() / self.mises_bull.value # / (2 * self.pairs[pair]['count_of_lost'] + 1)
# factor = 1
#
# if self.pairs[pair]['last_trade'] is not None \
# and self.pairs[pair]['last_trade'].close_profit is not None \
# and self.pairs[pair]['last_trade'].close_profit > 0.01:
# factor = self.pairs[pair]['last_trade'].close_profit / 0.01
# amount = amount / factor
return min(amount, self.wallets.get_available_stake_amount())
def adjust_trade_position(self, trade: Trade, current_time: datetime,
current_rate: float, current_profit: float, min_stake: float,
max_stake: float, **kwargs):
# ne rien faire si ordre deja en cours
if trade.has_open_orders:
# print("skip open orders")
return None
if (self.wallets.get_available_stake_amount() < 10): # or trade.stake_amount >= max_stake:
return 0
#
dataframe, _ = self.dp.get_analyzed_dataframe(trade.pair, self.timeframe)
last_candle = dataframe.iloc[-1].squeeze()
before_last_candle = dataframe.iloc[-2].squeeze()
before_last_candle_12 = dataframe.iloc[-13].squeeze()
before_last_candle_24 = dataframe.iloc[-25].squeeze()
last_candle_3 = dataframe.iloc[-4].squeeze()
last_candle_previous_1h = dataframe.iloc[-13].squeeze()
# prépare les données
current_time = current_time.astimezone(timezone.utc)
open_date = trade.open_date.astimezone(timezone.utc)
dispo = round(self.wallets.get_available_stake_amount())
hours_since_first_buy = (current_time - trade.open_date_utc).seconds / 3600.0
days_since_first_buy = (current_time - trade.open_date_utc).days
hours = (current_time - trade.date_last_filled_utc).total_seconds() / 3600.0
count_of_buys = trade.nr_of_successful_entries
current_time_utc = current_time.astimezone(timezone.utc)
open_date = trade.open_date.astimezone(timezone.utc)
days_since_open = (current_time_utc - open_date).days
pair = trade.pair
profit = trade.calc_profit(current_rate) #round(current_profit * trade.stake_amount, 1)
# last_lost = self.getLastLost(last_candle, pair)
# pct_first = 0
stake_amount = min(self.wallets.get_available_stake_amount(), self.adjust_stake_amount(pair, last_candle))
# if (last_candle['enter_long'] == 1 and current_profit < -0.05 and stake_amount > 10) :
#
# print(f"adjust {current_time} {stake_amount}")
# print(f"adjust {pair} {current_time} dispo={dispo} amount={stake_amount} rate={current_rate}")
# return stake_amount
if last_candle['enter_long'] != 1:
return None
filled_buys = [
o for o in trade.orders
if o.status == "closed" and o.ft_order_side == "buy"
]
if not filled_buys:
return None
last_buy = max(filled_buys, key=lambda o: o.order_date)
last_entry_price = last_buy.price
if last_entry_price:
drop_from_last_entry = (current_rate - last_entry_price) / last_entry_price
if drop_from_last_entry <= self.drop_from_last_entry.value and last_candle['min60'] == last_candle_3['min60'] \
and last_candle['range_pos'] < 0.03 and last_candle['hapercent'] > 0:
# stake_amount = trade.stake_amount
self.pairs[trade.pair]['last_buy'] = current_rate
self.pairs[trade.pair]['total_amount'] += stake_amount
# print(f"adjust {pair} drop={drop_from_last_entry} {current_time} dispo={dispo} amount={stake_amount} rate={current_rate}")
# print(f"adjust {pair} {current_time} dispo={dispo} amount={stake_amount} rate={current_rate}")
trade_type = last_candle['enter_tag'] if last_candle['enter_long'] == 1 else 'pct48'
self.pairs[trade.pair]['count_of_buys'] += 1
self.pairs[pair]['total_amount'] += stake_amount
self.log_trade(
last_candle=last_candle,
date=current_time,
action="🟧 Loss -",
dispo=round(dispo,0),
pair=trade.pair,
rate=current_rate,
trade_type=trade_type,
profit=round(profit, 1),
buys=trade.nr_of_successful_entries + 1,
stake=round(stake_amount, 2)
)
self.pairs[trade.pair]['last_buy'] = current_rate
self.pairs[trade.pair]['max_touch'] = last_candle['close']
self.pairs[trade.pair]['last_candle'] = last_candle
return stake_amount
return None
def confirm_trade_entry(self, pair: str, order_type: str, amount: float, rate: float, time_in_force: str,
current_time: datetime, entry_tag: Optional[str], **kwargs) -> bool:
minutes = 0
if self.pairs[pair]['last_date'] != 0:
minutes = round(int((current_time - self.pairs[pair]['last_date']).total_seconds() / 60))
dataframe, _ = self.dp.get_analyzed_dataframe(pair, self.timeframe)
last_candle = dataframe.iloc[-1].squeeze()
last_candle_2 = dataframe.iloc[-2].squeeze()
last_candle_3 = dataframe.iloc[-3].squeeze()
if entry_tag == 'Range-':
self.pairs[pair]['count_of_lost'] = 0
if entry_tag == 'Force':
if self.pairs[pair]['count_of_lost'] >= 1:
self.pairs[pair]['count_of_lost'] = 0
condition = False
else:
condition = False if self.pairs[pair]['count_of_lost'] >= 1 \
and (last_candle['sma12_deriv1_1h'] < 0.00) \
and entry_tag != 'dist' else True
reason = ''
if not condition:
reason = 'lost'
if condition:
if last_candle['range_pos'] > 0.05:# and self.pairs[pair]['force_stop']:
condition = last_candle['sma5'] > last_candle['sma60']
if not condition:
reason = 'range'
# if self.pairs[pair]['force_stop'] and last_candle['range_pos'] < 0.02:
# self.pairs[pair]['force_stop'] = False
if False and self.pairs[pair]['last_trade'].close_date is not None:
# base cooldown = n bougies / cooldown proportionnel au profit
# bougies de plus par %
cooldown_candles = 12 + 6 * (int(self.pairs[pair]['last_profit'] / 0.01)) # réglable
# temps depuis la fermeture
candles_since_close = ((current_time - self.pairs[pair]['last_trade'].close_date).total_seconds() / 3600) # seconds / heure
condition = (candles_since_close >= cooldown_candles)
print(f"Cool close date = trade={self.pairs[pair]['last_trade'].close_date} down {round(self.pairs[pair]['last_profit'], 3)} {cooldown_candles} {candles_since_close}")
# self.should_enter_trade(pair, last_candle, current_time)
if self.pairs[pair]['stop']:
reason = 'stop'
allow_to_buy = (condition and not self.pairs[pair]['stop']) | (entry_tag == 'force_entry')
# force = self.pairs[pair]['force_buy']
# if self.pairs[pair]['force_buy']:
# self.pairs[pair]['force_buy'] = False
# allow_to_buy = True
# else:
# if not self.should_enter_trade(pair, last_candle, current_time):
# allow_to_buy = False
dispo = round(self.wallets.get_available_stake_amount())
if allow_to_buy:
self.pairs[pair]['first_buy'] = rate
self.pairs[pair]['last_buy'] = rate
self.pairs[pair]['max_touch'] = last_candle['close']
self.pairs[pair]['last_candle'] = last_candle
self.pairs[pair]['count_of_buys'] = 1
self.pairs[pair]['current_profit'] = 0
self.pairs[pair]['last_profit'] = 0
self.pairs[pair]['last_trade'] = None
self.pairs[pair]['last_max'] = max(last_candle['close'], self.pairs[pair]['last_max'])
self.pairs[pair]['last_min'] = min(last_candle['close'], self.pairs[pair]['last_min'])
self.pairs[pair]['last_date'] = current_time
# self.printLineLog()
stake_amount = self.adjust_stake_amount(pair, last_candle)
self.pairs[pair]['first_amount'] = stake_amount
self.pairs[pair]['total_amount'] = stake_amount
# print(f"Buy {pair} {current_time} {entry_tag} dispo={dispo} amount={stake_amount} rate={rate} rate={rate}")
self.log_trade(
last_candle=last_candle,
date=current_time,
action=("🟩Buy" if allow_to_buy else "Canceled") + " " + str(minutes),
pair=pair,
rate=rate,
dispo=dispo,
profit=0,
trade_type=entry_tag,
buys=1,
stake=round(stake_amount, 2)
)
else:
self.log_trade(
last_candle=last_candle,
date=current_time,
action=("🟩Buy" if allow_to_buy else "Canceled") + " " + reason,
pair=pair,
rate=rate,
dispo=dispo,
profit=0,
trade_type=entry_tag,
buys=1,
stake=0
)
return allow_to_buy
def confirm_trade_exit(self, pair: str, trade: Trade, order_type: str, amount: float, rate: float,
time_in_force: str,
exit_reason: str, current_time, **kwargs, ) -> bool:
# allow_to_sell = (minutes > 30)
dataframe, _ = self.dp.get_analyzed_dataframe(pair, self.timeframe)
last_candle = dataframe.iloc[-1].squeeze()
profit = trade.calc_profit(rate)
force = self.pairs[pair]['force_sell']
allow_to_sell = (last_candle['hapercent'] < 0 and profit > 0) or force or (exit_reason == 'god') or (exit_reason == 'force_exit') or (exit_reason == 'stop_loss') or (exit_reason == 'trailing_stop_loss')
minutes = int(round((current_time - trade.date_last_filled_utc).total_seconds() / 60, 0))
if allow_to_sell:
self.pairs[pair]['last_sell'] = rate
self.pairs[pair]['last_candle'] = last_candle
profit = trade.calc_profit(rate)
self.pairs[pair]['previous_profit'] = profit
dispo = round(self.wallets.get_available_stake_amount())
# print(f"Sell {pair} {current_time} {exit_reason} dispo={dispo} rate={rate} open_rate={trade.open_rate} profit={profit}")
self.log_trade(
last_candle=last_candle,
date=current_time,
action="🟥Sell " + str(minutes),
pair=pair,
trade_type=exit_reason,
rate=last_candle['close'],
dispo=dispo,
profit=round(profit, 2)
)
self.pairs[pair]['first_amount'] = 0
self.pairs[pair]['force_sell'] = False
self.pairs[pair]['has_gain'] = 0
self.pairs[pair]['current_profit'] = 0
self.pairs[pair]['total_amount'] = 0
self.pairs[pair]['count_of_buys'] = 0
self.pairs[pair]['max_touch'] = 0
self.pairs[pair]['last_buy'] = 0
self.pairs[pair]['last_date'] = current_time
self.pairs[pair]['last_trade'] = trade
self.pairs[pair]['current_trade'] = None
self.pairs[pair]['max_profit'] = 0
if profit < 0:
self.pairs[pair]['count_of_lost'] += 1
else:
self.pairs[pair]['count_of_lost'] = 0
else:
print(f"{current_time} STOP triggered for {pair} ({exit_reason}) but condition blocked", "warning")
return (allow_to_sell) | (exit_reason == 'force_exit') | (exit_reason == 'stop_loss') | force
def custom_exit(self, pair, trade, current_time, current_rate, current_profit, **kwargs):
dataframe, _ = self.dp.get_analyzed_dataframe(pair, self.timeframe)
last_candle = dataframe.iloc[-1]
before_last_candle = dataframe.iloc[-2]
self.pairs[pair]['current_trade'] = trade
# momentum = last_candle[self.sell_score_indicator.value]
profit = trade.calc_profit(current_rate) #round(current_profit * trade.stake_amount, 1)
count_of_buys = trade.nr_of_successful_entries
expected_profit = self.expectedProfit(pair, last_candle)
self.pairs[pair]['max_profit'] = max(self.pairs[pair]['max_profit'], profit)
max_profit = self.pairs[pair]['max_profit']
# baisse_min = last_candle['close'] - last_candle['min12']
baisse = 0
if profit > 0:
baisse = 1 - (profit / max_profit)
mx = max_profit / 5
self.pairs[pair]['count_of_buys'] = count_of_buys
self.pairs[pair]['current_profit'] = profit
dispo = round(self.wallets.get_available_stake_amount() + self.pairs[pair]['total_amount'])
# self.log_trade(
# last_candle=last_candle,
# date=current_time,
# action=("🔴 NOW" if self.pairs[pair]['stop'] else "🟢 NOW "),
# dispo=dispo,
# pair=pair,
# rate=last_candle['close'],
# trade_type='momentum' + str(round(momentum, 2)),
# profit=round(profit, 2),
# buys=count_of_buys,
# stake=0
# )
if self.pairs[pair]['current_trade'].enter_tag in ['bear', 'Force', 'Range-']:
if current_profit < - 0.02 and last_candle[f"close"] <= last_candle['sma60'] and self.wallets.get_available_stake_amount() < 50:
self.pairs[pair]['force_sell'] = True
return 'smaBF'
else:
if current_profit < self.sell_force_sell.value and self.wallets.get_available_stake_amount() < 50 \
and last_candle[f"close"] <= last_candle[self.sell_indicator.value]:
self.pairs[pair]['force_sell'] = True
return 'sma'
if current_profit > 0.01 and \
(baisse > self.baisse.value and last_candle[f"close"] <= last_candle[self.b30_indicateur.value]) \
and last_candle['hapercent'] < 0:
self.pairs[pair]['force_sell'] = True
return 'B30'
# if profit > 0 and baisse > 0.5 and last_candle['hapercent'] <0 and last_candle[f"close"] <= last_candle['sma12']:
# self.pairs[pair]['force_sell'] = True
# return 'B50'
if current_profit > - self.sell_force_sell.value and last_candle['has_cross_sma3_1h'] == 1:
self.pairs[pair]['force_sell'] = True
return 'Cross'
# if profit > max(5, expected_profit) and last_candle['sma5_deriv1_1h'] < 0 and baisse > 0.15:
# self.pairs[pair]['force_sell'] = True
# return 'Drv5d'
# if last_candle['range_pos'] > 0.05 and current_profit < - last_candle['range_pos'] /4 : #last_candle['cross_sma60']:
# self.pairs[pair]['force_sell'] = True
# return 'Range'
# if last_candle['range_pos'] > 0.05 and current_profit < - last_candle['range_pos'] /4 \
# and last_candle['sma5_deriv1_1h'] < 0 and last_candle['sma5_deriv2_1h'] < 0 \
# and last_candle['sma60_deriv1'] < 0 and last_candle['sma60_deriv2'] < 0:
# self.pairs[pair]['force_sell'] = True
# self.pairs[pair]['force_stop'] = True
# return 'Deriv';
if profit < - (dispo * 0.2):
print(f'dispo={dispo} wallet={round(self.wallets.get_available_stake_amount())} limit={-dispo * 0.1}')
self.pairs[pair]['force_sell'] = True
self.pairs[pair]['force_stop'] = True
return 'Force';
# if self.pairs[pair]['force_sell'] and self.pairs[pair]['last_trade'].close_profit > 0.02:
# return "Force"
# if last_candle['stop_buying']:
# self.pairs[pair]['force_sell'] = True
# return 'Stop'
# Si momentum fort → on laisse courir
# if momentum > 1:
# return None
# Si momentum faiblit → on prend profit plus tôt
# if momentum < 0 and current_profit > 0.02 and last_candle[self.sell_score_indicator.value] < before_last_candle[self.sell_score_indicator.value]\
# and last_candle['close'] < last_candle['sma5']:
# self.pairs[pair]['last_profit'] = current_profit
# return "momentum_drop"
return None
def custom_stoploss(self, pair, trade, current_time, current_rate, current_profit, **kwargs):
dataframe, _ = self.dp.get_analyzed_dataframe(pair, self.timeframe)
last_candle = dataframe.iloc[-1]
profit = trade.calc_profit(current_rate)
candle_at_buy = self.pairs[pair]['last_candle']
# if candle_at_buy['range_pos'] > self.range_pos_stoploss.value and candle_at_buy[self.stoploss_indicator.value] < 0:
# return self.stoploss_force.value
# # print(f'test stop loss {self.stoploss} {last_candle["stop_buying12_1h"]}')
# if last_candle[self.stoploss_indicator.value] and (trade.nr_of_successful_entries >= 4 or self.wallets.get_available_stake_amount() < 300): # and profit < - 30 :
# range_min = last_candle[f"min{self.stoploss_timeperiod.value}_1h"]
# range_max = last_candle[f"max{self.stoploss_timeperiod.value}_1h"]
#
# if range_max == range_min:
# print(f'ranges={range_min}')
# return -0.1 # sécurité
#
# range_pos = (current_rate - range_min) / (range_max - range_min)
#
# if (range_pos > 1):
# return -1
#
# sl_min = -0.02
# sl_max = -0.1 #self.stoploss
#
# dynamic_sl = (sl_min + (1 - range_pos) * (sl_max - sl_min))
#
# print(f'{current_time} Loss ranges={round(range_min,0)} {round(range_max, 0)} range_pos={round(range_pos, 3)} dynamic_sl={round(dynamic_sl, 3)} '
# f'profit={profit} current={current_profit} {self.stoploss_indicator.value} {self.stoploss_timeperiod.value} {last_candle[self.stoploss_indicator.value]}')
#
# return dynamic_sl
return -1
def informative_pairs(self):
# get access to all pairs available in whitelist.
pairs = self.dp.current_whitelist()
informative_pairs = [(pair, '1h') for pair in pairs]
informative_pairs += [(pair, '1d') for pair in pairs]
return informative_pairs
def populate_indicators(self, dataframe: DataFrame, metadata: dict) -> DataFrame:
pair = metadata['pair']
dataframe = dataframe.copy()
heikinashi = qtpylib.heikinashi(dataframe)
dataframe['haopen'] = heikinashi['open']
dataframe['haclose'] = heikinashi['close']
dataframe['hapercent'] = (dataframe['haclose'] - dataframe['haopen']) / dataframe['haclose']
dataframe['mid'] = dataframe['haopen'] + (dataframe['haclose'] - dataframe['haopen']) / 2
dataframe['zero'] = 0
dataframe[f"percent"] = dataframe['close'].pct_change()
for timeperiod in timeperiods:
dataframe[f"mid_smooth{timeperiod}"] = dataframe['mid'].rolling(timeperiod).mean()
dataframe[f'max{timeperiod}'] = talib.MAX(dataframe['close'], timeperiod=timeperiod)
dataframe[f'min{timeperiod}'] = talib.MIN(dataframe['close'], timeperiod=timeperiod)
dataframe[f"percent{timeperiod}"] = dataframe['close'].pct_change(timeperiod)
dataframe[f"sma{timeperiod}"] = dataframe['mid'].ewm(span=timeperiod, adjust=False).mean()
# dataframe[f"high{timeperiod}"] = dataframe['high'].ewm(span=timeperiod, adjust=False).mean()
# dataframe[f"low{timeperiod}"] = dataframe['low'].ewm(span=timeperiod, adjust=False).mean()
# dataframe = self.calculateRegression(dataframe, column=f"high{timeperiod}", window=10, degree=1, future_offset=12)
# dataframe = self.calculateRegression(dataframe, column=f"low{timeperiod}", window=10, degree=1, future_offset=12)
self.calculeDerivees(dataframe, f"sma{timeperiod}", timeframe=self.timeframe, ema_period=timeperiod)
dataframe = self.calculateRegression(dataframe, column='mid', window=30, degree=1, future_offset=12)
dataframe = self.calculateRegression(dataframe, column='sma24', window=30, degree=1, future_offset=12)
dataframe["percent"] = dataframe["mid"].pct_change(1)
dataframe["percent3"] = dataframe["mid"].pct_change(3)
dataframe["volume_mean"] = dataframe["volume"].rolling(20).mean()
dataframe["volume_ratio"] = dataframe["volume"] / dataframe["volume_mean"]
# dataframe["market_state"] = 0
#
# dataframe.loc[dataframe["percent"] < -0.005, "market_state"] = -1
# dataframe.loc[(dataframe["percent3"] < -0.015) & (dataframe["volume_ratio"] > 2), "market_state"] = -2
# dataframe.loc[(dataframe["percent"] > 0.003) & (dataframe["volume_ratio"] > 1.5), "market_state"] = 1
# dataframe["velocity"] = dataframe["percent"] - dataframe["percent3"]
dataframe = self.calculateMarketState(dataframe, metadata)
# ######################################################################################################
################### INFORMATIVE 1h
informative = self.dp.get_pair_dataframe(pair=metadata['pair'], timeframe="1h")
# informative = self.populateDataframe(informative, timeframe='1d')
heikinashi = qtpylib.heikinashi(informative)
informative['haopen'] = heikinashi['open']
informative['haclose'] = heikinashi['close']
informative['hapercent'] = (informative['haclose'] - informative['haopen']) / informative['haclose']
informative['mid'] = informative['open'] + (informative['close'] - informative['open']) / 2
for timeperiod in timeperiods:
informative[f'max{timeperiod}'] = talib.MAX(informative['close'], timeperiod=timeperiod)
informative[f'min{timeperiod}'] = talib.MIN(informative['close'], timeperiod=timeperiod)
# informative[f"range{timeperiod}"] = ((informative["close"] - informative[f'min{timeperiod}']) / (informative[f'max{timeperiod}'] - informative[f'min{timeperiod}']))
# informative[f"percent{timeperiod}"] = informative['close'].pct_change(timeperiod)
informative[f"sma{timeperiod}"] = informative['mid'].ewm(span=timeperiod, adjust=False).mean()
self.calculeDerivees(informative, f"sma{timeperiod}", timeframe=self.timeframe, ema_period=timeperiod)
informative = self.calculateRegression(informative, column='mid', window=10, degree=1, future_offset=12)
informative = self.calculateRegression(informative, column='sma3', window=10, degree=1, future_offset=12)
informative = self.calculateRegression(informative, column='low', window=10, degree=1, future_offset=12)
for timeperiod in long_timeperiods:
informative[f"sma{timeperiod}"] = informative['mid'].ewm(span=timeperiod, adjust=False).mean()
self.calculeDerivees(informative, f"sma{timeperiod}", timeframe=self.timeframe, ema_period=timeperiod)
informative['rsi'] = talib.RSI(informative['close'], timeperiod=14)
self.calculeDerivees(informative, f"rsi", timeframe=self.timeframe, ema_period=14)
informative['max_rsi_12'] = talib.MAX(informative['rsi'], timeperiod=12)
informative['max_rsi_24'] = talib.MAX(informative['rsi'], timeperiod=24)
informative[f'stop_buying_deb'] = qtpylib.crossed_below(informative[f"sma12"], informative['sma36']) & (informative['close'] < informative['sma100'])
informative[f'stop_buying_end'] = qtpylib.crossed_above(informative[f"sma12"], informative['sma36']) & (informative['close'] > informative['sma100'])
latched = np.zeros(len(informative), dtype=bool)
for i in range(1, len(informative)):
if informative['stop_buying_deb'].iloc[i]:
latched[i] = True
elif informative['stop_buying_end'].iloc[i]:
latched[i] = False
else:
latched[i] = latched[i - 1]
informative['stop_buying'] = latched
informative = self.calculateDownAndUp(informative, limit=0.0001)
informative = self.calculateMarketState(informative, metadata)
dataframe = merge_informative_pair(dataframe, informative, self.timeframe, "1h", ffill=True)
# ######################################################################################################
# ######################################################################################################
################### INFORMATIVE 1d
informative = self.dp.get_pair_dataframe(pair=metadata['pair'], timeframe="1d")
# informative = self.populateDataframe(informative, timeframe='1d')
# heikinashi = qtpylib.heikinashi(informative)
# informative['haopen'] = heikinashi['open']
# informative['haclose'] = heikinashi['close']
# informative['hapercent'] = (informative['haclose'] - informative['haopen']) / informative['haclose']
informative['mid'] = informative['open'] + (informative['close'] - informative['open']) / 2
for timeperiod in timeperiods:
informative[f'max{timeperiod}'] = talib.MAX(informative['close'], timeperiod=timeperiod)
informative[f'min{timeperiod}'] = talib.MIN(informative['close'], timeperiod=timeperiod)
# informative[f"range{timeperiod}"] = ((informative["close"] - informative[f'min{timeperiod}']) / (informative[f'max{timeperiod}'] - informative[f'min{timeperiod}']))
# informative[f"percent{timeperiod}"] = informative['close'].pct_change(timeperiod)
informative[f"sma{timeperiod}"] = informative['mid'].ewm(span=timeperiod, adjust=False).mean()
self.calculeDerivees(informative, f"sma{timeperiod}", timeframe=self.timeframe, ema_period=timeperiod)
for timeperiod in long_timeperiods:
informative[f"sma{timeperiod}"] = informative['mid'].ewm(span=timeperiod, adjust=False).mean()
self.calculeDerivees(informative, f"sma{timeperiod}", timeframe=self.timeframe, ema_period=timeperiod)
informative['rsi'] = talib.RSI(informative['close'], timeperiod=14)
self.calculeDerivees(informative, f"rsi", timeframe=self.timeframe, ema_period=14)
informative['max_rsi_12'] = talib.MAX(informative['rsi'], timeperiod=12)
informative['max_rsi_24'] = talib.MAX(informative['rsi'], timeperiod=24)
informative[f'stop_buying_deb'] = qtpylib.crossed_below(informative[f"sma12"], informative['sma36']) & (informative['close'] < informative['sma100'])
informative[f'stop_buying_end'] = qtpylib.crossed_above(informative[f"sma12"], informative['sma36']) & (informative['close'] > informative['sma100'])
latched = np.zeros(len(informative), dtype=bool)
for i in range(1, len(informative)):
if informative['stop_buying_deb'].iloc[i]:
latched[i] = True
elif informative['stop_buying_end'].iloc[i]:
latched[i] = False
else:
latched[i] = latched[i - 1]
informative['stop_buying'] = latched
informative = self.calculateMarketState(informative, metadata)
dataframe = merge_informative_pair(dataframe, informative, self.timeframe, "1d", ffill=True)
# ######################################################################################################
range_min = dataframe[f"min12_1h"]
range_max = dataframe[f"max48"]
dataframe[f"range_pos"] = ((dataframe['mid'] - range_min) / (range_max)).rolling(5).mean()
# dataframe['cross_sma60'] = qtpylib.crossed_below(dataframe[self.sell_sma_indicators.value], dataframe[self.sell_crossed_sma_indicators.value])
dataframe[f'has_cross_sma3_1h'] = qtpylib.crossed_above(dataframe[f"sma60"], dataframe['sma3_regression_1h'])
dataframe[f'has_cross_min'] = qtpylib.crossed_above(dataframe[f"close"], dataframe['min60'])
dataframe[f'has_cross_min_6'] = (dataframe['has_cross_min'].rolling(15).max() == 1)
dataframe['atr'] = talib.ATR(dataframe)
dataframe['rsi'] = talib.RSI(dataframe['close'], timeperiod=14)
dataframe['max_rsi_12'] = talib.MAX(dataframe['rsi'], timeperiod=12)
dataframe['max_rsi_24'] = talib.MAX(dataframe['rsi'], timeperiod=24)
dataframe["dist_sma200_1h"] = (
(dataframe["close_1h"] - dataframe["sma200_1h"])
/ dataframe["sma200_1h"]
)
# Compter les baisses / hausses consécutives
dataframe = self.calculateDownAndUp(dataframe, limit=0.0001)
# récupérer le dernier trade fermé
trades = Trade.get_trades_proxy(pair=pair,is_open=False)
if trades:
last_trade = trades[-1]
self.pairs[pair]['last_profit'] = last_trade.close_profit # ex: 0.12 = +12%
self.pairs[pair]['last_trade'] = last_trade
return dataframe
def populate_buy_trend(self, dataframe: DataFrame, metadata: dict) -> DataFrame:
conditions = list()
# #####################################################################################
# CA MONTE !!
# #####################################################################################
# conditions.append(dataframe[f"{self.start_bull_indicator.value}_deriv1_1h" ] > self.start_bull_deriv1.value)
# conditions.append(dataframe[f"{self.start_bull_indicator.value}_deriv2_1h"] > self.start_bull_deriv2.value)
# conditions.append(dataframe['sma12_deriv1'] > self.buy_deriv1_sma60.value)
# conditions.append(dataframe['sma5_deriv1_1h'] > self.buy_deriv1_sma5d.value)
# conditions.append(dataframe['sma12_deriv1_1h'] > self.buy_deriv1_sma12d.value)
# conditions.append(dataframe['sma36_deriv2'] > self.buy_deriv2_sma60.value)
# conditions.append(dataframe['sma5_deriv2_1h'] > self.buy_deriv2_sma5d.value)
# conditions.append(dataframe['sma12_deriv2_1h'] > self.buy_deriv2_sma12d.value)
conditions.append(dataframe['hapercent'] > 0)
# conditions.append(dataframe['percent12'] < 0.01)
# conditions.append(dataframe['percent5'] < 0.01)
# conditions.append(dataframe['max_rsi_24'] < 80)
# dynamic_rsi_threshold = 70 + 15 * np.tanh(dataframe["dist_sma200_1h"] * 5)
# conditions.append((dataframe['max_rsi_12_1h'] < dynamic_rsi_threshold))
# conditions.append(dataframe[f"close"] > dataframe['sma60'])
# conditions.append(((dataframe[f"range_pos"] < 0.05) ) | ((dataframe['sma12_deriv1'] > 0) & (dataframe['sma12_deriv2'] > 0)))
#
# conditions.append(
# (dataframe['close_1h'] > dataframe[f'sma{self.buy_longue.value}_1h'])
# | (dataframe['sma60_inv_1h'] == -1)
# )
# if conditions:
# dataframe.loc[
# reduce(lambda x, y: x & y, conditions),
# ['enter_long', 'enter_tag']
# ] = (1, 'bull')
# #####################################################################################
# conditions = list()
# conditions.append(dataframe['dist_sma200_1h'] < -0.05)
# conditions.append(dataframe['sma12_inv'] == 1)
# # conditions.append(dataframe['sma100_deriv1_1h'] > 0)
# if conditions:
# dataframe.loc[
# reduce(lambda x, y: x & y, conditions),
# ['enter_long', 'enter_tag']
# ] = (1, 'dist')
# #####################################################################################
# conditions = list()
# conditions.append(dataframe['close'] < dataframe['sma100_1h'])
# conditions.append(dataframe['mid_smooth12'] > dataframe['mid_smooth12'].shift(1))
# conditions.append(dataframe['sma100_deriv1_1h'] > 0)
# conditions.append(dataframe[f"range_pos"] < 0.01)
# if conditions:
# dataframe.loc[
# reduce(lambda x, y: x & y, conditions),
# ['enter_long', 'enter_tag']
# ] = (1, 'dist')
# #####################################################################################
# CA BAISSE !!
# "buy": {
# "buy_deriv1_sma12d": 0.0,
# "buy_deriv1_sma5d": 0.0,
# "buy_deriv1_sma60": 0.003,
# "buy_deriv2_sma12d": -0.03,
# "buy_deriv2_sma5d": 0.0,
# "buy_deriv2_sma60": -0.002,
# "buy_longue": 200,
# "buy_longue_derive": "sma160_deriv1_1h"
# },
# "protection": {
# "drop_from_last_entry": -0.03,
# "b30_indicateur": "sma3",
# "baisse": 0.31
# },
# #####################################################################################
conditions = list()
# conditions.append(dataframe[f"{self.start_bull_indicator.value}_deriv1_1h" ] < self.start_bull_deriv1.value)
# conditions.append(dataframe[f"{self.start_bull_indicator.value}_deriv2_1h"] < self.start_bull_deriv2.value)
# conditions.append(dataframe[f"{self.start_bear_indicator.value}_deriv1_1h" ] > self.start_bear_deriv1.value)
# conditions.append(dataframe[f"{self.start_bear_indicator.value}_deriv2_1h"] > self.start_bear_deriv2.value)
# conditions.append(dataframe['sma12_deriv1'] > self.buy_deriv1_sma60.value)
# conditions.append(dataframe['sma5_deriv1_1h'] > self.buy_deriv1_sma5d.value)
# conditions.append(dataframe['sma12_deriv1_1h'] > self.buy_deriv1_sma12d.value)
#
# # conditions.append(dataframe['sma12_deriv2'] > -0.002)
# # conditions.append(dataframe['sma5_deriv2_1h'] > 0)
# # conditions.append(dataframe['sma12_deriv2_1h'] > -0.03)
#
# conditions.append(dataframe['hapercent'] > 0)
# # conditions.append(dataframe['percent12'] < 0.01)
# # conditions.append(dataframe['percent5'] < 0.01)
# conditions.append(dataframe['max_rsi_24'] < 80)
#
# dynamic_rsi_threshold = 70 + 15 * np.tanh(dataframe["dist_sma200_1h"] * 5)
# conditions.append((dataframe['max_rsi_12_1h'] < dynamic_rsi_threshold))
# conditions.append(dataframe[f"close"] > dataframe['sma60'])
# conditions.append(((dataframe[f"range_pos"] < 0.05) ) | ((dataframe['sma12_deriv1'] > 0) & (dataframe['sma12_deriv2'] > 0)))
#
# conditions.append(
# (dataframe['close_1h'] > dataframe[f'sma{self.buy_longue.value}_1h'])
# | (dataframe['sma60_inv_1h'] == -1)
# )
# if conditions:
# dataframe.loc[
# reduce(lambda x, y: x & y, conditions),
# ['enter_long', 'enter_tag']
# ] = (1, 'bear')
# conditions = list()
# conditions.append(dataframe['close_1h'] < dataframe[f'sma{self.buy_longue.value}_1h'])
# conditions.append(dataframe['has_cross_min'].rolling(6).max() == 1)
# conditions.append(dataframe['mid_smooth5'] > dataframe['mid_smooth5'].shift(1))
# conditions.append(dataframe['hapercent'] > 0)
# conditions.append(dataframe['percent5'] < 0.01)
# dataframe.loc[
# reduce(lambda x, y: x & y, conditions),
# ['enter_long', 'enter_tag']
# ] = (1, 'cross_min')
conditions = list()
conditions.append(dataframe['sma12_deriv1'] > 0.00)
conditions.append(dataframe['sma60_deriv1'] > 0.0)
conditions.append(dataframe['sma5_deriv1_1h'] > 0.0)
conditions.append(dataframe['sma12_deriv1_1h'] > 0.0)
conditions.append(dataframe['sma24_deriv1_1h'] > 0.0)
conditions.append(dataframe['sma100_deriv1_1h'] > 0.0)
conditions.append(dataframe[f"range_pos"] < 0.025)
# conditions.append(dataframe['sma12_deriv1_1h'] > 0.0)
# # conditions.append(dataframe['close_1h'] < dataframe[f'sma{self.buy_longue.value}_1h'])
# # conditions.append(dataframe['has_cross_min'].rolling(6).max() == 1)
# # conditions.append(dataframe['mid_smooth5'] > dataframe['mid_smooth5'].shift(1))
# conditions.append(dataframe['min12'] == dataframe['min12'].shift(3))
# conditions.append((dataframe['percent24'] < -0.025) | (dataframe['percent12'] < -0.025))
# dataframe.loc[
# reduce(lambda x, y: x & y, conditions),
# ['enter_long', 'enter_tag']
# ] = (1, 'Rise')
# conditions = list()
# conditions.append(dataframe['has_cross_min_6'] == True)
# conditions.append(dataframe['min36'] == dataframe['min36'].shift(3))
# dataframe.loc[
# reduce(lambda x, y: x & y, conditions),
# ['enter_long', 'enter_tag']
# ] = (1, 'Force')
conditions = list()
# conditions.append(dataframe['mid_regression'].shift(2) > dataframe['mid_regression'].shift(1))
# conditions.append(dataframe['mid_regression'].shift(1) < dataframe['mid_regression'])
conditions.append(dataframe['close'] <= dataframe['min3_1h'])
# conditions.append(dataframe['min60'] == dataframe['min60'].shift(5))
conditions.append(dataframe['has_cross_min_6'] == 1)
# conditions.append(dataframe['down_count'] <= 5)
# conditions.append(dataframe['sma12_deriv1'] >= 0)
dataframe.loc[
reduce(lambda x, y: x & y, conditions),
['enter_long', 'enter_tag']
] = (1, 'Mid')
dataframe.loc[
(
(dataframe['rsi'] > 70) # surachat
),
'enter_short'
] = 1
return dataframe
def populate_sell_trend(self, dataframe: DataFrame, metadata: dict) -> DataFrame:
conditions = list()
# # TODO: Its not dry code!
# sell_indicator = self.sell_indicator0.value
# sell_crossed_indicator = self.sell_crossed_indicator0.value
# sell_operator = self.sell_operator0.value
# sell_real_num = self.sell_real_num0.value
# condition, dataframe = condition_generator(
# dataframe,
# sell_operator,
# sell_indicator,
# sell_crossed_indicator,
# sell_real_num
# )
# conditions.append(condition)
#
# sell_indicator = self.sell_indicator1.value
# sell_crossed_indicator = self.sell_crossed_indicator1.value
# sell_operator = self.sell_operator1.value
# sell_real_num = self.sell_real_num1.value
# condition, dataframe = condition_generator(
# dataframe,
# sell_operator,
# sell_indicator,
# sell_crossed_indicator,
# sell_real_num
# )
# conditions.append(condition)
#
# sell_indicator = self.sell_indicator2.value
# sell_crossed_indicator = self.sell_crossed_indicator2.value
# sell_operator = self.sell_operator2.value
# sell_real_num = self.sell_real_num2.value
# condition, dataframe = condition_generator(
# dataframe,
# sell_operator,
# sell_indicator,
# sell_crossed_indicator,
# sell_real_num
# )
# conditions.append(condition)
#
#
# print(f"SELL indicators tested \n"
# f"{self.sell_indicator0.value} {self.sell_crossed_indicator0.value} {self.sell_operator0.value} {self.sell_real_num0.value} \n"
# f"{self.sell_indicator1.value} {self.sell_crossed_indicator1.value} {self.sell_operator1.value} {self.sell_real_num1.value} \n"
# f"{self.sell_indicator2.value} {self.sell_crossed_indicator2.value} {self.sell_operator2.value} {self.sell_real_num2.value} \n"
# )
#
#
# conditions.append(qtpylib.crossed_below(dataframe['mid'], dataframe['sma24']))
# conditions.append((dataframe['range_pos'] > 0.04))
#
# if conditions:
# dataframe.loc[reduce(lambda x, y: x & y, conditions), ['exit_long', 'exit_tag']] = (1, 'god')
return dataframe
def calculeDerivees(
self,
dataframe: pd.DataFrame,
name: str,
suffixe: str = '',
window: int = 100,
coef: float = 0.15,
ema_period: int = 10,
verbose: bool = True,
timeframe: str = '1m'
) -> pd.DataFrame:
"""
Calcule deriv1/deriv2 (relative simple), applique EMA, calcule tendency
avec epsilon adaptatif basé sur rolling percentiles.
"""
d1_col = f"{name}{suffixe}_deriv1"
d2_col = f"{name}{suffixe}_deriv2"
tendency_col = f"{name}{suffixe}_state"
series = dataframe[f"{name}{suffixe}"]
d1 = series.diff()
d2 = d1.diff()
pmin = int(ema_period / 3)
cond_bas = (d1.rolling(pmin).mean() > d1.rolling(ema_period).mean())
cond_haut = (d1.rolling(pmin).mean() < d1.rolling(ema_period).mean())
dataframe[d1_col] = (dataframe[name] - dataframe[name].shift(3)) / dataframe[name].shift(3)
dataframe[d2_col] = (dataframe[d1_col] - dataframe[d1_col].shift(1))
dataframe[f"{name}{suffixe}_inv"] = np.where(cond_bas, -1, np.where(cond_haut, 1, 0))
short = d1.rolling(pmin).mean()
long = d1.rolling(ema_period).mean()
spread = short - long
zscore = (spread - spread.rolling(ema_period).mean()) / spread.rolling(ema_period).std()
dataframe[f"{name}{suffixe}_score"] = zscore
# ####################################################################
# Calcul de la pente lissée
# d1 = series.diff()
# d1_smooth = d1.rolling(5).mean()
# # Normalisation
# z = (d1_smooth - d1_smooth.rolling(ema_period).mean()) / d1_smooth.rolling(ema_period).std()
# dataframe[f"{name}{suffixe}_trend_up"] = (
# (d1_smooth.shift(1) < 0) &
# (d1_smooth > 0) &
# (z > 1.0)
# )
#
# dataframe[f"{name}{suffixe}_trend_down"] = (
# (d1_smooth.shift(1) > 0) &
# (d1_smooth < 0) &
# (z < -1.0)
# )
#
# momentum_short = d1.rolling(int(ema_period / 2)).mean()
# momentum_long = d1.rolling(ema_period * 2).mean()
#
# dataframe[f"{name}{suffixe}_trend_change_up"] = (
# (momentum_short.shift(1) < momentum_long.shift(1)) &
# (momentum_short > momentum_long)
# )
#
# dataframe[f"{name}{suffixe}_trend_change_down"] = (
# (momentum_short.shift(1) > momentum_long.shift(1)) &
# (momentum_short < momentum_long)
# )
return dataframe
@property
def protections(self):
return [
{
"method": "CooldownPeriod",
"stop_duration_candles": 6
},
# {
# "method": "MaxDrawdown",
# "lookback_period_candles": 96,
# "trade_limit": 4,
# "max_allowed_drawdown": 0.1,
# "stop_duration_candles": 24
# },
# {
# "method": "StoplossGuard",
# "lookback_period_candles": 96,
# "trade_limit": 2,
# "stop_duration_candles": 48,
# "only_per_pair": False
# },
# {
# "method": "LowProfitPairs",
# "lookback_period_candles": 6,
# "trade_limit": 2,
# "stop_duration_candles": 60,
# "required_profit": 0.02
# },
# {
# "method": "LowProfitPairs",
# "lookback_period_candles": 24,
# "trade_limit": 4,
# "stop_duration_candles": 2,
# "required_profit": 0.01
# }
]
def log_trade(self, action, pair, date, trade_type=None, rate=None, dispo=None, profit=None, buys=None, stake=None,
last_candle=None):
# Afficher les colonnes une seule fois
if self.config.get('runmode') == 'hyperopt' or self.dp.runmode.value in ('hyperopt'):
return
if self.columns_logged % 10 == 0:
self.printLog(
f"| {'Date':<16} | {'Action':<10} |{'Pair':<5}| {'Trade Type':<18} |{'Rate':>8} | {'Dispo':>6} | {'Profit':>8} "
f"| {'Pct':>6} | {'max_touch':>11} | {'last_lost':>12} | {'last_max':>7}| {'last_max':>7}|{'Buys':>5}| {'Stake':>5} |"
f"{'rsi':>6}|"
)
self.printLineLog()
# df = pd.DataFrame.from_dict(self.pairs, orient='index')
# colonnes_a_exclure = ['last_candle',
# 'trade_info', 'last_date', 'last_count_of_buys', 'base_stake_amount', 'stop_buy']
# df_filtered = df[df['count_of_buys'] > 0].drop(columns=colonnes_a_exclure)
# # df_filtered = df_filtered["first_buy", "last_max", "max_touch", "last_sell","last_buy", 'count_of_buys', 'current_profit']
#
# print(df_filtered)
self.columns_logged += 1
date = str(date)[:16] if date else "-"
limit = None
# if buys is not None:
# limit = round(last_rate * (1 - self.fibo[buys] / 100), 4)
rsi = ''
rsi_pct = ''
# if last_candle is not None:
# if (not np.isnan(last_candle['rsi_1h'])) and (not np.isnan(last_candle['rsi_1h'])):
# rsi = str(int(last_candle['rsi_1h'])) + " " + str(int(last_candle['rsi_1h']))
# if (not np.isnan(last_candle['rsi_pct_1h'])) and (not np.isnan(last_candle['rsi_pct_1h'])):
# rsi_pct = str(int(10000 * last_candle['bb_mid_pct_1h'])) + " " + str(
# int(last_candle['rsi_pct_1h'])) + " " + str(int(last_candle['rsi_pct_1h']))
# first_rate = self.percent_threshold.value
# last_rate = self.threshold.value
# action = self.color_line(action, action)
sma5_1h = ''
sma5_1h = ''
sma5 = str(sma5_1h) + ' ' + str(sma5_1h)
last_lost = self.getLastLost(last_candle, pair)
if buys is None:
buys = ''
max_touch = '' # round(last_candle['max12_1h'], 1) #round(self.pairs[pair]['max_touch'], 1)
pct_max = self.getPctFirstBuy(pair, last_candle)
total_counts = str(buys) + '/' + str(sum(pair_data['count_of_buys'] for pair_data in self.pairs.values()))
dist_max = self.getDistMax(last_candle, pair)
# if trade_type is not None:
# if np.isnan(last_candle['rsi_1h']):
# string = ' '
# else:
# string = (str(int(last_candle['rsi_1h']))) + " " + str(int(last_candle['rsi_deriv1_1h']))
# trade_type = trade_type \
# + " " + string \
# + " " + str(int(last_candle['rsi_1h'])) \
# + " " + str(int(last_candle['rsi_deriv1_1h']))
# val144 = self.getProbaHausse144(last_candle)
# val1h = self.getProbaHausse1h(last_candle)
color = GREEN if profit > 0 else RED
# color_sma24 = GREEN if last_candle['sma24_deriv1_1h'] > 0 else RED
# color_sma24_2 = GREEN if last_candle['sma24_deriv2_1h'] > 0 else RED
# color_sma5 = GREEN if last_candle['mid_smooth_5_deriv1_1h'] > 0 else RED
# color_sma5_2 = GREEN if last_candle['mid_smooth_5_deriv2_1h'] > 0 else RED
# color_sma5_1h = GREEN if last_candle['sma60_deriv1'] > 0 else RED
# color_sma5_2h = GREEN if last_candle['sma60_deriv2'] > 0 else RED
# color_smooth_1h = GREEN if last_candle['mid_smooth_1h_deriv1'] > 0 else RED
# color_smooth2_1h = GREEN if last_candle['mid_smooth_1h_deriv2'] > 0 else RED
last_max = int(self.pairs[pair]['last_max']) if self.pairs[pair]['last_max'] > 1 else round(
self.pairs[pair]['last_max'], 3)
last_min = int(self.pairs[pair]['last_min']) if self.pairs[pair]['last_min'] > 1 else round(
self.pairs[pair]['last_min'], 3)
profit = str(round(profit, 2)) + '/' + str(round(self.pairs[pair]['max_profit'], 2))
# 🟢 Dérivée 1 > 0 et dérivée 2 > 0: tendance haussière qui saccélère.
# 🟡 Dérivée 1 > 0 et dérivée 2 < 0: tendance haussière qui ralentit → essoufflement potentiel.
# 🔴 Dérivée 1 < 0 et dérivée 2 < 0: tendance baissière qui saccélère.
# 🟠 Dérivée 1 < 0 et dérivée 2 > 0: tendance baissière qui ralentit → possible bottom.
self.printLog(
f"| {date:<16} |{action:<10} | {pair[0:3]:<3} | {trade_type or '-':<18} |{rate or '-':>9}| {dispo or '-':>6} "
f"|{color}{profit or '-':>10}{RESET}| {pct_max or '-':>6} | {round(self.pairs[pair]['max_touch'], 2) or '-':>11} | {last_lost or '-':>12} "
f"| {last_max or '-':>7} | {last_min or '-':>7} |{total_counts or '-':>5}|{stake or '-':>7}"
)
def printLineLog(self):
# f"sum1h|sum1d|Tdc|Tdh|Tdd| drv1 |drv_1h|drv_1h|"
self.printLog(
f"+{'-' * 18}+{'-' * 12}+{'-' * 5}+{'-' * 20}+{'-' * 9}+{'-' * 8}+{'-' * 12}+{'-' * 8}+{'-' * 13}+{'-' * 14}+{'-' * 9}{'-' * 9}+{'-' * 5}+{'-' * 7}+"
f"+{'-' * 6}+{'-' * 7}+{'-' * 5}+{'-' * 5}+{'-' * 5}+{'-' * 5}+{'-' * 5}+{'-' * 5}+"
)
def printLog(self, str):
if self.config.get('runmode') == 'hyperopt' or self.dp.runmode.value in ('hyperopt'):
return;
if not self.dp.runmode.value in ('backtest', 'hyperopt', 'lookahead-analysis'):
logger.info(str)
else:
if not self.dp.runmode.value in ('hyperopt'):
print(str)
def getLastLost(self, last_candle, pair):
last_lost = round((last_candle['close'] - self.pairs[pair]['max_touch']) / self.pairs[pair]['max_touch'], 3)
return last_lost
def getDistMax(self, last_candle, pair):
mx = last_candle['max12_1h']
dist_max = round(100 * (mx - last_candle['close']) / mx, 0)
return dist_max
def getPctFirstBuy(self, pair, last_candle):
return round((last_candle['close'] - self.pairs[pair]['first_buy']) / self.pairs[pair]['first_buy'], 3)
def getPctLastBuy(self, pair, last_candle):
return round((last_candle['close'] - self.pairs[pair]['last_buy']) / self.pairs[pair]['last_buy'], 4)
def expectedProfit(self, pair: str, last_candle: DataFrame):
lim = 0.005
pct = 0.001
pct_to_max = lim + pct * self.pairs[pair]['count_of_buys']
expected_profit = lim * self.pairs[pair]['total_amount'] # min(3 * lim, max(lim, pct_to_max)) # 0.004 + 0.002 * self.pairs[pair]['count_of_buys'] #min(0.01, first_max)
self.pairs[pair]['expected_profit'] = expected_profit
return expected_profit
def getLastClosedTrade(self, pair):
# récupérer le dernier trade fermé
trades = Trade.get_trades_proxy(pair=pair,is_open=False)
if trades:
last_trade = trades[-1]
self.pairs[pair]['last_profit'] = last_trade.close_profit # ex: 0.12 = +12%
self.pairs[pair]['last_trade'] = last_trade
# ✅ Première dérivée(variation ou pente)
# Positive: la courbe est croissante → tendance haussière.
# Négative: la courbe est décroissante → tendance baissière.
# Proche de 0: la courbe est plate → marché stable ou en transition.
#
# Applications:
# Détecter les points dinflexion(changement de tendance) quand elle sannule.\
# Analyser la vitesse dun mouvement(plus elle est forte, plus le mouvement est impulsif).
#
# ✅ Seconde dérivée(accélération ou concavité)
# Positive: la pente augmente → accélération de la hausse ou ralentissement de la baisse.
# Négative: la pente diminue → accélération de la baisse ou ralentissement de la hausse.
# Changement de signe: indique souvent un changement de courbure, utile pour prévoir des retournements.
#
# Exemples:
# 🟢 Dérivée 1 > 0 et dérivée 2 > 0: tendance haussière qui saccélère.
# 🟡 Dérivée 1 > 0 et dérivée 2 < 0: tendance haussière qui ralentit → essoufflement potentiel.
# 🔴 Dérivée 1 < 0 et dérivée 2 < 0: tendance baissière qui saccélère.
# 🟠 Dérivée 1 < 0 et dérivée 2 > 0: tendance baissière qui ralentit → possible bottom.
#
# Filtrer les signaux: ne prendre un signal haussier que si dérivée1 > 0 et dérivée2 > 0.
# Détecter les zones de retournement: quand dérivée1 ≈ 0 et que dérivée2 change de signe.
def calculateRegression(self,
dataframe: DataFrame,
column='close',
window=50,
degree=3,
future_offset: int = 10 # projection à n bougies après
) -> DataFrame:
df = dataframe.copy()
regression_fit = []
regression_future_fit = []
regression_fit = []
regression_future_fit = []
for i in range(len(df)):
if i < window:
regression_fit.append(np.nan)
regression_future_fit.append(np.nan)
continue
# Fin de la fenêtre dapprentissage
end_index = i
start_index = i - window
y = df[column].iloc[start_index:end_index].values
# Si les données sont insuffisantes (juste par précaution)
if len(y) < window:
regression_fit.append(np.nan)
regression_future_fit.append(np.nan)
continue
# x centré pour meilleure stabilité numérique
x = np.linspace(-1, 1, window)
coeffs = np.polyfit(x, y, degree)
poly = np.poly1d(coeffs)
# Calcul point présent (dernier de la fenêtre)
x_now = x[-1]
regression_fit.append(poly(x_now))
# Calcul point futur, en ajustant si on dépasse la fin
remaining = len(df) - i - 1
effective_offset = min(future_offset, remaining)
x_future = x_now + (effective_offset / window) * 2 # respect du même pas
regression_future_fit.append(poly(x_future))
df[f"{column}_regression"] = regression_fit
# # 2. Dérivée première = différence entre deux bougies successives
# df[f"{column}_regression_deriv1"] = round(100 * df[f"{column}_regression"].diff() / df[f"{column}_regression"],
# 4)
#
# # 3. Dérivée seconde = différence de la dérivée première
# df[f"{column}_regression_deriv2"] = round(
# 10 * df[f"{column}_regression_deriv1"].rolling(int(window / 4)).mean().diff(), 4)
#
# df[f"{column}_future_{future_offset}"] = regression_future_fit
# # 2. Dérivée première = différence entre deux bougies successives
# df[f"{column}_future_{future_offset}_deriv1"] = round(100 * df[f"{column}_future_{future_offset}"].diff() / df[f"{column}_future_{future_offset}"], 4)
#
# # 3. Dérivée seconde = différence de la dérivée première
# df[f"{column}_future_{future_offset}_deriv2"] = round(10 * df[f"{column}_future_{future_offset}_deriv1"].rolling(int(window / 4)).mean().diff(), 4)
return df
def calculateDownAndUp(self, dataframe, limit=0.0001):
dataframe['down'] = dataframe['mid_regression'] <= dataframe['mid_regression'].shift(1)
dataframe['up'] = dataframe['mid_regression'] >= dataframe['mid_regression'].shift(1)
dataframe['down_count'] = - dataframe['down'].astype(int) * (
dataframe['down'].groupby((dataframe['down'] != dataframe['down'].shift()).cumsum()).cumcount() + 1)
dataframe['up_count'] = dataframe['up'].astype(int) * (
dataframe['up'].groupby((dataframe['up'] != dataframe['up'].shift()).cumsum()).cumcount() + 1)
# Créer une colonne vide
dataframe['down_pct'] = self.calculateUpDownPct(dataframe, 'down_count')
dataframe['up_pct'] = self.calculateUpDownPct(dataframe, 'up_count')
return dataframe
def calculateUpDownPct(self, dataframe, key):
down_pct_values = np.full(len(dataframe), np.nan)
# Remplir la colonne avec les bons calculs
for i in range(len(dataframe)):
shift_value = abs(int(dataframe[key].iloc[i])) # Récupérer le shift actuel
if i - shift_value > 1: # Vérifier que le shift ne dépasse pas l'index
down_pct_values[i] = 100 * (dataframe['close'].iloc[i] - dataframe['close'].iloc[i - shift_value]) / \
dataframe['close'].iloc[i - shift_value]
return down_pct_values
def calculateMarketState(self, dataframe, metadata):
dataframe['ema50'] = EMAIndicator(dataframe['close'], window=50).ema_indicator()
dataframe['ema200'] = EMAIndicator(dataframe['close'], window=200).ema_indicator()
# RSI
dataframe['rsi'] = RSIIndicator(dataframe['close'], window=14).rsi()
dataframe["market_state"] = 0
cond_bull = (
(dataframe["rsi"] > 55) &
(dataframe["ema50"] > dataframe["ema200"])
)
cond_bear = (
(dataframe["rsi"] < 45) &
(dataframe["ema50"] < dataframe["ema200"])
)
dataframe.loc[cond_bull, "market_state"] = 1
dataframe.loc[cond_bear, "market_state"] = -1
return dataframe