Files
Freqtrade/DecisionTreeStrategy.py
Jérôme Delacotte 8a1c69e53a Synchronise
2025-03-09 16:53:02 +01:00

1346 lines
64 KiB
Python

from datetime import timedelta, datetime
from freqtrade.strategy.interface import IStrategy
from freqtrade.strategy import (BooleanParameter, CategoricalParameter, DecimalParameter, stoploss_from_open,
IntParameter, IStrategy, merge_informative_pair, informative, stoploss_from_absolute)
from pandas import DataFrame
from freqtrade.persistence import Trade
from sklearn.tree import DecisionTreeClassifier
from sklearn.preprocessing import StandardScaler
import numpy as np
import talib.abstract as ta
import freqtrade.vendor.qtpylib.indicators as qtpylib
from typing import Optional, Union, Tuple
from scipy.signal import find_peaks
from typing import Any, Callable, Dict, List
import logging
import configparser
#
from sklearn.tree import export_text
from sklearn.tree import export_graphviz
from graphviz import Source
import matplotlib.pyplot as plt
logger = logging.getLogger(__name__)
class DecisionTreeStrategy(IStrategy):
# Chart layout for this strategy (freqtrade `plot_config` convention — see the
# freqtrade plotting documentation). "main_plot" lists the series drawn on the
# price chart; "subplots" groups further series into the panels "Rsi", "Pct",
# "Inversion" and "DI". Each inner key names a dataframe column and maps to its
# colour. Columns suffixed `_1h` / `_1d` presumably come from merged
# informative timeframes — verify against populate_indicators.
# Commented-out entries are series the author has switched off.
plot_config = {
"main_plot": {
"enter_tag": {
"color": "#197260"
},
"max200": {
"color": "#3dde2c"
},
"min200": {
"color": "#3dde2c"
},
"sma5_1h": {
"color": "#ffb009"
},
'bb_upperband_1d': {
'color': 'cyan'
},
'bb_lowerband_1d': {
'color': 'cyan'
},
'bb_middleband_1d': {
'color': 'red'
}
},
"subplots": {
# "Volume": {
# "volume_change": {
# "color": "#d174dd"
# },
# 'volume_spike': {
# "color": "blue"
# },
# 'volume_mean': {
# "color": 'green'
# }
# },
"Rsi": {
"rsi": {
"color": "blue"
},
"rsi_1h": {
"color": "#c1b255"
},
"rsi_sma_1h": {
"color": "#9f0e43"
},
"rsi_change": {
"color": "green"
},
"max200_diff": {
"color": "red"
}
},
"Pct": {
"percent12": {
"color": "blue"
},
"sma20_pct": {
"color": "green"
},
"sma5_pct_1h": {
"color": "#6b09f4"
},
# 'sma5_down_count_1h': {
# 'color': 'blue'
# },
# 'sma5_up_count_1h': {
# 'color': 'red'
# },
# 'sma5_down_count_1d': {
# 'color': 'blue'
# },
# 'sma5_up_count_1d': {
# 'color': 'red'
# }
},
"Inversion": {
# 'inversion': {
# "color": "green"
# },
# 'inversion_1h': {
# "color": "blue"
# },
'trend_is_positive': {
"color": "red"
},
'trend_pente_1h': {
"color": "blue"
}
},
"DI": {
"DI+_1h": {
'color': "red"
},
'DI_diff': {
'color': 'blue'
},
'DI_diff_1h': {
'color': 'green'
}
}
}
}
# Protections list (freqtrade `protections` convention). Only one entry is
# active: a "CooldownPeriod" with `stop_duration_candles` = 24. The two
# "StoplossGuard" entries and the "LowProfitPairs" entry are commented out.
protections = [
# {
# "method": "StoplossGuard",
# "lookback_period_candles": 12,
# "trade_limit": 1,
# "stop_duration_candles": 6,
# "only_per_pair": True
# },
# {
# "method": "StoplossGuard",
# "lookback_period_candles": 12,
# "trade_limit": 2,
# "stop_duration_candles": 6,
# "only_per_pair": False
# },
# {
# "method": "LowProfitPairs",
# "lookback_period_candles": 60,
# "trade_limit": 1,
# "stop_duration": 60,
# "required_profit": -0.05
# },
{
"method": "CooldownPeriod",
"stop_duration_candles": 24
}
]
# Author's working notes on candidate stake-multiplier sequences.
# NOTE(review): `fibo` and `baisse` are not used in the visible part of the
# file — confirm where they are consumed.
# 20 20 40 60 100 160 260 420
# 50 50 100 300 500
# fibo = [1, 1, 2, 3, 5, 8, 13, 21]
# my fibo
# 50 50 50 100 100 150 200 250 350 450 600 1050
fibo = [1, 1, 1, 2, 2, 3, 4, 5, 7, 9, 12, 16, 21, 28, 37, 49]
baisse = [1, 2, 3, 5, 7, 10, 14, 19, 26, 35, 47, 63, 84]
# My sequence 1 1 1 2 2 3 4 5 7 9 12 16 21
# Stake 50 50 50 100 100 150 200 250 350 450 600 800 1050
# Cumulative stake 50 100 150 250 350 500 700 950 1300 1750 2350 3150 4200
# drop 1 2 3 5 7 10 14 19 26 35 47 63 84
# Base configuration
minimal_roi = {"0": 10} # Minimal ROI; freqtrade reads this as a ratio, so 10 would be +1000% — presumably meant to disable ROI exits, confirm
stoploss = -1 # Stoploss value is -1; freqtrade reads this as a ratio, i.e. -100%, not 5% — NOTE(review): confirm a 5% stop was not intended
timeframe = '5m' # Timeframe used
features = ['rsi', 'max200_diff', 'adx', 'atr', 'atr_1h', 'volatility_1h', 'sma5', 'rsi_change', 'volume_change']
target = 'future_direction'
features_to_scale = ['volume_change'] # RSI is excluded
# Group all per-pair runtime state in a single dictionary.
# NOTE(review): this is a class-level mutable dict, so it is shared by every
# instance, and only the six pairs listed below get an entry — methods that
# index `self.pairs[pair]` will raise KeyError for any other pair.
pairs = {
pair: {
"last_max": 0,
"trade_info": {},
"max_touch": 0.0,
"last_sell": 0.0,
"last_buy": 0.0
}
for pair in ["BTC/USDT", "ETH/USDT", "DOGE/USDT", "DASH/USDT", "XRP/USDT", "SOL/USDT"]
}
# Column-name aliases, read elsewhere as `self.close`.
# NOTE(review): inside the class body the name `open` shadows the builtin.
close = 'close'
open = 'open'
# 288 candles of 5 minutes = 24 hours of warm-up history.
startup_candle_count = 288
# threshold = DecimalParameter(-10, 10, decimals=1, default=0.5, space='buy')
percent48 = DecimalParameter(-0.05, 0.05, decimals=3, default=-0.025, space='buy', optimize=False)
# bb_width = DecimalParameter(0.00, 0.05, decimals=3, default=0.02, space='buy')
# sma_pct_min = DecimalParameter(-0.05, 0.05, decimals=2, default=0.0, space='buy')
# sma_pct_max = DecimalParameter(-0.05, 0.05, decimals=2, default=0.0, space='buy')
# volume_change_buy = IntParameter(1, 20, default=2, space='buy')
# volume_change_sell = IntParameter(1, 20, default=2, space='sell')
sma20_factor = DecimalParameter(0, 1, decimals=1, default=0.25, space='sell', optimize=False)
# percent_threshold = DecimalParameter(-0.1, 0.1, decimals=2, default=0, space='buy')
sma5_pct_1h_sell = DecimalParameter(-0.01, 0.01, decimals=3, default=0.01, space='sell', optimize=False)
# percent_1d_sell = DecimalParameter(0, 0.1, decimals=2, default=0.015, space='sell', optimize=False)
# percent_1d_buy = DecimalParameter(0, 0.1, decimals=2, default=0.02, space='buy', optimize=False)
percent_1d_loss_sell = DecimalParameter(0, 0.2, decimals=2, default=0.03, space='sell', optimize=False)
# inversion_diff_sell = DecimalParameter(-1, 0, decimals=1, default=-0.8, space='sell')
di_pente_5m_sell = IntParameter(-200, 0, default=-50, space='sell')
# first_stack_factor = IntParameter(1, 20, default=20, space='buy')
# bb_mid_pct_1d_stop = IntParameter(-100, 0, default=-50, space='buy', optimize=True)
# bb_mid_pct_1d_start = IntParameter(0, 100, default=50, space='buy', optimize=True)
# rsi_pct_1h_stop = IntParameter(-30, 0, default=-5, space='buy', optimize=True)
# rsi_pct_1h_start = IntParameter(0, 30, default=5, space='buy', optimize=True)
# percent24_1h_stop = IntParameter(-30, 0, default=-5, space='buy', optimize=True)
# percent24_1h_start = IntParameter(0, 30, default=5, space='buy', optimize=True)
# inversion_diff_buy = DecimalParameter(0.5, 1, decimals=2, default=0.8, space='buy', optimize=False)
di_pente_5m_buy = IntParameter(0, 200, default=50, space='buy')
protection_stake_amount = IntParameter(0, 1000, default=100, space='protection', optimize=False)
# NOTE(review): the next two defaults (0.1) lie outside their declared range [-0.01, 0.01] — confirm.
sma5_pct_1h_gain_buy = DecimalParameter(-0.01, 0.01, decimals=3, default=0.1, space='protection', optimize=False)
sma5_pct_1h_loss_buy = DecimalParameter(-0.01, 0.01, decimals=3, default=0.1, space='protection', optimize=False)
max200_diff = DecimalParameter(0.00, 0.05, decimals=3, default=0.025, space='protection', optimize=False)
# Thresholds read by testStopBuying: buying is paused when the DI+ slopes on all
# three timeframes are below the *_stop values and resumed when all three are
# above the *_start values.
di_pente_5m_stop = IntParameter(-100, 0, default=-50, space='protection')
di_pente_1h_stop = IntParameter(-100, 0, default=-50, space='protection')
di_pente_1d_stop = IntParameter(-100, 0, default=-50, space='protection')
di_pente_5m_start = IntParameter(0, 100, default=50, space='protection')
di_pente_1h_start = IntParameter(0, 100, default=50, space='protection')
di_pente_1d_start = IntParameter(0, 100, default=50, space='protection')
# 0 means buying is allowed; testStopBuying stores the candle close here while buying is paused.
stop_buying = 0
# trailing stoploss hyperopt parameters
use_custom_stoploss = True
# Trailing stoploss
trailing_stop = True
trailing_stop_positive = 0.01
trailing_stop_positive_offset = 0.05
trailing_only_offset_is_reached = True
columns_logged = False
# hard stoploss profit
# NOTE(review): default -1 lies outside the declared range [-0.15, -0.040] — confirm.
pHSL = DecimalParameter(-0.15, -0.040, default=-1, decimals=3, space='sell', load=True)
# # profit threshold 1, trigger point, SL_1 is used
# pPF_1 = DecimalParameter(0.008, 0.020, default=0.016, decimals=3, space='sell', load=True)
# pSL_1 = DecimalParameter(0.008, 0.020, default=0.011, decimals=3, space='sell', load=True)
#
# # profit threshold 2, SL_2 is used
# pPF_2 = DecimalParameter(0.040, 0.100, default=0.080, decimals=3, space='sell', load=True)
# pSL_2 = DecimalParameter(0.020, 0.070, default=0.040, decimals=3, space='sell', load=True)
position_adjustment_enable = True
# Example specific variables
max_entry_position_adjustment = 15
# This number is explained a bit further down
max_dca_multiplier = 5.5
# # This is called when placing the initial order (opening trade)
# def custom_stake_amount(self, pair: str, current_time: datetime, current_rate: float,
# proposed_stake: float, min_stake: float | None, max_stake: float,
# leverage: float, entry_tag: str | None, side: str,
# **kwargs) -> float:
#
# # We need to leave most of the funds for possible further DCA orders
# # This also applies to fixed stakes
# return proposed_stake / self.max_dca_multiplier
def confirm_trade_entry(self, pair: str, order_type: str, amount: float, rate: float, time_in_force: str,
                        current_time: datetime, entry_tag: Optional[str], **kwargs) -> bool:
    """Confirm or veto an entry order just before it is placed.

    The stop-buying state is refreshed from the latest analyzed candle
    (``testStopBuying``). While that state is active the entry is refused;
    otherwise the per-pair price markers are initialised with ``rate`` and
    the buy is logged.

    Args:
        pair: Pair about to be bought.
        order_type: Unused here.
        amount: Unused here.
        rate: Entry rate; stored as ``last_max``, ``max_touch`` and ``last_buy``.
        time_in_force: Unused here.
        current_time: Timestamp passed through to the trade log.
        entry_tag: Tag of the entry signal, logged as the trade type.

    Returns:
        ``True`` when the entry may proceed, ``False`` while buying is paused.
    """
    analyzed, _ = self.dp.get_analyzed_dataframe(pair, self.timeframe)
    candle = analyzed.iloc[-1].squeeze()
    available = round(self.wallets.get_available_stake_amount())
    raw_stake = self.calculate_stake(pair, rate, candle)

    self.testStopBuying(candle)

    # Three-part market summary shown in the STOP/START log line.
    summary = f"{100 * candle['percent24_1h']:.0f} / {10000 * candle['bb_mid_pct_1d']:.0f} / {int(candle['rsi_pct_1h']):.0f}"
    shared_fields = dict(
        last_candle=candle,
        date=current_time,
        pair=pair,
        rate=rate,
        dispo=available,
        profit=0,
        stake=round(raw_stake, 2),
    )

    status = "STOP BUY" if self.stop_buying else "START BUY"
    self.log_trade(action=status, trade_type=summary, **shared_fields)

    if self.stop_buying:
        return False

    # Entry accepted: start tracking price extremes from the entry rate.
    state = self.pairs[pair]
    state['last_max'] = rate
    state['max_touch'] = rate
    state['last_buy'] = rate
    self.log_trade(action="Buy", trade_type=entry_tag, **shared_fields)
    return True
def testStopBuying(self, last_candle):
# trade_type = f"{100 * last_candle['percent24_1h']:.0f} / {10000 * last_candle['bb_mid_pct_1d']:.0f} / {int(last_candle['rsi_pct_1h']):.0f}",
# if self.stop_buying == 0:
# test_buying = (100 * last_candle['percent24_1h'] < self.percent24_1h_stop.value) | \
# ((int(10000 * last_candle['bb_mid_pct_1d']) < self.bb_mid_pct_1d_stop.value) & (int(last_candle['rsi_pct_1h']) < self.rsi_pct_1h_stop.value))
# if test_buying:
# self.stop_buying = max(self.stop_buying, last_candle['close'])
# else:
# test_buying = (100 * last_candle['percent24_1h'] > self.percent24_1h_start.value) & \
# ((int(10000 * last_candle['bb_mid_pct_1d']) > self.bb_mid_pct_1d_start.value) & (int(last_candle['rsi_pct_1h']) > self.rsi_pct_1h_start.value))
# if test_buying:
# self.stop_buying = 0
if self.stop_buying == 0:
test_buying = (last_candle['DI+_pente'] < self.di_pente_5m_stop.value) \
& (last_candle['DI+_pente_1h'] < self.di_pente_1h_stop.value) \
& (last_candle['DI+_pente_1d'] < self.di_pente_1d_stop.value)
if test_buying:
self.stop_buying = max(self.stop_buying, last_candle['close'])
else:
test_buying = (last_candle['DI+_pente'] > self.di_pente_5m_start.value) \
& (last_candle['DI+_pente_1h'] > self.di_pente_1h_start.value) \
& (last_candle['DI+_pente_1d'] > self.di_pente_1d_start.value)
if test_buying:
self.stop_buying = 0
return
def confirm_trade_exit(self, pair: str, trade: 'Trade', order_type: str, amount: float, rate: float,
                       time_in_force: str,
                       exit_reason: str, current_time, **kwargs) -> bool:
    """Confirm or veto an exit order just before it is placed.

    An exit is confirmed when the latest analyzed candle is red
    (``percent < 0``) or when the exit was forced (``exit_reason ==
    'force_exit'``). Every confirmed exit is logged as "Sell" and resets the
    per-pair price markers; a vetoed exit is logged as "Cancel".

    Previously a forced exit on a non-red candle was confirmed but went
    through the "Cancel" branch, so the log was wrong and ``last_sell`` was
    never recorded.

    Args:
        pair: Pair being sold.
        trade: Trade being closed; used to compute the logged profit.
        order_type: Unused here.
        amount: Amount being sold, passed to ``trade.calc_profit``.
        rate: Exit rate; stored as ``last_sell`` on confirmation.
        time_in_force: Unused here.
        exit_reason: Reason reported by the bot; logged as the trade type.
        current_time: Timestamp passed through to the trade log.

    Returns:
        ``True`` when the exit may proceed, ``False`` to keep the trade open.
    """
    dataframe, _ = self.dp.get_analyzed_dataframe(pair, self.timeframe)
    last_candle = dataframe.iloc[-1].squeeze()
    dispo = round(self.wallets.get_available_stake_amount())

    candle_is_red = bool(last_candle['percent'] < 0)
    confirmed = candle_is_red or exit_reason == 'force_exit'

    if not confirmed:
        self.log_trade(
            last_candle=last_candle,
            date=current_time,
            action="Cancel",
            pair=pair,
            trade_type=exit_reason,
            rate=last_candle['close'],
            dispo=dispo
        )
        return False

    self.log_trade(
        last_candle=last_candle,
        date=current_time,
        action="Sell",
        pair=pair,
        trade_type=exit_reason,
        rate=last_candle['close'],
        dispo=dispo,
        profit=round(trade.calc_profit(rate, amount), 2)
    )
    # The position is being closed: clear the price markers and remember the exit rate.
    state = self.pairs[pair]
    state['last_max'] = 0
    state['max_touch'] = 0
    state['last_sell'] = rate
    return True
def adjust_trade_position(self, trade: Trade, current_time: datetime,
current_rate: float, current_profit: float,
min_stake: float | None, max_stake: float,
current_entry_rate: float, current_exit_rate: float,
current_entry_profit: float, current_exit_profit: float,
**kwargs
) -> float | None | tuple[float | None, str | None]:
if trade.has_open_orders:
return None
# Obtain pair dataframe (just to show how to access it)
dataframe, _ = self.dp.get_analyzed_dataframe(trade.pair, self.timeframe)
# Only buy when not actively falling price.
last_candle = dataframe.iloc[-1].squeeze()
previous_candle = dataframe.iloc[-2].squeeze()
previous_candle_12 = dataframe.iloc[-13].squeeze()
previous_candle_288 = dataframe.iloc[-289].squeeze()
# self.testStopBuying(last_candle)
# return None
if (len(dataframe) < 1) | (self.wallets.get_available_stake_amount() < 50) | (self.stop_buying > 0):
return None
count_of_buys, hours, days, first_price, last_price = self.getTradeInfos(current_time, trade)
dispo = round(self.wallets.get_available_stake_amount())
first_stack = self.calculate_stake(trade.pair, current_rate, last_candle)
# if last_candle['enter_tag'] == 'buy_upperclose_1d' and (hours > 1):
# if (count_of_buys <= 3):
# stake_amount = min(self.wallets.get_available_stake_amount(), self.calculate_stake(trade.pair, 0, last_candle))
# else:
# stake_amount = min(self.wallets.get_available_stake_amount(),
# count_of_buys / 3 * self.calculate_stake(trade.pair, 0, last_candle))
# # Take half of the profit at +5%
# self.log_trade(
# last_candle=last_candle,
# date=current_time,
# action="Loss BB -",
# dispo=dispo,
# pair=trade.pair,
# rate=current_rate,
# trade_type='Decrease',
# profit=round(current_profit, 4), # round(current_profit * trade.stake_amount, 2),
# buys=trade.nr_of_successful_entries,
# stake=round(stake_amount, 2)
# )
# self.pairs[trade.pair]['max_touch'] = current_rate
#
# return stake_amount, "upperclose_1d"
# self.last_max_by_pair[trade.pair] * (1 - self.percent_1d_loss_sell.value)) \
# & (last_candle['sma5_up_count_1h'] >= 1)
last_pct = (last_candle['close'] - last_price) / last_price
if ((last_pct <= -0.06) \
& (count_of_buys > 3) \
& (last_candle['DI_diff_1h'] >= -5)
& (hours > 1)
& (last_candle['close'] < last_candle['bb_middleband_1d'])
& (previous_candle_288['bb_middleband_1d'] <= last_candle['bb_middleband_1d'])
# & (last_candle['sma5_up_count_1h'] >= 1)
) | (
(last_pct <= -0.01 * count_of_buys) \
& (count_of_buys <= 3)
& (last_candle['DI_diff_1h'] >= -5)
& (hours > 1)
& (previous_candle_288['bb_middleband_1d'] <= last_candle['bb_middleband_1d'])
# & (last_candle['close'] < last_candle['bb_middleband_1d'])
# & (last_candle['sma5_up_count_1h'] >= 1)
):
# & (last_candle['enter_tag'] != ''):
if (count_of_buys <= 3):
stake_amount = min(self.wallets.get_available_stake_amount(), self.calculate_stake(trade.pair, 0, last_candle))
else:
stake_amount = min(self.wallets.get_available_stake_amount(),
count_of_buys / 3 * self.calculate_stake(trade.pair, 0, last_candle))
# Take half of the profit at +5%
self.log_trade(
last_candle=last_candle,
date=current_time,
action="Loss -",
dispo=dispo,
pair=trade.pair,
rate=current_rate,
trade_type='Decrease',
profit=round(current_profit, 4), # round(current_profit * trade.stake_amount, 2),
buys=trade.nr_of_successful_entries,
stake=round(stake_amount, 2)
)
self.pairs[trade.pair]['max_touch'] = current_rate
return stake_amount, "half_loss%"
# if current_profit < (-0.05 - (0.05 * trade.nr_of_successful_entries)) \
# and trade.nr_of_successful_entries >= 1 \
# and (last_candle['rsi_1h'] > previous_candle_12['rsi_1h']) \
# and (last_candle['rsi_1h'] > 40):
if (False) & (current_profit < - 0.01) & (count_of_buys <= 3) \
& (days >= 1) & (last_candle['DI+_1h'] > 10) \
& (last_candle['enter_tag'] != ''):
# & (last_candle['sma5_pct_1h'] * 100 > self.sma5_pct_1h_loss_buy.value):
# & (last_candle['max200_diff'] >= self.max200_diff.value):
# & (last_candle['rsi_1h'] > 50) \
# & (last_candle['sma5_pct_1h'] * 100 > self.sma5_pct_1h_loss_buy.value):
stake_amount = min(self.wallets.get_available_stake_amount(),
max(first_stack, first_stack * (- current_profit / 0.05))) # (trade.stake_amount / 2)
self.log_trade(
last_candle=last_candle,
date=current_time,
action="Loss +",
dispo=dispo,
pair=trade.pair,
rate=current_rate,
trade_type='Increase',
profit=round(current_profit, 4), # round(current_profit * trade.stake_amount, 2),
buys=trade.nr_of_successful_entries,
stake=round(stake_amount, 2)
)
self.pairs[trade.pair]['last_max'] = max(self.pairs[trade.pair]['last_max'], current_rate)
self.pairs[trade.pair]['max_touch'] = max(self.pairs[trade.pair]['max_touch'], current_rate)
return stake_amount, "increase"
# if (current_profit > -0.05) | (last_candle['rsi_1h'] < previous_candle_12['rsi_1h']) | (
# last_candle['rsi_1h'] < 40):
# return None
filled_entries = trade.select_filled_orders(trade.entry_side)
count_of_entries = trade.nr_of_successful_entries
# Allow up to 3 additional increasingly larger buys (4 in total)
# Initial buy is 1x
# If that falls to -5% profit, we buy 1.25x more, average profit should increase to roughly -2.2%
# If that falls down to -5% again, we buy 1.5x more
# If that falls once again down to -5%, we buy 1.75x more
# Total stake for this trade would be 1 + 1.25 + 1.5 + 1.75 = 5.5x of the initial allowed stake.
# That is why max_dca_multiplier is 5.5
# Hope you have a deep wallet!
try:
if (current_profit > 0.01) \
& (last_candle['close'] > first_price) \
& (last_candle['close'] >= last_price * 1.01) \
& ((hours >= 2) | (last_candle['rsi_change'] > 0.2)) \
& (last_candle['rsi_1h'] <= 55) \
& (count_of_buys <= self.max_entry_position_adjustment) \
& (last_candle['sma5_1h'] > previous_candle_12['sma5_1h']):
# This returns first order stake size
stake_amount = filled_entries[0].stake_amount_filled
# print(f"1 - {stake_amount}")
# This then calculates current safety order size
# stake_amount = min(self.wallets.get_available_stake_amount(), stake_amount * (
# 1 + (count_of_entries * (100 + 10 * count_of_buys) * last_candle['sma5_pct_1h'])))
stake_amount = min(self.wallets.get_available_stake_amount(),
self.calculate_stake(trade.pair, 0, last_candle))
# print(f"2 - {stake_amount}")
self.log_trade(
last_candle=last_candle,
date=current_time,
dispo=dispo,
action="Gain +",
rate=current_rate,
pair=trade.pair,
trade_type='Increase',
profit=round(current_profit, 2),
buys=count_of_entries,
stake=round(stake_amount, 2)
)
self.pairs[trade.pair]['last_max'] = max(self.pairs[trade.pair]['last_max'], current_rate)
self.pairs[trade.pair]['max_touch'] = max(self.pairs[trade.pair]['max_touch'], current_rate)
return stake_amount, "inc_volume"
except Exception as exception:
print(exception)
return None
return None
def getTradeInfos(self, current_time, trade):
    """Summarise the filled buy orders of a trade and cache the summary.

    The elapsed-time figures are measured from the most recent filled buy.
    They use ``timedelta.total_seconds()``; the previous ``.seconds``
    attribute only holds the sub-day remainder, so ``minutes`` and ``hours``
    wrapped back to zero every 24 hours.

    Args:
        current_time: Reference time for the elapsed-time figures.
        trade: Trade exposing ``select_filled_orders`` and ``pair``.

    Returns:
        Tuple ``(count_of_buys, hours, days, first_price, last_price)``
        where ``hours`` is the elapsed time since the last buy rounded to
        whole hours and ``days`` the whole days elapsed since it.

    Raises:
        IndexError: If the trade has no filled buy order.
    """
    filled_buys = trade.select_filled_orders('buy')
    count_of_buys = len(filled_buys)
    first_price = filled_buys[0].price

    latest_buy = filled_buys[-1]
    elapsed = current_time - latest_buy.order_date_utc
    minutes = elapsed.total_seconds() / 60
    hours = round(minutes / 60, 0)
    days = elapsed.days
    last_price = latest_buy.price

    # Total amount staked across all filled buys.
    mises = sum(buy.amount * buy.price for buy in filled_buys)

    self.pairs[trade.pair]['trade_info'] = {
        "count_of_buys": count_of_buys,
        "hours": hours,
        "days": days,
        "minutes": minutes,
        "first_price": first_price,
        "last_price": last_price,
        "mises": mises
    }
    return count_of_buys, hours, days, first_price, last_price
def custom_exit(self, pair: str, trade: 'Trade', current_time: 'datetime', current_rate: float,
                current_profit: float, **kwargs) -> 'Optional[Union[str, bool]]':
    """Evaluate the strategy's dynamic exit rules for an open trade.

    The per-pair ``max_touch`` marker is raised to ``current_rate`` first,
    and ``limit_sell`` is the relative distance of the last close below that
    marker (zero or negative). Rules are checked in order; the first match
    returns its tag:

    1. ``'sell_tout_baisse'`` — profitable multi-buy trade with falling
       ``sma20``, ``sma5_1h`` and ``trend_is_positive``.
    2. No exit while any of ``percent`` / ``percent3`` / ``percent5`` is
       positive or the close is more than 0.6% under ``bb_upperband_1d``.
    3. ``'stop_buying …'`` — buying is paused and the close is over 3% below
       ``max_touch``.
    4. ``'limit_1 …'`` — the give-back from ``max_touch`` exceeds 75% of the
       current profit.
    5. ``'quick_lost …'`` — sharp three-candle drop after reaching the
       expected profit (``2 * atr_1h``).
    6. ``'b_sma20'`` — falling ``sma10``/``sma20`` after the trade has been
       open for at least one hour.

    The "open for at least one hour" test uses ``total_seconds()``; the
    previous ``.seconds`` attribute wrapped every 24 hours, which blocked the
    rule for the first hour of every day the trade stayed open.

    Returns:
        The exit tag of the first matching rule, or ``None`` to stay in.
    """
    dataframe, _ = self.dp.get_analyzed_dataframe(pair, self.timeframe)
    # Latest and previous analyzed candles for this pair.
    last_candle = dataframe.iloc[-1].squeeze()
    previous_last_candle = dataframe.iloc[-2].squeeze()

    count_of_buys, hours, _days, _first_price, _last_price = self.getTradeInfos(current_time, trade)
    held_seconds = (current_time - trade.open_date_utc).total_seconds()

    expected_profit = 2 * last_candle['atr_1h']
    max_percent = expected_profit * 0.75

    state = self.pairs[pair]
    state['max_touch'] = max(current_rate, state['max_touch'])
    limit_sell = (last_candle['close'] - state['max_touch']) / state['max_touch']

    # Rule 1: everything is turning down on a profitable, averaged trade.
    if (current_profit > 0.01
            and hours > 3
            and count_of_buys > 1
            and last_candle['percent12'] < 0
            and previous_last_candle['sma20'] > last_candle['sma20']
            and previous_last_candle['sma5_1h'] > last_candle['sma5_1h']
            and previous_last_candle['trend_is_positive'] > last_candle['trend_is_positive']):
        return 'sell_tout_baisse'

    # Rule 2: hold while price is still rising or far under the daily upper band.
    if (last_candle['percent'] > 0
            or last_candle['percent3'] > 0.0
            or last_candle['percent5'] > 0.0
            or last_candle['close'] * 1.006 < last_candle['bb_upperband_1d']):
        return None

    self.testStopBuying(last_candle)

    # Rule 3: buying is paused and price has fallen well below its peak.
    if self.stop_buying > 0 and limit_sell < -0.03 and last_candle['DI+_1h'] < 10:
        return f"stop_buying {limit_sell * 100:.0f}"

    # Rule 4: more than 75% of the current profit has been given back.
    if current_profit > 0.005 and limit_sell < -current_profit * 0.75 and last_candle['DI+_1h'] < 10:
        return f"limit_1 {limit_sell:.3f}"

    # Rule 5: sharp drop right after reaching the expected profit.
    if (current_profit >= expected_profit
            and last_candle['percent3'] < -max_percent
            and last_candle[self.close] > last_candle['sma5_1h']
            and last_candle['DI_diff'] < -20):
        return f"quick_lost {last_candle['percent3'] * 100:.1f}"

    # Disabled rule: the original condition was hard-wired to False.
    di_diff_exit_enabled = False
    if (di_diff_exit_enabled
            and current_profit > 0
            and limit_sell < -0.01
            and held_seconds >= 3600
            and last_candle['DI_diff'] < 0
            and last_candle['sma5_pct_1h'] < 0):
        return f"DI_diff {last_candle['DI_diff']:.0f} {limit_sell:.2f}"

    # Rule 6: short moving averages are falling on a trade held for an hour or more.
    if (current_profit > expected_profit
            and previous_last_candle['sma10'] > last_candle['sma10']
            and held_seconds >= 3600
            and previous_last_candle['sma20'] > last_candle['sma20']
            and last_candle['percent5'] < -current_profit * self.sma20_factor.value
            and last_candle['sma5_pct_1h'] * 100 < self.sma5_pct_1h_sell.value):
        return 'b_sma20'

    # By default, do not exit.
    return None
def custom_stoploss(self, pair: str, trade: 'Trade', current_time: datetime,
                    current_rate: float, current_profit: float, **kwargs) -> float:
    """Compute the stoploss for an open trade.

    When the trade is more than 8% in profit, the last close sits more than
    1% below the pair's ``max_touch`` marker and ``DI+_1h`` is under 10, the
    stop is placed so that 85% of the current profit is kept. In every other
    case the hard stoploss profit ``pHSL`` is used. The chosen profit level is
    converted with ``stoploss_from_open``.

    Returns:
        The value returned by ``stoploss_from_open`` for the chosen level.
    """
    analyzed, _ = self.dp.get_analyzed_dataframe(pair, self.timeframe)
    # Latest analyzed candle for this pair.
    candle = analyzed.iloc[-1].squeeze()

    # Relative distance of the close from the tracked peak.
    drawdown = (candle['close'] - self.pairs[trade.pair]['max_touch']) / self.pairs[trade.pair]['max_touch']

    protect_gains = (current_profit > 0.08) & (drawdown < -0.01) & (candle['DI+_1h'] < 10)
    # Keep 85% of the running profit, or fall back to the hard stoploss.
    locked_profit = 0.85 * current_profit if protect_gains else self.pHSL.value

    return stoploss_from_open(locked_profit, current_profit)
# def custom_stoploss(self, pair: str, trade: 'Trade', current_time: 'datetime',
# current_rate: float, current_profit: float, **kwargs) -> float:
# dataframe, _ = self.dp.get_analyzed_dataframe(pair, self.timeframe)
#
# # Obtenir les données actuelles pour cette paire
# last_candle = dataframe.iloc[-1].squeeze()
# # hard stoploss profit
# HSL = self.pHSL.value
# PF_1 = self.pPF_1.value
# SL_1 = self.pSL_1.value
# PF_2 = self.pPF_2.value
# SL_2 = self.pSL_2.value
#
# # For profits between PF_1 and PF_2 the stoploss (sl_profit) used is linearly interpolated
# # between the values of SL_1 and SL_2. For all profits above PL_2 the sl_profit value
# # rises linearly with current profit, for profits below PF_1 the hard stoploss profit is used.
# if current_profit > PF_2:
# sl_profit = SL_2 + (current_profit - PF_2)
# elif current_profit > PF_1:
# sl_profit = SL_1 + ((current_profit - PF_1) * (SL_2 - SL_1) / (PF_2 - PF_1))
# else:
# sl_profit = HSL
#
# stoploss = stoploss_from_open(sl_profit, current_profit)
# #logger.info(f"stoploss={stoploss}")
# return stoploss
# Custom indicators for the DataFrame
def populate_indicators(self, dataframe: DataFrame, metadata: dict) -> DataFrame:
heikinashi = qtpylib.heikinashi(dataframe)
dataframe['enter_long'] = ""
dataframe['enter_tag'] = ""
dataframe['mid'] = (dataframe['close'] + dataframe['open']) / 2
dataframe['haopen'] = heikinashi['open']
dataframe['haclose'] = heikinashi['close']
dataframe['hapercent'] = dataframe['haclose'].pct_change()
# Ajout d'indicateurs techniques
dataframe['rsi'] = ta.RSI(dataframe, timeperiod=14)
dataframe['rsi_change'] = dataframe['rsi'].pct_change(3)
# dataframe['ema_short'] = ta.EMA(dataframe, timeperiod=9)
# dataframe['ema_long'] = ta.EMA(dataframe, timeperiod=21)
# dataframe['ema_50'] = ta.EMA(dataframe, timeperiod=50)
# dataframe['ema_200'] = ta.EMA(dataframe, timeperiod=200)
dataframe['bb_upperband'], dataframe['bb_middleband'], dataframe['bb_lowerband'] = ta.BBANDS(
dataframe['close'], timeperiod=20
)
dataframe["bb_width"] = (
(dataframe["bb_upperband"] - dataframe["bb_lowerband"]) / dataframe["bb_middleband"]
)
dataframe['min200'] = ta.MIN(dataframe['close'], timeperiod=200)
dataframe['max200'] = ta.MAX(dataframe['close'], timeperiod=200)
dataframe['max50'] = ta.MAX(dataframe['close'], timeperiod=50)
dataframe['min50'] = ta.MIN(dataframe['close'], timeperiod=50)
dataframe['max200_diff'] = (dataframe['max200'] - dataframe['close']) / dataframe['close']
dataframe['max50_diff'] = (dataframe['max50'] - dataframe['close']) / dataframe['close']
# ADX pour la force de tendance
dataframe['adx'] = ta.ADX(dataframe)
dataframe['atr'] = ta.ATR(dataframe['high'], dataframe['low'], dataframe[self.close], timeperiod=144) / \
dataframe[self.close]
# Calcul de nouvelles colonnes pour l'entraînement du modèle
dataframe['percent'] = dataframe['close'].pct_change()
dataframe['futur_pct'] = dataframe['percent'] # dataframe['close'] - dataframe['close'].shift(1)
dataframe['sma5'] = ta.SMA(dataframe, timeperiod=5)
dataframe['sma10'] = ta.SMA(dataframe, timeperiod=10)
dataframe['sma20'] = ta.SMA(dataframe, timeperiod=20)
dataframe['Interpolated'] = dataframe['sma5'].ewm(span=3, adjust=False).mean()
dataframe["percent3"] = dataframe[self.close].pct_change(3)
dataframe["percent5"] = dataframe[self.close].pct_change(5)
dataframe["percent12"] = dataframe[self.close].pct_change(12)
dataframe["percent24"] = dataframe[self.close].pct_change(24)
dataframe["percent48"] = dataframe[self.close].pct_change(48)
dataframe["sma20_pct"] = dataframe['sma20'].pct_change()
dataframe['volume2'] = dataframe['volume']
dataframe.loc[dataframe['percent'] < 0, 'volume2'] *= -1
dataframe['volume_change'] = dataframe['volume2'].rolling(5).sum()
# # Charger les données (par exemple, dataframe avec une colonne "close")
# dataframe['SMA5'] = dataframe['mid'].rolling(window=5).mean()
#
# # Calcul de la variation de pente
# dataframe['SMA5_diff'] = dataframe['SMA5'].diff()
#
# # Calcul de la variation accélérée (différence de pente)
# dataframe['SMA5_diff2'] = dataframe['SMA5_diff'].diff()
#
# # Identifier les inversions brusques (exemple : seuil = 0.5)
# dataframe['inversion'] = ((dataframe['SMA5_diff'] * dataframe['SMA5_diff'].shift(-1) < 0) &
# (dataframe['SMA5_diff2'] > self.threshold.value))
# Ajoutez la colonne en utilisant .loc pour éviter l'avertissement
# dataframe.loc[:, 'future_direction'] = dataframe['close'].shift(-1) - dataframe['close']
# dataframe.loc[:, 'future_direction'] = dataframe['future_direction'].apply(lambda x: 1 if x > 0 else 0)
# dataframe['future_direction'] = dataframe['percent48'] > 0.02
# dataframe['future_direction'] = dataframe['future_change'].apply(lambda x: 1 if x > 0 else 0)
# Supprime les lignes avec des valeurs manquantes
# dataframe = dataframe.dropna()
dataframe['DI+'] = ta.PLUS_DI(dataframe, window=12)
dataframe['DI-'] = ta.MINUS_DI(dataframe, window=12)
dataframe['DI_diff'] = dataframe['DI+'] - dataframe['DI-']
# Vérification de la tendance
dataframe['trend_is_positive'] = (dataframe['DI+'] - dataframe['DI-']) * (dataframe['adx'])
dataframe['trend_pente'] = dataframe['trend_is_positive'] - dataframe['trend_is_positive'].shift(1)
# Calcul de la pente de la SMA
dataframe['SMA20_slope'] = 100 * dataframe['sma20'].diff() / dataframe['sma20']
dataframe['trend_is_positive_2'] = dataframe['SMA20_slope'] > 0
dataframe['trend_is_negative_2'] = dataframe['SMA20_slope'] < 0
dataframe["volume_mean"] = dataframe["volume"].rolling(window=14).mean()
dataframe["volume_change_mean"] = dataframe["volume_change"].rolling(window=14).mean()
# Sortie si le volume est 3x supérieur à la moyenne
dataframe["volume_spike"] = dataframe["volume"] > (dataframe["volume_mean"] * 3)
# Déterminer si la bougie est haussière ou baissière
dataframe["bullish_candle"] = dataframe["close"] > dataframe["open"]
dataframe["bearish_candle"] = dataframe["close"] < dataframe["open"]
# Volume spike haussier (fort volume + bougie haussière)
dataframe["bullish_volume_spike"] = (dataframe["volume_change"] > dataframe["volume_change_mean"] * 3) & dataframe["volume_change"] > 0
# Volume spike baissier (fort volume + bougie baissière)
dataframe["bearish_volume_spike"] = (dataframe["volume_change"] < dataframe["volume_change_mean"] * 3) & dataframe["volume_change"] < 0
# ======================================================================================
################### INFORMATIVE 1h
informative = self.dp.get_pair_dataframe(pair=metadata['pair'], timeframe="1h")
informative['sma5'] = ta.SMA(informative, timeperiod=5)
informative['sma5_pct'] = informative['sma5'].pct_change()
informative["percent"] = informative[self.close].pct_change()
informative['volume2'] = informative['volume']
informative.loc[dataframe['percent'] < 0, 'volume2'] *= -1
informative['volume_change'] = informative['volume2'].rolling(5).sum()
informative['volatility'] = ta.STDDEV(informative['close'], timeperiod=14) / informative['close']
informative['atr'] = (ta.ATR(informative['high'], informative['low'], informative['close'], timeperiod=14)) / \
informative['close']
informative['rsi'] = ta.RSI(informative['close'], timeperiod=12)
informative['rsi_sma'] = ta.SMA(informative['rsi'], timeperiod=5)
informative['rsi_pct'] = 100 * informative['rsi'].pct_change()
informative['percent24'] = informative['close'].pct_change(24)
# Détecter les baisses de SMA5 (True si SMA5 baisse, False sinon)
informative['sma5_down'] = informative['sma5'].diff() <= 0
informative['sma5_up'] = informative['sma5'].diff() >= 0
# Compter les baisses consécutives
informative['sma5_down_count'] = informative['sma5_down'].astype(int) * (
informative['sma5_down'].groupby((informative['sma5_down'] != informative['sma5_down'].shift()).cumsum()).cumcount() + 1)
informative['sma5_up_count'] = informative['sma5_up'].astype(int) * (
informative['sma5_up'].groupby((informative['sma5_up'] != informative['sma5_up'].shift()).cumsum()).cumcount() + 1)
informative['Interpolated'] = informative['sma5'].ewm(span=3, adjust=False).mean()
informative['inversion'] = (
(informative['Interpolated'].shift(2) > informative['Interpolated'].shift(
1)) # La pente devient positive (actuel)
& (informative['Interpolated'].shift(1) < informative['Interpolated'])
# & (data[f'SMA{sma_period}_diff2'] < data['threshold']) # Le changement est brusque
)
# Calcul de l'ADX
informative['adx'] = ta.ADX(informative)
informative['DI+'] = ta.PLUS_DI(informative, window=5)
informative['DI+_pente'] = 100 * informative['DI+'].diff(3) / informative['DI+']
informative['DI-'] = ta.MINUS_DI(informative, window=5)
informative['DI-_pente'] = 100 * informative['DI-'].diff(3) / informative['DI-']
informative['DI_diff'] = informative['DI+'] - informative['DI-']
# Vérification de la tendance
informative['trend_is_positive'] = (informative['DI+'] - informative['DI-']) * (informative['adx'])
informative['trend_pente'] = informative['trend_is_positive'] - informative['trend_is_positive'].shift(1)
# Calcul de la pente de la SMA
informative['SMA5_slope'] = 100 * informative['sma5'].diff() / informative['sma5']
informative['trend_is_positive_2'] = informative['SMA5_slope'] > 0
informative['trend_is_negative_2'] = informative['SMA5_slope'] < 0
informative['inversion_slope'] = (
(informative['SMA5_slope'].shift(2) > informative['SMA5_slope'].shift(
1)) # La pente devient positive (actuel)
& (informative['SMA5_slope'].shift(1) < informative['SMA5_slope'])
# & (data[f'SMA{sma_period}_diff2'] < data['threshold']) # Le changement est brusque
)
# heikinashi = qtpylib.heikinashi(informative)
# informative['ha_open'] = heikinashi['open']
# informative['ha_close'] = heikinashi['close']
# informative['ha_high'] = heikinashi['high']
# informative['ha_low'] = heikinashi['low']
dataframe = merge_informative_pair(dataframe, informative, self.timeframe, "1h", ffill=True)
# ======================================================================================
################### INFORMATIVE 1d
informative = self.dp.get_pair_dataframe(pair=metadata['pair'], timeframe="1d")
informative['rsi'] = ta.RSI(informative['close'], timeperiod=12)
informative['sma5'] = ta.SMA(informative, timeperiod=3)
informative['sma5_pct'] = informative['sma5'].pct_change()
informative['rsi_pct'] = 100 * informative['rsi'].pct_change()
informative['bb_upperband'], informative['bb_middleband'], informative['bb_lowerband'] = ta.BBANDS(
informative['close'], timeperiod=20
)
informative["bb_mid_pct"] = informative['bb_middleband'].pct_change()
# Calcul de l'ADX
informative['adx'] = ta.ADX(informative)
informative['DI+'] = ta.PLUS_DI(informative, window=5)
informative['DI+_pente'] = 100 * informative['DI+'].diff(3) / informative['DI+']
informative['DI-'] = ta.MINUS_DI(informative, window=5)
informative['DI-_pente'] = 100 * informative['DI-'].diff(3) / informative['DI-']
# Vérification de la tendance
informative['trend_is_positive'] = (informative['DI+'] - informative['DI-']) * (informative['adx'])
# Calcul de la pente de la SMA
informative['SMA5_slope'] = 100 * informative['sma5'].diff() / informative['sma5']
informative['trend_is_positive_2'] = informative['SMA5_slope'] > 0
informative['trend_is_negative_2'] = informative['SMA5_slope'] < 0
informative['atr'] = (ta.ATR(informative['high'], informative['low'], informative['close'], timeperiod=14)) / \
informative['close']
informative['sma5_down'] = informative['sma5'].diff() <= 0
informative['sma5_up'] = informative['sma5'].diff() >= 0
informative['sma5_down_count'] = informative['sma5_down'].astype(int) * (
informative['sma5_down'].groupby((informative['sma5_down'] != informative['sma5_down'].shift()).cumsum()).cumcount() + 1)
informative['sma5_up_count'] = informative['sma5_up'].astype(int) * (
informative['sma5_up'].groupby((informative['sma5_up'] != informative['sma5_up'].shift()).cumsum()).cumcount() + 1)
dataframe = merge_informative_pair(dataframe, informative, self.timeframe, "1d", ffill=True)
# dataframe = self.detect_sma_inversions(dataframe, column='mid', sma_period=5, threshold_factor=self.threshold.value)
dataframe['DI+'] = ta.PLUS_DI(dataframe, window=14)
dataframe['DI+_pente'] = 100 * dataframe['DI+'].diff(3) / dataframe['DI+']
dataframe['DI-'] = ta.MINUS_DI(dataframe, window=14)
dataframe['DI-_pente'] = 100 * dataframe['DI-'].diff(3) / dataframe['DI-']
dataframe['sma20_5_diff'] = 100 * (dataframe['sma5'].shift(7) - dataframe['sma5'].shift(2)) / dataframe[
'sma5'].shift(2)
dataframe['inversion'] = (
(dataframe['Interpolated'].shift(2) >= dataframe['Interpolated'].shift(1)) # La pente devient positive (actuel)
& (dataframe['Interpolated'].shift(1) <= dataframe['Interpolated'])
# Le changement est brusque
)
dataframe['inversion_s'] = (
(dataframe['Interpolated'].shift(2) <= dataframe['Interpolated'].shift(1)) # La pente devient positive (actuel)
& (dataframe['Interpolated'].shift(1) >= dataframe['Interpolated'])
)
dataframe = self.populate_future_direction(dataframe)
return dataframe
def __init__(self, *args, **kwargs):
    """
    Build the strategy and attach the machine-learning helpers.

    Every positional and keyword argument is forwarded unchanged to the
    parent initialiser.
    """
    super().__init__(*args, **kwargs)
    # Feature scaler; it is fitted later by `train_model`.
    self.scaler = StandardScaler()
    # Decision tree capped at depth 5, with a fixed seed so that training is reproducible.
    self.model = DecisionTreeClassifier(random_state=42, max_depth=5)
def train_model(self, dataframe: DataFrame) -> None:
    """
    Fit the scaler and the decision tree on the usable rows of `dataframe`.

    Infinite values are replaced by NaN directly in `dataframe` (the caller's
    frame is modified). Rows holding a NaN in any feature column or in the
    target column are then left out of the training set.

    :param dataframe: frame containing the `self.features` columns and the `self.target` column.
    :raises ValueError: when no row is left after the cleaning step.
    """
    # Turn +/-inf into NaN in place so that a single dropna covers both cases.
    dataframe.replace([np.inf, -np.inf], np.nan, inplace=True)

    required_columns = self.features + [self.target]
    usable_rows = dataframe.dropna(subset=required_columns)
    if usable_rows.empty:
        raise ValueError("Aucune donnée valide après suppression des NaN et infinis. Vérifiez vos calculs.")

    inputs, labels = usable_rows[self.features], usable_rows[self.target]

    # The scaler is fitted first; the tree is then trained on the scaled inputs.
    self.scaler.fit(inputs)
    scaled_inputs = self.scaler.transform(inputs)
    self.model.fit(scaled_inputs, labels)
def validate_data(self, X):
    """
    Reject inputs that contain NaN or infinite values.

    :param X: numeric array-like to inspect.
    :raises ValueError: if a NaN is present (checked first) or if an infinite value is present.
    """
    contains_nan = np.any(np.isnan(X))
    if contains_nan:
        raise ValueError("Les données contiennent des valeurs NaN.")

    contains_inf = np.any(np.isinf(X))
    if contains_inf:
        raise ValueError("Les données contiennent des valeurs infinies.")
def predict(self, dataframe: DataFrame) -> np.ndarray:
    """
    Predict the target class for every row whose features are all finite.

    Rows with a NaN or an infinite value in any `self.features` column are
    skipped, so the returned array can be shorter than `dataframe`; its
    entries follow the order of the retained rows and carry no index.
    The caller's frame is not modified.

    :param dataframe: frame containing the `self.features` columns.
    :return: array of model predictions, or an empty array when no row is usable.
    :raises ValueError: when usable rows exist but the scaler has not been fitted yet.
    """
    # Infinite values are treated as missing, exactly as `train_model` does:
    # dropna alone would keep them and the scaler's input validation would
    # then reject the whole batch.
    usable_features = (
        dataframe[self.features]
        .replace([np.inf, -np.inf], np.nan)
        .dropna()
    )
    if usable_features.empty:
        return np.array([])

    # A fitted scaler exposes `mean_`; without it `transform` cannot work.
    if not hasattr(self.scaler, 'mean_'):
        raise ValueError("Scaler not fitted. Call `train_model` before predicting.")

    return self.model.predict(self.scaler.transform(usable_features))
def populate_entry_trend(self, dataframe: DataFrame, metadata: dict) -> DataFrame:
    """
    Flag long entries using two rule-based signals.

    Both signals write `enter_long = 1` together with an `enter_tag`. They are
    applied one after the other, so a candle matching both ends up tagged
    'buy_tout_monte'.

    :param dataframe: indicator frame; must already contain the columns read below.
    :param metadata: pair metadata (not used by the active rules).
    :return: the same frame with `enter_long` / `enter_tag` filled on matching rows.
    """
    signal_columns = ['enter_long', 'enter_tag']
    close = dataframe['close']
    gained_over_3 = dataframe['percent3'] > 0

    # Signal 1: ten candles ago the close sat within 0.7 % above the daily
    # lower Bollinger band (or below it) with more than 5 % of room up to the
    # daily upper band, and `min50` is the same now as it was then.
    close_10_ago = close.shift(10)
    upper_10_ago = dataframe['bb_upperband_1d'].shift(10)
    lower_10_ago = dataframe['bb_lowerband_1d'].shift(10)
    had_room_to_upper = (upper_10_ago - close_10_ago) / close_10_ago > 0.05
    was_near_lower = close_10_ago < lower_10_ago * 1.007
    min50_unchanged = dataframe['min50'].shift(10) == dataframe['min50']

    rebound_from_lower = gained_over_3 & had_room_to_upper & was_near_lower & min50_unchanged
    dataframe.loc[rebound_from_lower, signal_columns] = [1, 'buy_upperclose_1d']

    # Signal 2: price is below the daily upper band while sma20, the 1h sma5
    # and `trend_is_positive` are all higher than on the previous candle.
    below_upper = close < dataframe['bb_upperband_1d']
    sma20_rising = dataframe['sma20'].shift(1) < dataframe['sma20']
    sma5_1h_rising = dataframe['sma5_1h'].shift(1) < dataframe['sma5_1h']
    trend_rising = dataframe['trend_is_positive'].shift(1) < dataframe['trend_is_positive']

    everything_rising = gained_over_3 & below_upper & sma20_rising & sma5_1h_rising & trend_rising
    dataframe.loc[everything_rising, signal_columns] = [1, 'buy_tout_monte']

    return dataframe
def populate_exit_trend(self, dataframe: DataFrame, metadata: dict) -> DataFrame:
    """
    Exit-signal hook; no exit rule is currently active.

    :param dataframe: indicator frame.
    :param metadata: pair metadata (unused).
    :return: `dataframe` exactly as received, with no exit column written.
    """
    # Nothing is flagged here: the frame is handed back untouched.
    return dataframe
def populate_future_direction(self, dataframe: DataFrame) -> DataFrame:
    """
    Add the `future_change` and `future_direction` columns used as training target.

    `future_change` is a copy of the `percent12` column and `future_direction`
    is 1 where that change is strictly above 0.5 % and 0 everywhere else
    (NaN rows therefore get 0).

    NOTE(review): `percent12` is computed outside this method; if it is a
    trailing (past) change, this label describes the past rather than the
    future - confirm against the indicator code.

    :param dataframe: frame containing a `percent12` column; modified in place.
    :return: the same frame with the two columns added.
    """
    dataframe['future_change'] = dataframe['percent12']
    # Vectorised comparison: NaN compares False, so it maps to 0 like any
    # other value that is not above the threshold.
    dataframe['future_direction'] = (dataframe['future_change'] > 0.005).astype(int)
    return dataframe
def backtest(self, dataframe: DataFrame) -> None:
    """
    Label historical data and train the model on it.

    :param dataframe: historical frame passed through `populate_future_direction`
        before being handed to `train_model`.
    """
    # Labelling comes first; the labelled frame is what the model is trained on.
    self.train_model(self.populate_future_direction(dataframe))
def custom_stake_amount(self, pair: str, current_time: datetime, current_rate: float,
                        proposed_stake: float, min_stake: float, max_stake: float,
                        **kwargs) -> float:
    """
    Return the stake to use for a new trade on `pair`.

    The decision is delegated to `calculate_stake`; the time, rate and the
    proposed/min/max stakes received here are not taken into account.

    :param pair: pair about to be traded.
    :return: whatever `calculate_stake(pair, None, None)` returns.
    """
    stake = self.calculate_stake(pair, None, None)
    return stake
def calculate_stake(self, pair, value, last_candle):
    """
    Return the stake amount configured for the bot.

    :param pair: unused.
    :param value: unused.
    :param last_candle: unused.
    :return: the `stake_amount` entry of the strategy configuration.
    """
    return self.config['stake_amount']
# def add_future_direction(self, dataframe: DataFrame) -> DataFrame:
# dataframe['future_change'] = dataframe['percent48']
# dataframe['future_direction'] = dataframe['future_change'].apply(lambda x: 1 if x > 0 else 0)
# return dataframe
def informative_pairs(self):
    """
    List the additional (pair, timeframe) data needed by the strategy.

    :return: one ('pair', '1d') tuple per whitelisted pair, followed by one
        ('pair', '1h') tuple per whitelisted pair.
    """
    whitelist = self.dp.current_whitelist()
    # Daily entries for every pair come first, then the hourly ones.
    return [(pair, timeframe) for timeframe in ('1d', '1h') for pair in whitelist]
# # Fonction générique pour détecter les inversions brusques d'une SMA
# def detect_sma_inversions(self, data, column='close', sma_period=20, threshold_factor=2):
# """
# Détecte les inversions brusques pour une SMA donnée.
#
# :param data: DataFrame contenant les données de prix.
# :param column: Nom de la colonne des prix (ex. 'close').
# :param sma_period: Période de la SMA (ex. 20 pour SMA20).
# :param threshold_factor: Facteur pour calculer le seuil basé sur l'écart-type.
# :return: DataFrame avec une colonne 'inversion' indiquant les points d'inversion.
# """
# # Calcul de la SMA
# data[f'SMA{sma_period}'] = data[column].rolling(window=sma_period).mean()
#
# # Calcul de la variation de la pente (différences successives)
# data[f'SMA{sma_period}_diff'] = 100 * data[f'SMA{sma_period}'].diff() / data['close']
#
# # Calcul de la variation accélérée (différence des différences)
# data[f'SMA{sma_period}_diff2'] = \
# (data[f'SMA{sma_period}_diff'].shift(3) + data[f'SMA{sma_period}_diff'].shift(2) + data[
# f'SMA{sma_period}_diff'].shift(1)) / 3 - (data[f'SMA{sma_period}_diff'].shift(2) + data[f'SMA{sma_period}_diff'].shift(1) + data[f'SMA{sma_period}_diff']) / 3
#
# # Calcul de l'écart-type pour normaliser le seuil
# # std_diff2 = data[f'SMA{sma_period}_diff2'].std()
#
# # Définir un seuil basé sur le facteur et l'écart-type
# #threshold = threshold_factor * std_diff2
#
# data['dir_change'] = (data[f'SMA{sma_period}_diff'].shift(1) * data[f'SMA{sma_period}_diff'] < 0)
# data['threshold_diff'] = data[f'SMA{sma_period}_diff2'].shift(1) - threshold_factor
# # Identifier les inversions brusques
# # data['inversion'] = (
# # (data[f'SMA{sma_period}_diff'] * data[f'SMA{sma_period}_diff'].shift(-1) < 0) & # Changement de direction
# # (data[f'SMA{sma_period}_diff2'] < threshold) # Changement brusque
# # )
# data['inversion'] = (
# (data[f'SMA{sma_period}_diff'].shift(1) < 0) & # La pente était négative
# (data[f'SMA{sma_period}_diff'] > 0) & # La pente devient positive
# # (data['percent5'] <= self.percent_threshold.value) &
# (data['close'] <= data['close_1h'].shift(12)) &
# (data[f'SMA{sma_period}_diff2'].shift(1) > threshold_factor) # Le changement est brusque
# )
#
# return data
def detect_sma_inversions(self, data, column='close', sma_period=20, threshold_factor=2, rolling_window=50):
    """
    Flag candles where a falling SMA starts to decelerate, using past data only.

    The following columns are written to `data` (existing columns with the
    same names are overwritten):

    - ``SMA{sma_period}``: simple moving average of `column`.
    - ``SMA{sma_period}_diff``: previous SMA minus current SMA, so a positive
      value means the SMA is falling.
    - ``SMA{sma_period}_diff2``: previous diff minus current diff.
    - ``rolling_std_diff2``: rolling standard deviation of the second difference.
    - ``threshold``: ``threshold_factor * rolling_std_diff2`` (computed, but
      not used by the active `inversion` condition).
    - ``inversion``: boolean flag described below.

    `inversion` is True when the SMA is still falling on the current candle,
    the fall grew from two candles ago to the previous candle, and the
    current fall is no larger than the previous one.

    :param data: DataFrame holding the price data; modified in place.
    :param column: name of the price column (e.g. 'close').
    :param sma_period: period of the SMA.
    :param threshold_factor: multiplier applied to the rolling standard deviation.
    :param rolling_window: window of the rolling standard deviation (past data only).
    :return: `data`, with the columns listed above added.
    """
    sma_col = f'SMA{sma_period}'
    diff_col = f'{sma_col}_diff'
    diff2_col = f'{sma_col}_diff2'

    data[sma_col] = data[column].rolling(window=sma_period).mean()
    # Previous minus current: the sign is inverted compared to a usual slope.
    data[diff_col] = data[sma_col].shift(1) - data[sma_col]
    data[diff2_col] = data[diff_col].shift(1) - data[diff_col]

    # Only shift() and trailing rolling windows are used, so nothing looks ahead.
    data['rolling_std_diff2'] = (
        data[diff2_col].rolling(window=rolling_window, min_periods=1).std()
    )
    data['threshold'] = threshold_factor * data['rolling_std_diff2']

    fall = data[diff_col]
    data['inversion'] = (
        (fall > 0)                          # the SMA is still falling now
        & (fall.shift(2) < fall.shift(1))   # the fall was accelerating up to the previous candle
        & (fall.shift(1) >= fall)           # and it is no longer accelerating
    )

    # Debug-level trace instead of printing the whole Series on every call.
    logging.getLogger(__name__).debug(
        "detect_sma_inversions: %d inversion(s) flagged", int(data['inversion'].sum())
    )
    return data
def log_trade(self, action, pair, date, trade_type=None, rate=None, dispo=None, profit=None, buys=None, stake=None, last_candle=None):
    """
    Print one row of the trade table to stdout (header included on first use).

    Nothing is printed when the run mode is 'hyperopt'. The header is printed
    once, tracked through `self.columns_logged`.

    NOTE(review): `self.columns_logged` and `self.pairs` are read here but are
    not set in the visible `__init__`; they are presumably defined elsewhere
    on the strategy - confirm.

    :param action: label of the event (e.g. 'Sell'); for any action other than
        'Sell', `profit` is recomputed from the candle close and `last_max`.
    :param pair: traded pair, also the key into `self.pairs`.
    :param date: event date; only the first 16 characters of its string form are shown.
    :param trade_type: optional free-text description.
    :param rate: optional rate to display.
    :param dispo: optional available-balance figure to display.
    :param profit: optional profit to display (kept as given when it cannot be recomputed).
    :param buys: optional number of buys to display.
    :param stake: optional stake to display.
    :param last_candle: optional mapping/row providing `rsi_1d`, `rsi_1h`,
        `rsi_pct_1d`, `rsi_pct_1h`, `bb_mid_pct_1d`, `sma5_pct_1d`,
        `sma5_pct_1h` and `close`. When None, the candle-based cells show '-'.
    """
    if self.config.get('runmode') == 'hyperopt':
        return

    # Print the column headers only once.
    if not self.columns_logged:
        print(
            f"| {'Date':<16} | {'Action':<10} | {'Pair':<10} | {'Trade Type':<18} | {'Rate':>12} | {'Dispo':>6} | {'Profit':>8} | {'RSI':>5} | {'sma_pct_1d':>11} | {'last_max':>12} | {'rsi_pct':>12} | {'Buys':>5} | {'Stake':>10} |"
        )
        print(
            f"|{'-' * 18}|{'-' * 12}|{'-' * 12}|{'-' * 20}|{'-' * 14}|{'-' * 8}|{'-' * 10}|{'-' * 7}|{'-' * 13}|{'-' * 14}|{'-' * 14}|{'-' * 7}|{'-' * 12}|"
        )
        self.columns_logged = True

    date = str(date)[:16] if date else "-"

    def all_finite(*keys):
        """Return True when every requested candle value is a finite number."""
        return all(np.isfinite(last_candle[key]) for key in keys)

    rsi = ''
    rsi_pct = ''
    sma5 = ''
    first_rate = self.pairs[pair]['last_max']

    # Every candle-derived cell is computed only when a candle was supplied;
    # `last_candle` defaults to None and must not be subscripted in that case.
    if last_candle is not None:
        if all_finite('rsi_1d', 'rsi_1h'):
            rsi = str(int(last_candle['rsi_1d'])) + " " + str(int(last_candle['rsi_1h']))

        # `bb_mid_pct_1d` is converted with int() too, so it needs the same
        # guard as the two RSI variations (int(NaN) / int(inf) would raise).
        if all_finite('bb_mid_pct_1d', 'rsi_pct_1d', 'rsi_pct_1h'):
            rsi_pct = str(int(10000 * last_candle['bb_mid_pct_1d'])) + " " + str(
                int(last_candle['rsi_pct_1d'])) + " " + str(int(last_candle['rsi_pct_1h']))

        sma5_1d = ''
        sma5_1h = ''
        if last_candle['sma5_pct_1d'] is not None:
            sma5_1d = round(last_candle['sma5_pct_1d'] * 100, 2)
        if last_candle['sma5_pct_1h'] is not None:
            sma5_1h = round(last_candle['sma5_pct_1h'] * 100, 2)
        sma5 = str(sma5_1d) + ' ' + str(sma5_1h)

        # Distance of the close to the recorded last maximum; skipped while no
        # maximum is recorded (0/None) to avoid dividing by zero.
        if action != 'Sell' and first_rate:
            profit = round((last_candle['close'] - first_rate) / first_rate, 2)

    limit_sell = rsi_pct
    print(
        f"| {date:<16} | {action:<10} | {pair:<10} | {trade_type or '-':<18} | {rate or '-':>12} | {dispo or '-':>6} | {profit or '-':>8} | {rsi or '-':>5} | {sma5 or '-':>11} | {first_rate or '-':>12} | {limit_sell or '-':>12} | {buys or '-':>5} | {stake or '-':>10} |"
    )
@property
def protections(self):
    """
    Protection settings of the strategy.

    :return: a list holding a single "CooldownPeriod" entry with a stop
        duration of 12 candles.
    """
    cooldown = {
        "method": "CooldownPeriod",
        "stop_duration_candles": 12,
    }
    # The cooldown is the only protection returned.
    return [cooldown]
# def custom_backtest_summary(self, results: dict):
# # Récupérer les stats existantes
# total_trades = results.get("total_trades", 0)
# profit_total = results.get("profit_total", 0)
# wins = results.get("wins", 0)
# losses = results.get("losses", 0)
# days = results.get("backtest_days", 1)
#
# # Calculer de nouvelles métriques
# win_loss_ratio = wins / losses if losses > 0 else float('inf')
# avg_daily_profit = profit_total / days
# trades_per_day = total_trades / days
#
# # Ajouter les nouvelles stats aux logs
# logger.info("━━━━━━ METRIQUES PERSO ━━━━━━")
# logger.info(f"Win/Loss Ratio: {win_loss_ratio:.2f}")
# logger.info(f"Avg Daily Profit USDT: {avg_daily_profit:.2f}")
# logger.info(f"Trades Per Day: {trades_per_day:.2f}")
# logger.info("━━━━━━━━━━━━━━━━━━━━━━━━━━━━━")