Files
Freqtrade/Zeus_8_1d.py
Jérôme Delacotte 67f617a5da Zeus_8_1d_Bilan.txt
2025-10-08 11:00:53 +02:00

1540 lines
69 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
# Zeus Strategy: First Generation of GodStra Strategy with maximum
# AVG/MID profit in USDT
# Author: @Mablue (Masoud Azizi)
# github: https://github.com/mablue/
# IMPORTANT: INSTALL TA BEFORE RUNNING (pip install ta)
# freqtrade hyperopt --hyperopt-loss SharpeHyperOptLoss --spaces buy sell roi --strategy Zeus
# --- Do not remove these libs ---
from datetime import timedelta, datetime
from freqtrade.persistence import Trade
from freqtrade.strategy import (BooleanParameter, CategoricalParameter, DecimalParameter, stoploss_from_open,
IntParameter, IStrategy, merge_informative_pair, informative, stoploss_from_absolute)
import pandas as pd
import numpy as np
from pandas import DataFrame
from typing import Optional, Union, Tuple
from typing import List
import logging
import configparser
from technical import pivots_points
# --------------------------------
# Add your lib to import here test git
import ta
import talib.abstract as talib
import freqtrade.vendor.qtpylib.indicators as qtpylib
import requests
from datetime import timezone, timedelta
from scipy.signal import savgol_filter
from ta.trend import SMAIndicator, EMAIndicator, MACD, ADXIndicator
from collections import Counter
logger = logging.getLogger(__name__)
from tabulate import tabulate
# Basic ANSI colour escape sequences (used to colour the console trade log)
RED = "\033[31m"
GREEN = "\033[32m"
YELLOW = "\033[33m"
BLUE = "\033[34m"
MAGENTA = "\033[35m"
CYAN = "\033[36m"
RESET = "\033[0m"
def pprint_df(dframe):
    """Print ``dframe`` to stdout as a psql-style table (column headers shown, index hidden)."""
    rendered = tabulate(dframe, headers='keys', tablefmt='psql', showindex=False)
    print(rendered)
def normalize(df):
    """Min-max scale ``df``: subtract the minimum and divide by ``max - min``.

    Returns the scaled object; the argument itself is not modified.
    """
    lowest = df.min()
    value_range = df.max() - lowest
    return (df - lowest) / value_range
class Zeus_8_1d(IStrategy):
    """Freqtrade strategy (5m timeframe) with custom stoploss and position adjustment enabled.

    The class-level attributes below hold the freqtrade configuration, the plot
    layout, the per-pair bookkeeping dictionary (``pairs``) and lookup tables
    used by the strategy callbacks.
    """
    levels = [1, 2, 2, 4, 6, 8, 10, 12, 14, 16, 18, 20]
    startup_candle_count = 12 * 24 * 2
    # ROI table:
    minimal_roi = {
        "0": 10
    }
    stakes = 40
    # Stoploss:
    stoploss = -1  # 0.256
    # Custom stoploss
    use_custom_stoploss = True
    # Buy hypers
    timeframe = '5m'
    max_open_trades = 5
    max_amount = 40
    # DCA config
    position_adjustment_enable = True
    # Plot layout: series drawn on the main chart and on named subplots
    plot_config = {
        "main_plot": {
            "mid_smooth_5": {
                "color": "blue"
            },
            "mid_smooth_12": {
                "color": "green"
            },
            "mid_smooth_24": {
                "color": "yellow"
            },
            "bb_lowerband": {
                "color": "#da59a6"},
            "bb_upperband": {
                "color": "#da59a6",
            }
        },
        "subplots": {
            "Rsi": {
                "max_rsi_12": {
                    "color": "blue"
                },
            },
            "Deriv1": {
                "mid_smooth_5_deriv1": {
                    "color": "blue"
                },
                "mid_smooth_12_deriv1": {
                    "color": "green"
                },
                "mid_smooth_24_deriv1": {
                    "color": "yellow"
                },
            },
            "Deriv2": {
                "mid_smooth_5_deriv2": {
                    "color": "blue"
                },
                "mid_smooth_12_deriv2": {
                    "color": "green"
                },
                "mid_smooth_24_deriv2": {
                    "color": "yellow"
                },
            },
            "Down": {
                "percage_upperband": {
                    "color": "green"
                },
                "percage_up": {
                    "color": "blue"
                }
            }
            # "Diff": {
            #     "sma10_deriv1": {
            #         "color": "#74effc"
            #     }
            # },
        }
    }
    # Used by log_trade as a call counter (``% 10`` and ``+= 1``) although it starts as a bool
    columns_logged = False
    # Per-pair bookkeeping shared by the callbacks; one entry per supported pair.
    # NOTE(review): the 'current_trade' key is not created here; it is added later by the callbacks.
    pairs = {
        pair: {
            "first_buy": 0,
            "last_buy": 0.0,
            "last_min": 999999999999999.5,
            "last_max": 0,
            "trade_info": {},
            "max_touch": 0.0,
            "last_sell": 0.0,
            'count_of_buys': 0,
            'current_profit': 0,
            'expected_profit': 0,
            "last_candle": {},
            "last_trade": None,
            "last_count_of_buys": 0,
            'base_stake_amount': 0,
            'stop_buy': False,
            'last_date': 0,
            'stop': False,
            'max_profit': 0,
            'last_palier_index': -1,
            'total_amount': 0,
            'has_gain': 0,
            'force_sell': False,
            'force_buy': False
        }
        for pair in ["BTC/USDC", "ETH/USDC", "DOGE/USDC", "XRP/USDC", "SOL/USDC",
                     "BTC/USDT", "ETH/USDT", "DOGE/USDT", "XRP/USDT", "SOL/USDT"]
    }
    # 20 20 40 60 100 160 260 420
    # 50 50 100 300 500
    # fibo = [1, 1, 2, 3, 5, 8, 13, 21]
    # my fibo
    # 50 50 50 100 100 150 200 250 350 450 600 1050
    fibo = [1, 1, 1, 2, 2, 3, 4, 5, 7, 9, 12, 16, 21]
    baisse = [1, 2, 3, 5, 7, 10, 14, 19, 26, 35, 47, 63, 84]
    # My sequence     1  1   1   2   2   3   4   5   7    9    12   16   21
    # Stake           50 50  50  100 100 150 200 250 350  450  600  800  1050
    # Cumulated stake 50 100 150 250 350 500 700 950 1300 1750 2350 3150 4200
    # drop            1  2   3   5   7   10  14  19  26   35   47   63   84
    # factors = [1, 1.1, 1.25, 1.5, 2.0, 3]
    # thresholds = [2, 5, 10, 20, 30, 50]
    factors = [0.5, 0.75, 1, 1.25, 1.5, 2]
    thresholds = [0, 2, 5, 10, 30, 45]
    trades = list()
    max_profit_pairs = {}
    # =========================================================================
    # Hyperopt parameters
    mise_factor_buy = DecimalParameter(0.01, 0.2, default=0.05, decimals=2, space='buy', optimize=True, load=True)
    # Ordered labels
    # labels = ['B5', 'B4', 'B3', 'B2', 'B1', 'N0', 'H1', 'H2', 'H3', 'H4', 'H5']
    # index_labels = ['B5', 'B4', 'B3', 'B2', 'B1', 'N0', 'H1', 'H2', 'H3', 'H4', 'H5']
    # ordered_labels = ['B5', 'B4', 'B3', 'B2', 'B1', 'N0', 'H1', 'H2', 'H3', 'H4', 'H5']
    labels = ['B3', 'B2', 'B1', 'N0', 'H1', 'H2', 'H3']
    index_labels = ['B3', 'B2', 'B1', 'N0', 'H1', 'H2', 'H3']
    ordered_labels = ['B3', 'B2', 'B1', 'N0', 'H1', 'H2', 'H3']
    label_to_index = {label: i for i, label in enumerate(ordered_labels)}
    # =========================================================================
    # Daily sma5 derivative steps (bin edges)
    sma5_deriv1 = [-1.1726, -0.2131, -0.1012, -0.0330, 0.0169, 0.0815, 0.2000, 4.0335]
    sma5_deriv2 = [-1.9190, -0.1388, -0.0644, -0.0202, 0.0209, 0.0646, 0.1377, 4.2987]
    sma5_derive1_2_matrice = {
        'B3': [8.6, 10.8, 34.6, 35.0, 58.8, 61.9, 91.2],
        'B2': [0.0, 12.5, 9.1, 57.1, 63.3, 79.3, 89.5],
        'B1': [6.1, 12.5, 22.0, 46.8, 61.5, 70.0, 100.0],
        'N0': [0.0, 10.7, 37.0, 43.5, 75.0, 75.9, 100.0],
        'H1': [0.0, 18.5, 32.4, 35.9, 76.8, 82.9, 92.0],
        'H2': [0.0, 21.9, 16.0, 39.5, 69.7, 83.3, 100.0],
        'H3': [9.5, 29.2, 41.2, 57.9, 53.8, 86.8, 92.3],
    }
    sma5_derive1_2_matrice_df = pd.DataFrame(sma5_derive1_2_matrice, index=index_labels)
    # Extract the numeric matrix (rows and columns ordered by ordered_labels)
    sma5_derive1_2_numeric_matrice = sma5_derive1_2_matrice_df.reindex(index=ordered_labels,
                                                                       columns=ordered_labels).values
    # paliers = {}
    # Name of the indicator column used as buy/sell reference
    indicateur_achat_vente = 'mid_smooth_12'
    should_enter_trade_count = 0
def confirm_trade_entry(self, pair: str, order_type: str, amount: float, rate: float, time_in_force: str,
                        current_time: datetime, entry_tag: Optional[str], **kwargs) -> bool:
    """Decide whether a pending entry order may be placed and initialise the pair state.

    The entry is allowed when the pair's ``force_buy`` flag is set (the flag is
    consumed), or when the pair is not flagged ``stop`` and
    ``self.should_enter_trade`` accepts the latest candle. On acceptance the
    per-pair bookkeeping in ``self.pairs[pair]`` is reset for a fresh position
    and the buy is written to the trade log.

    Only the most recent candle is read, so a short analyzed dataframe no
    longer raises ``IndexError`` (earlier versions also fetched the two
    previous candles without using them).

    :param pair: traded pair; must be a key of ``self.pairs``.
    :param rate: entry price, stored as first and last buy price.
    :param current_time: time of the decision, used for the "minutes since last exit" log field.
    :param entry_tag: tag of the entry signal, forwarded to the log.
    :return: True when the entry is confirmed, False otherwise.
    """
    state = self.pairs[pair]

    # Minutes elapsed since the last confirmed exit on this pair (0 if none yet).
    minutes = 0
    if state['last_date'] != 0:
        minutes = int((current_time - state['last_date']).total_seconds() / 60)

    dataframe, _ = self.dp.get_analyzed_dataframe(pair, self.timeframe)
    last_candle = dataframe.iloc[-1].squeeze()

    if state['force_buy']:
        # A forced buy bypasses every other check and is valid once.
        state['force_buy'] = False
        allow_to_buy = True
    else:
        allow_to_buy = not state['stop']
        # should_enter_trade is always consulted, even when the pair is stopped.
        if not self.should_enter_trade(pair, last_candle, current_time):
            allow_to_buy = False

    if allow_to_buy:
        close = last_candle['close']
        self.trades = list()
        state['first_buy'] = rate
        state['last_buy'] = rate
        state['max_touch'] = close
        state['last_candle'] = last_candle
        state['count_of_buys'] = 1
        state['current_profit'] = 0
        state['last_palier_index'] = -1
        state['last_max'] = max(close, state['last_max'])
        state['last_min'] = min(close, state['last_min'])

        dispo = round(self.wallets.get_available_stake_amount())
        self.printLineLog()

        stake_amount = self.adjust_stake_amount(pair, last_candle)
        state['total_amount'] = stake_amount

        self.log_trade(
            last_candle=last_candle,
            date=current_time,
            action=("🟩Buy" if allow_to_buy else "Canceled") + " " + str(minutes),
            pair=pair,
            rate=rate,
            dispo=dispo,
            profit=0,
            trade_type=entry_tag,
            buys=1,
            stake=round(stake_amount, 2)
        )
    return allow_to_buy
def confirm_trade_exit(self, pair: str, trade: Trade, order_type: str, amount: float, rate: float,
                       time_in_force: str,
                       exit_reason: str, current_time, **kwargs, ) -> bool:
    """Confirm or reject an exit order and reset the pair bookkeeping on confirmation.

    The exit is confirmed when the latest candle is negative
    (``last_candle['percent'] < 0``) or when ``exit_reason`` is
    ``'force_exit'``. Whenever the exit is confirmed, the sale is logged and
    the per-pair state in ``self.pairs[pair]`` is reset. Previously a
    ``'force_exit'`` on a non-negative candle was confirmed without resetting
    the state, leaving stale counters (``count_of_buys``, ``max_profit`` ...)
    for a position that no longer existed.

    :param pair: traded pair; must be a key of ``self.pairs``.
    :param trade: trade being closed.
    :param amount: amount being sold, used for the logged profit.
    :param rate: exit price.
    :param exit_reason: exit tag, also written to the log.
    :param current_time: time of the decision.
    :return: plain ``bool``; True when the exit may proceed.
    """
    dataframe, _ = self.dp.get_analyzed_dataframe(pair, self.timeframe)
    last_candle = dataframe.iloc[-1].squeeze()
    state = self.pairs[pair]

    # bool() turns the numpy comparison result into a plain Python bool
    allow_to_sell = bool(last_candle['percent'] < 0)
    if not (allow_to_sell or exit_reason == 'force_exit'):
        return False

    minutes = int(round((current_time - trade.date_last_filled_utc).total_seconds() / 60, 0))

    # Remember the closing trade before logging (log_trade reads self.pairs).
    self.trades = list()
    state['last_count_of_buys'] = trade.nr_of_successful_entries
    state['last_sell'] = rate
    state['last_trade'] = trade
    state['last_candle'] = last_candle

    dispo = round(self.wallets.get_available_stake_amount())
    self.log_trade(
        last_candle=last_candle,
        date=current_time,
        action="🟥Sell " + str(minutes),
        pair=pair,
        trade_type=exit_reason,
        rate=last_candle['close'],
        dispo=dispo,
        profit=round(trade.calc_profit(rate, amount), 2)
    )

    # Reset the position state only after logging, so the log shows the final values.
    state['max_profit'] = 0
    state['force_sell'] = False
    state['has_gain'] = 0
    state['current_profit'] = 0
    state['total_amount'] = 0
    state['count_of_buys'] = 0
    state['max_touch'] = 0
    state['last_buy'] = 0
    state['last_date'] = current_time
    state['last_palier_index'] = -1
    state['current_trade'] = None
    return True
def custom_stake_amount(self, pair: str, current_time: datetime, current_rate: float,
                        proposed_stake: float, min_stake: float, max_stake: float,
                        **kwargs) -> float:
    """Return the stake for a new entry as computed by ``self.adjust_stake_amount``.

    The most recent analyzed candle of ``pair`` is handed to the helper; the
    proposed, minimum and maximum stakes supplied by the caller are not used.
    """
    analyzed, _ = self.dp.get_analyzed_dataframe(pair=pair, timeframe=self.timeframe)
    newest_candle = analyzed.iloc[-1].squeeze()
    return self.adjust_stake_amount(pair, newest_candle)
def custom_exit(self, pair: str, trade: Trade, current_time, current_rate, current_profit, **kwargs):
    """Update the per-pair tracking state and return an exit tag when an exit rule fires.

    Two rules are evaluated on the latest candle, both requiring the absolute
    profit to exceed ``self.expectedProfit(pair, last_candle)``:

    * ``'RSI_...'``  - ``mid_smooth_5_deriv1 <= -0.1`` and ``rsi > 65``;
    * ``'Drv3_...'`` - ``mid_smooth_24_deriv1 <= -0.1``.

    When neither fires, ``max_touch`` is refreshed and the function falls off
    the end (implicitly returning ``None``).

    :param pair: traded pair; must be a key of ``self.pairs``.
    :param trade: the open trade.
    :param current_time: time of the evaluation.
    :param current_profit: profit ratio; multiplied by ``trade.stake_amount`` to get an absolute profit.
    :return: exit tag string, or ``None``.
    """
    dataframe, _ = self.dp.get_analyzed_dataframe(pair, self.timeframe)
    last_candle = dataframe.iloc[-1].squeeze()
    # NOTE(review): the five candles below are fetched but never used in this function
    last_candle_1h = dataframe.iloc[-13].squeeze()
    before_last_candle = dataframe.iloc[-2].squeeze()
    before_last_candle_2 = dataframe.iloc[-3].squeeze()
    before_last_candle_12 = dataframe.iloc[-13].squeeze()
    before_last_candle_24 = dataframe.iloc[-25].squeeze()
    expected_profit = self.expectedProfit(pair, last_candle)
    # print(f"current_time={current_time} current_profit={current_profit} expected_profit={expected_profit}")
    max_touch_before = self.pairs[pair]['max_touch']
    # Track the extreme closes seen for this pair and remember the active trade
    self.pairs[pair]['last_max'] = max(last_candle['close'], self.pairs[pair]['last_max'])
    self.pairs[pair]['last_min'] = min(last_candle['close'], self.pairs[pair]['last_min'])
    self.pairs[pair]['current_trade'] = trade
    count_of_buys = trade.nr_of_successful_entries
    # Absolute profit in stake currency, rounded to one decimal
    profit = round(current_profit * trade.stake_amount, 1)
    self.pairs[pair]['max_profit'] = max(self.pairs[pair]['max_profit'], profit)
    max_profit = self.pairs[pair]['max_profit']
    # baisse: drawdown from the best profit seen, in percent (only computed while in profit)
    baisse = 0
    if profit > 0:
        baisse = 100 * abs(max_profit - profit) / max_profit
    mx = max_profit / 5
    self.pairs[pair]['count_of_buys'] = count_of_buys
    self.pairs[pair]['current_profit'] = profit
    dispo = round(self.wallets.get_available_stake_amount())
    # NOTE(review): timedelta.seconds ignores whole days; this value is unused here
    hours_since_first_buy = (current_time - trade.open_date_utc).seconds / 3600.0
    days_since_first_buy = (current_time - trade.open_date_utc).days
    hours = (current_time - trade.date_last_filled_utc).total_seconds() / 3600.0
    # Status line only when the time since the last fill is an exact multiple of 4 hours
    if hours % 4 == 0:
        self.log_trade(
            last_candle=last_candle,
            date=current_time,
            action="🔴 CURRENT" if self.pairs[pair]['stop'] else "🟢 CURRENT",
            dispo=dispo,
            pair=pair,
            rate=last_candle['close'],
            trade_type='',
            profit=profit,
            buys='',
            stake=0
        )
    pair_name = self.getShortName(pair)
    # if baisse > 20 and before_last_candle[self.indicateur_achat_vente] > last_candle[self.indicateur_achat_vente] :
    #     self.pairs[pair]['force_sell'] = False
    #     self.pairs[pair]['force_buy'] = (self.pairs[pair]['count_of_buys'] - self.pairs[pair]['has_gain'] > 5)
    #     return 'Baisse_' + pair_name + '_' + str(count_of_buys) + '_' + str(self.pairs[pair]['has_gain']) + '_' + str(round(baisse, 2))
    # Rule 1: short smoothing turning down while RSI is high and the profit target is beaten
    if last_candle['mid_smooth_5_deriv1'] <= -0.1 and profit > expected_profit and last_candle['rsi'] > 65:
        self.pairs[pair]['force_sell'] = False
        # Request a forced re-entry when more than 5 buys were made beyond 'has_gain'
        self.pairs[pair]['force_buy'] = (self.pairs[pair]['count_of_buys'] - self.pairs[pair]['has_gain'] > 5)
        return 'RSI_' + pair_name + '_' + str(count_of_buys) + '_' + str(self.pairs[pair]['has_gain']) + '_' + str(round(baisse, 2))
    # Rule 2: longer smoothing turning down with the profit target beaten
    if last_candle['mid_smooth_24_deriv1'] <= -0.1 \
            and profit > expected_profit:
        self.pairs[pair]['force_sell'] = False
        self.pairs[pair]['force_buy'] = (self.pairs[pair]['count_of_buys'] - self.pairs[pair]['has_gain'] > 5)
        return 'Drv3_' + pair_name + '_' + str(count_of_buys) + '_' + str(self.pairs[pair]['has_gain']) + '_' + str(round(baisse, 2))
    # No exit: remember the highest close reached while the position is open
    self.pairs[pair]['max_touch'] = max(last_candle['close'], self.pairs[pair]['max_touch'])
def getShortName(self, pair):
    """Return ``pair`` with every "/USDT" and "/USDC" occurrence removed (e.g. "BTC/USDT" -> "BTC")."""
    short_name = pair
    for quote_suffix in ("/USDT", "/USDC"):
        short_name = short_name.replace(quote_suffix, '')
    return short_name
def multi_step_interpolate(self, pct: float, thresholds: List[float], factors: List[float]) -> float:
    """Piecewise-linear lookup of a factor for ``pct``.

    ``thresholds`` and ``factors`` are parallel lists. Values at or below the
    first threshold map to the first factor, values at or above the last
    threshold map to the last factor, and anything in between is linearly
    interpolated on the segment that contains it.
    """
    if pct <= thresholds[0]:
        return factors[0]
    if pct >= thresholds[-1]:
        return factors[-1]

    for idx, upper in enumerate(thresholds[1:], start=1):
        if pct > upper:
            continue
        lower = thresholds[idx - 1]
        low_factor, high_factor = factors[idx - 1], factors[idx]
        # linear interpolation between the two surrounding thresholds
        return low_factor + (pct - lower) * (high_factor - low_factor) / (upper - lower)

    # Fallback (not expected to be reached)
    return factors[-1]
def log_trade(self, action, pair, date, trade_type=None, rate=None, dispo=None, profit=None, buys=None, stake=None,
              last_candle=None):
    """Write one formatted row of the console trade log.

    Nothing is logged in hyperopt mode. Every tenth call (starting with the
    first) also prints the column header, a separator line and a table of the
    pairs that currently have at least one buy.

    Fixes compared with the earlier version:

    * the bookkeeping table is built with ``errors='ignore'`` because the
      ``'current_trade'`` key only exists once another callback has added it;
      dropping a missing column raised ``KeyError``;
    * the second extreme-price header column is labelled ``last_min``;
    * the run mode is compared with ``==`` instead of a substring test;
    * ``profit=None`` no longer raises on the colour selection.

    :param action: short action label (e.g. buy / sell / current).
    :param pair: traded pair; must be a key of ``self.pairs``.
    :param date: timestamp; only its first 16 characters are shown.
    :param last_candle: latest analyzed candle; its indicator columns are read for the row.
    """
    if self.config.get('runmode') == 'hyperopt' or self.dp.runmode.value == 'hyperopt':
        return

    # Print the column header once every 10 rows
    if self.columns_logged % 10 == 0:
        self.printLog(
            f"| {'Date':<16} | {'Action':<10} |{'Pair':<5}| {'Trade Type':<18} |{'Rate':>8} | {'Dispo':>6} | {'Profit':>8} | {'Pct':>6} | {'max_touch':>11} | {'last_lost':>12} | {'last_max':>7}| {'last_min':>7}|{'Buys':>5}| {'Stake':>5} |"
            f"Tdc|{'val':>6}| RSI |s201d|s5_1d|s5_2d|s51h|s52h"
        )
        self.printLineLog()
        df = pd.DataFrame.from_dict(self.pairs, orient='index')
        colonnes_a_exclure = ['last_candle', 'last_trade', 'last_palier_index', 'current_trade',
                              'trade_info', 'last_date', 'expected_profit', 'last_count_of_buys', 'base_stake_amount', 'stop_buy']
        # errors='ignore': some of these keys may not have been created yet
        df_filtered = df[df['count_of_buys'] > 0].drop(columns=colonnes_a_exclure, errors='ignore')
        print(df_filtered)
    self.columns_logged += 1

    date = str(date)[:16] if date else "-"
    last_lost = self.getLastLost(last_candle, pair)
    if buys is None:
        buys = ''
    pct_max = self.getPctFirstBuy(pair, last_candle)
    total_counts = str(buys) + '/' + str(sum(pair_data['count_of_buys'] for pair_data in self.pairs.values()))
    val = self.getProbaHausseSma5d(last_candle)
    # NOTE(review): result is unused; the call is kept because getPct60D is defined outside this block
    pct60 = round(100 * self.getPct60D(pair, last_candle), 2)

    color = GREEN if profit is not None and profit > 0 else RED
    color_sma20 = GREEN if last_candle['sma20_deriv1'] > 0 else RED
    color_sma5 = GREEN if last_candle['mid_smooth_5_deriv1'] > 0 else RED
    color_sma5_2 = GREEN if last_candle['mid_smooth_5_deriv2'] > 0 else RED
    color_sma5_1h = GREEN if last_candle['sma60_deriv1'] > 0 else RED
    color_sma5_2h = GREEN if last_candle['sma60_deriv2'] > 0 else RED

    # Prices above 1 are shown as integers, smaller ones with 3 decimals
    last_max = int(self.pairs[pair]['last_max']) if self.pairs[pair]['last_max'] > 1 else round(self.pairs[pair]['last_max'], 3)
    last_min = int(self.pairs[pair]['last_min']) if self.pairs[pair]['last_min'] > 1 else round(self.pairs[pair]['last_min'], 3)
    profit = str(profit) + '/' + str(round(self.pairs[pair]['max_profit'], 2))

    # Derivative 1 > 0 and derivative 2 > 0: uptrend accelerating.
    # Derivative 1 > 0 and derivative 2 < 0: uptrend slowing down -> possible exhaustion.
    # Derivative 1 < 0 and derivative 2 < 0: downtrend accelerating.
    # Derivative 1 < 0 and derivative 2 > 0: downtrend slowing down -> possible bottom.
    self.printLog(
        f"| {date:<16} |{action:<10} | {pair[0:3]:<3} | {trade_type or '-':<18} |{rate or '-':>9}| {dispo or '-':>6} "
        f"|{color}{profit or '-':>10}{RESET}| {pct_max or '-':>6} | {round(self.pairs[pair]['max_touch'], 2) or '-':>11} | {last_lost or '-':>12} "
        f"| {last_max or '-':>7} | {last_min or '-':>7} |{total_counts or '-':>5}|{stake or '-':>7}"
        f"|{ last_candle['tendency_12'] or '-':>3}|"
        f"{round(val, 1) or '-' :>6}|"
        f"{round(last_candle['rsi'], 0):>7}|{color_sma20}{round(last_candle['sma20_deriv1'], 2):>5}{RESET}"
        f"|{color_sma5}{round(last_candle['mid_smooth_5_deriv1'], 2):>5}{RESET}|{color_sma5_2}{round(last_candle['mid_smooth_5_deriv2'], 2):>5}{RESET}"
        f"|{color_sma5_1h}{round(last_candle['sma60_deriv1'], 2):>5}{RESET}|{color_sma5_2h}{round(last_candle['sma60_deriv2'], 2):>5}{RESET}"
        # f"|{last_candle['min60']}|{last_candle['max60']}"
    )
def getLastLost(self, last_candle, pair):
    """Return the relative distance of the close from the pair's ``max_touch``, rounded to 3 decimals.

    The result is ``(close - max_touch) / max_touch`` (negative when the price
    is below the recorded high). ``max_touch`` is 0 before the first entry and
    after every confirmed exit; in that case 0.0 is returned instead of
    dividing by zero.
    """
    max_touch = self.pairs[pair]['max_touch']
    if not max_touch:
        # No reference high recorded yet: nothing has been "lost"
        return 0.0
    return round((last_candle['close'] - max_touch) / max_touch, 3)
def getDistMax(self, last_candle, pair):
    """Return how far the close sits below the candle's ``max12`` value, in percent rounded to 0 decimals.

    ``pair`` is accepted for signature compatibility and is not used.
    """
    recent_high = last_candle['max12']
    gap = recent_high - last_candle['close']
    return round(100 * gap / recent_high, 0)
def printLineLog(self):
    """Emit the horizontal separator line of the trade log through ``self.printLog``."""
    # Dash counts of each cell; cells are joined with '+'
    left_cells = (18, 12, 5, 20, 9, 8, 12, 8, 13, 14, 18, 5, 7)
    right_cells = (6, 7, 5, 5, 5, 5, 5, 5)

    def ruler(widths):
        return "+" + "+".join('-' * width for width in widths) + "+"

    self.printLog(ruler(left_cells) + '-' * 3 + ruler(right_cells))
def printLog(self, str):
    """Output one log line, choosing the channel from the run mode.

    * hyperopt: nothing is written;
    * backtest / lookahead-analysis: the line is printed to stdout;
    * any other mode: the line goes to the module logger at INFO level.

    The run mode is compared by equality (the earlier ``in ('hyperopt')`` was
    a substring test on a plain string, not a tuple membership test).

    :param str: the text to output (parameter name kept for compatibility with keyword callers).
    """
    if self.config.get('runmode') == 'hyperopt':
        return
    mode = self.dp.runmode.value
    if mode == 'hyperopt':
        return
    if mode in ('backtest', 'lookahead-analysis'):
        print(str)
    else:
        logger.info(str)
def add_tendency_column(self, dataframe: pd.DataFrame, name, suffixe='') -> pd.DataFrame:
    """Add a ``tendency{suffixe}`` label column derived from the first and second derivative columns.

    Reads ``{name}{suffixe}_deriv1`` (d1) and ``{name}{suffixe}_deriv2`` (d2)
    and labels each row:

    * ``'P'``   - plateau, ``-0.01 <= d1 <= 0.01``;
    * ``'H++'`` / ``'H+'`` - rising (``d1 > 0.01``), accelerating if ``d2 > 0`` else slowing;
    * ``'B--'`` / ``'B-'`` - falling (``d1 < -0.01``), accelerating if ``d2 < 0`` else slowing;
    * ``'Mid'`` - none of the above (d1 is NaN).

    The labels are computed column-wise with ``numpy.select`` instead of a
    per-row ``apply``. The former ``d1 == 0.0`` branch ('DH' / 'DB') could
    never be reached because 0.0 always falls inside the plateau band, so it
    has been removed without changing any output.

    :param dataframe: frame that holds the derivative columns; modified in place.
    :param name: indicator base name.
    :param suffixe: suffix shared by the indicator and the resulting column.
    :return: the same ``dataframe`` object.
    """
    d1 = dataframe[f"{name}{suffixe}_deriv1"]
    d2 = dataframe[f"{name}{suffixe}_deriv2"]
    d1_lim_inf = -0.01
    d1_lim_sup = 0.01

    rising = d1 > d1_lim_sup
    falling = d1 < d1_lim_inf
    # np.select picks the first matching condition, mirroring the original if-chain order
    conditions = [
        (d1 >= d1_lim_inf) & (d1 <= d1_lim_sup),
        rising & (d2 > 0),
        rising,
        falling & (d2 < 0),
        falling,
    ]
    tags = ['P', 'H++', 'H+', 'B--', 'B-']
    dataframe[f"tendency{suffixe}"] = np.select(conditions, tags, default='Mid')
    return dataframe
def populate_indicators(self, dataframe: DataFrame, metadata: dict) -> DataFrame:
    """Compute every indicator column used by the strategy on the 5m dataframe.

    Adds Heikin-Ashi values, simple moving averages with their derivatives,
    candle percentage changes, smoothed mid-price series (windows 3, 5, 12,
    24) with derivatives and tendency labels, RSI, rolling min/max, Bollinger
    bands, consecutive up/down counters and a volume-based indicator. In
    live / dry-run mode the first and last filled buy prices of the pair's
    open trade are also written to the frame and to ``self.pairs``.

    :param dataframe: OHLCV frame for the pair.
    :param metadata: freqtrade metadata; only ``metadata['pair']`` is read.
    :return: the dataframe with the indicator columns added.
    """
    # Add all ta features
    pair = metadata['pair']
    heikinashi = qtpylib.heikinashi(dataframe)
    dataframe['haopen'] = heikinashi['open']
    dataframe['haclose'] = heikinashi['close']
    dataframe['hapercent'] = (dataframe['haclose'] - dataframe['haopen']) / dataframe['haclose']
    dataframe['hapercent3'] = (dataframe['haclose'] - dataframe['haopen'].shift(3)) / dataframe['haclose'].shift(3)
    # Simple moving averages, each followed by its *_deriv1 / *_deriv2 columns
    dataframe['sma5'] = talib.SMA(dataframe, timeperiod=5)
    self.calculeDerivees(dataframe, 'sma5', horizon=10)
    dataframe['sma10'] = talib.SMA(dataframe, timeperiod=10)
    self.calculeDerivees(dataframe, 'sma10', horizon=10)
    dataframe['sma20'] = talib.SMA(dataframe, timeperiod=20)
    self.calculeDerivees(dataframe, 'sma20', horizon=20)
    dataframe['sma60'] = talib.SMA(dataframe, timeperiod=60)
    self.calculeDerivees(dataframe, 'sma60', horizon=60)
    dataframe['sma144'] = talib.SMA(dataframe, timeperiod=144)
    self.calculeDerivees(dataframe, 'sma144')
    # Relative change of the close versus the open of the same candle / of N candles earlier
    dataframe["percent"] = (dataframe["close"] - dataframe["open"]) / dataframe["open"]
    dataframe["percent3"] = (dataframe["close"] - dataframe["open"].shift(3)) / dataframe["open"].shift(3)
    dataframe["percent5"] = (dataframe["close"] - dataframe["open"].shift(5)) / dataframe["open"].shift(5)
    dataframe["percent12"] = (dataframe["close"] - dataframe["open"].shift(12)) / dataframe["open"].shift(12)
    dataframe = self.calculateDerivation(dataframe, window=3, suffixe="_3")
    # dataframe = self.calculateDerivation(dataframe, window=3, suffixe="_6")
    dataframe["mid_re_smooth_3"] = self.conditional_smoothing(dataframe['mid_smooth_3'].dropna(),
                                                              threshold=0.0005).dropna()
    self.calculeDerivees(dataframe, "mid_re_smooth_3", horizon=3)
    dataframe = self.calculateDerivation(dataframe, window=12, suffixe="_12")
    dataframe = self.calculateDerivation(dataframe, window=24, suffixe="_24", factor_1=1000, factor_2=10)
    # print(metadata['pair'])
    dataframe['rsi'] = talib.RSI(dataframe['close'], timeperiod=14)
    dataframe['max_rsi_12'] = talib.MAX(dataframe['rsi'], timeperiod=12)
    self.calculeDerivees(dataframe, 'rsi', horizon=12)
    dataframe['max12'] = talib.MAX(dataframe['close'], timeperiod=12)
    dataframe['min12'] = talib.MIN(dataframe['close'], timeperiod=12)
    dataframe['max60'] = talib.MAX(dataframe['close'], timeperiod=60)
    dataframe['min60'] = talib.MIN(dataframe['close'], timeperiod=60)
    # Bollinger Bands
    bollinger = qtpylib.bollinger_bands(qtpylib.typical_price(dataframe), window=20, stds=2)
    dataframe['bb_lowerband'] = bollinger['lower']
    dataframe['bb_middleband'] = bollinger['mid']
    dataframe['bb_upperband'] = bollinger['upper']
    dataframe["bb_percent"] = (
        (dataframe["close"] - dataframe["bb_lowerband"]) /
        (dataframe["bb_upperband"] - dataframe["bb_lowerband"])
    )
    dataframe["bb_width"] = ((dataframe["bb_upperband"] - dataframe["bb_lowerband"]) / dataframe["bb_upperband"])
    # 5-candle rolling means of the Bollinger bands
    dataframe['bb_upperband5'] = dataframe['bb_upperband'].rolling(window=5).mean()
    dataframe['bb_lowerband5'] = dataframe['bb_lowerband'].rolling(window=5).mean()
    dataframe = self.calculateDerivation(dataframe, window=5, suffixe="_5")
    # NOTE(review): window=12 / "_12" was already computed above with the same arguments
    dataframe = self.calculateDerivation(dataframe, window=12, suffixe="_12")
    # percage_up: the high pierces the upper band; percage_upperband: length of the current run of such candles
    dataframe['percage_up'] = dataframe['high'] > dataframe['bb_upperband']
    dataframe['percage_upperband'] = dataframe['percage_up'].astype(int) * (
        dataframe['percage_up'].groupby((dataframe['percage_up'] != dataframe['percage_up'].shift()).cumsum()).cumcount() + 1)
    # informative['futur_percent_3d'] = 100 * (informative['close'].shift(-3) - informative['close']) / informative['close']
    #
    # self.calculateProbabilite2Index(informative, ['futur_percent'], 'rsi_deriv1', 'rsi')
    # # self.calculateProbabilite2Index(dataframe, ['futur_percent_3d'], 'rsi_deriv1', 'sma5')
    # informative['close_smooth'] = self.conditional_smoothing(informative['mid'].dropna(), threshold=0.0015).dropna()
    # informative['smooth'], informative['deriv1'], informative['deriv2'] = self.smooth_and_derivatives(informative['close_smooth'])
    # informative['deriv1'] = 100 * informative['deriv1'] / informative['mid']
    # informative['deriv2'] = 1000 * informative['deriv2'] / informative['mid']
    # poly_func, x_future, y_future, count = self.polynomial_forecast(informative['sma5_deriv1'], window=24, degree=4)
    # informative['futur_percent_3'] = 100 * ((informative['sma5'].shift(-1) - informative['sma5']) / informative['sma5'])
    # futur_cols = ['futur_percent_3']
    # indic_1 = 'sma5_deriv1'
    # indic_2 = 'sma5_deriv2'
    # self.calculateProbabilite2Index(informative, futur_cols, indic_1, indic_2)
    # self.calculePlateaux(informative, 14, 0.01)
    # Default to the close; overwritten below with the actual fill prices when a trade is open
    dataframe['last_price'] = dataframe['close']
    dataframe['first_price'] = dataframe['close']
    # dataframe['mid_price'] = (dataframe['last_price'] + dataframe['first_price']) / 2
    # dataframe['close01'] = dataframe.iloc[-1]['close'] * 1.01
    # dataframe['limit'] = dataframe['close']
    count_buys = 0
    if self.dp:
        if self.dp.runmode.value in ('live', 'dry_run'):
            self.getOpenTrades()
            for trade in self.trades:
                if trade.pair != pair:
                    continue
                filled_buys = trade.select_filled_orders('buy')
                count = 0
                amount = 0
                for buy in filled_buys:
                    # The first filled buy provides the "first" price
                    if count == 0:
                        dataframe['first_price'] = buy.price
                        self.pairs[pair]['first_buy'] = buy.price
                        # dataframe['close01'] = buy.price * 1.01
                    # Order(id=2396, trade=1019, order_id=29870026652, side=buy, filled=0.00078, price=63921.01,
                    # status=closed, date=2024-08-26 02:20:11)
                    # Every filled buy overwrites the "last" price, so the final one wins
                    dataframe['last_price'] = buy.price
                    self.pairs[pair]['last_buy'] = buy.price
                    count = count + 1
                    amount += buy.price * buy.filled
                # dataframe['mid_price'] = (dataframe['last_price'] + dataframe['first_price']) / 2
                count_buys = count
                # dataframe['limit'] = dataframe['last_price'] * (1 - self.baisse[count] / 100)
                # dataframe['amount'] = amount
    # Count consecutive drops / rises
    self.calculateDownAndUp(dataframe, limit=0.0001)
    horizon_h = 12
    dataframe['close_smooth'] = self.conditional_smoothing(dataframe['mid'].rolling(3).mean().dropna(),
                                                           threshold=0.001)
    dataframe['smooth'], dataframe['deriv1'], dataframe['deriv2'] = self.smooth_and_derivatives(
        dataframe['close_smooth'])
    dataframe['deriv1'] = 100 * dataframe['deriv1'] / dataframe['mid']
    dataframe['deriv2'] = 100 * dataframe['deriv2'] / dataframe['mid']
    # ===============================
    # Smoothing of the daily values
    horizon_d = 12 * 5 * 24
    # NOTE(review): despite its name this column is not an EMA; it scales volume * percent
    # by the summed volume of the two previous candles
    dataframe['ema_volume'] = 20 * (dataframe['volume'] * dataframe['percent']) / (
        abs(dataframe['volume'].shift(1)) + abs(dataframe['volume'].shift(2)))
    self.calculeDerivees(dataframe, 'ema_volume', factor_1=10, factor_2=1, horizon=14)
    return dataframe
def calculeDerivees(self, dataframe, indic, factor_1=100, factor_2=10, horizon=5):
    """Add ``{indic}_deriv1`` and ``{indic}_deriv2`` columns to ``dataframe`` in place.

    * deriv1: rolling mean over ``horizon`` rows of ``factor_1 * diff(indic) / indic``;
    * deriv2: rolling mean over ``horizon`` rows of ``factor_2 * diff(deriv1)``.

    Nothing is returned.
    """
    first_col = f"{indic}_deriv1"
    second_col = f"{indic}_deriv2"

    source = dataframe[f"{indic}"]
    relative_step = factor_1 * source.diff() / source
    dataframe[first_col] = relative_step.rolling(horizon).mean()

    slope_change = factor_2 * dataframe[first_col].diff()
    dataframe[second_col] = slope_change.rolling(horizon).mean()
def calculateDownAndUp(self, dataframe, limit=0.0001):
    """Add consecutive down / up streak columns based on ``mid_smooth_12_deriv1``.

    Columns written in place: ``down`` / ``up`` (derivative below / above
    ``limit``), ``down_count`` (negative running length of the current down
    streak, 0 otherwise), ``up_count`` (positive running length of the current
    up streak, 0 otherwise) and ``down_pct`` / ``up_pct`` as returned by
    ``self.calculateUpDownPct``.
    """
    slope = dataframe['mid_smooth_12_deriv1']

    def streak_position(flags):
        # 1-based position of each row inside its run of identical flag values
        run_id = (flags != flags.shift()).cumsum()
        return flags.groupby(run_id).cumcount() + 1

    dataframe['down'] = slope < limit
    dataframe['up'] = slope > limit

    dataframe['down_count'] = - dataframe['down'].astype(int) * streak_position(dataframe['down'])
    dataframe['up_count'] = dataframe['up'].astype(int) * streak_position(dataframe['up'])

    dataframe['down_pct'] = self.calculateUpDownPct(dataframe, 'down_count')
    dataframe['up_pct'] = self.calculateUpDownPct(dataframe, 'up_count')
def calculateDerivation(self, dataframe, window=12, suffixe='', factor_1=100, factor_2=10):
    """Add the smoothed series ``mid_smooth{suffixe}``, its two derivatives and its tendency label.

    Also (re)writes the ``mid`` column as the midpoint between ``haopen`` and
    ``haclose``. The smoothed series is the rolling mean of ``haclose`` over
    ``window`` rows; both derivatives are rounded to 4 decimals.

    :return: the frame returned by ``self.add_tendency_column``.
    """
    smooth_col = f"mid_smooth{suffixe}"
    first_col = f"mid_smooth{suffixe}_deriv1"
    second_col = f"mid_smooth{suffixe}_deriv2"

    ha_open = dataframe['haopen']
    dataframe['mid'] = ha_open + (dataframe['haclose'] - ha_open) / 2

    # 1. Smoothing: rolling mean of the Heikin-Ashi close
    dataframe[smooth_col] = dataframe['haclose'].rolling(window=window).mean()

    # 2. First derivative: step of the 3-row mean, relative to the smoothed value
    smoothed_step = dataframe[smooth_col].rolling(window=3).mean().diff()
    dataframe[first_col] = (factor_1 * smoothed_step / dataframe[smooth_col]).round(4)

    # 3. Second derivative: step of the 3-row mean of the first derivative
    slope_step = dataframe[first_col].rolling(window=3).mean().diff()
    dataframe[second_col] = (factor_2 * slope_step).round(4)

    return self.add_tendency_column(dataframe, "mid_smooth", suffixe)
def getOpenTrades(self):
    """Refresh ``self.trades`` from ``Trade.get_open_trades()`` and return the refreshed value."""
    # if len(self.trades) == 0:
    open_trades = Trade.get_open_trades()
    self.trades = open_trades
    return open_trades
def populate_buy_trend(self, dataframe: DataFrame, metadata: dict) -> DataFrame:
    """Flag long entries and add a few Bollinger-slope helper columns.

    A row gets ``enter_long = 1`` with tag ``'smth_12'`` when
    ``mid_smooth_24_deriv1 >= 0.05`` and ``mid_smooth_24_deriv2 > 0``.
    The helper columns ``bb_ub_slope``, ``bb_cross_down``, ``inversion_last5``
    and ``test`` are written as well; they do not take part in the entry
    condition.

    :param dataframe: frame produced by ``populate_indicators``.
    :param metadata: freqtrade metadata; only ``metadata['pair']`` is read.
    :return: the dataframe with the entry columns set.
    """
    pair = metadata['pair']
    # self.getOpenTrades()
    # expected_profit = self.expectedProfit(pair, dataframe.iloc[-1])
    # self.getBinanceOrderBook(pair, dataframe)
    # Slope (difference between two consecutive candles)
    # NOTE(review): this assignment is overwritten a few lines below
    dataframe['bb_ub_slope'] = dataframe['bb_upperband'].diff()
    # Detect a reversal (sign change of the slope)
    # inversion = (
    #     (dataframe['bb_ub_slope'].shift(1).rolling(5).apply(
    #         lambda x: any(x[i] > 0 and x[i + 1] < 0 for i in range(len(x) - 1)))) == 1
    # )
    # Check whether the upper band reached a maximum k candles ago
    # lookback = 5
    # inversion = (dataframe['bb_upperband'] == dataframe['bb_upperband'].rolling(lookback).max())
    # slope of the 5-candle mean of the upper band (bb_upperband5)
    dataframe['bb_ub_slope'] = dataframe['bb_upperband5'].diff()
    # "downward reversal" event (slope goes from >0 to <=0), evaluated on every candle
    cross_down = (dataframe['bb_ub_slope'].shift(1) > 0) & (dataframe['bb_ub_slope'] <= 0)
    dataframe['bb_cross_down'] = 10000 * cross_down * dataframe['bb_width'] \
                                 * (dataframe['bb_lowerband'] - dataframe['bb_lowerband'].shift(1)) / dataframe[
                                     'bb_lowerband']
    # true if AT LEAST one reversal happened within the 5 *previous* candles (the current one is excluded)
    inversion_last5 = cross_down.shift(1).rolling(5, min_periods=1).max().astype(bool)
    dataframe['inversion_last5'] = inversion_last5
    N = 24  # minimum number of candles before a reversal
    rise_threshold = 1.0  # % rise that must not be exceeded
    # # Minimal rise before the reversal
    # def compute_rise(idx):
    #     if idx < N:
    #         return 0
    #     low_before = dataframe['close'].iloc[idx - N:idx].min()  # min of the N candles before the reversal
    #     return (dataframe['close'].iloc[idx] / low_before - 1) * 100
    #
    # rise = [compute_rise(i) for i in range(len(dataframe))]
    # dataframe['rise_before_inversion'] = rise
    #
    # # Filter: reversal without a strong rise before it
    # valid_inversion = inversion_last5 & (dataframe['rise_before_inversion'] <= rise_threshold)
    # dataframe.loc[
    #     (
    #         (dataframe['percent'] > 0)
    #         & (dataframe['mid_smooth_12_deriv1'] >= dataframe['mid_smooth_12_deriv1'].shift(1))
    #     ), ['enter_long', 'enter_tag']] = (1, 'down')
    # NOTE(review): factor is computed but not used below
    factor = 1.01
    if pair == "BTC/USDT" or pair == "BTC/USDC":
        factor = factor / 2
    dataframe.loc[
        (
            # (valid_inversion & inversion_last5 )
            # (dataframe['mid_smooth_12'].shift(2) > dataframe['mid_smooth_12'].shift(1))
            # (dataframe['mid_smooth_24_deriv1'].shift(1) <= 0)
            (dataframe['mid_smooth_24_deriv1'] >= 0.05)
            & (dataframe['mid_smooth_24_deriv2'] > 0)
            # & (dataframe['hapercent'] > 0)
            # & (dataframe['max_rsi_12'] < 50)
            # & (dataframe['open'] <= dataframe['bb_middleband'])
        ), ['enter_long', 'enter_tag']] = (1, 'smth_12')
    # 'test': close * 1.01 on entry rows, NaN elsewhere
    dataframe['test'] = np.where(dataframe['enter_long'] == 1, dataframe['close'] * 1.01, np.nan)
    return dataframe
def calculateProbabilite2Index(self, df, futur_cols, indic_1, indic_2):
    """Print quantile-bin statistics of future returns for two indicators.

    Both indicators are cut into ``len(self.labels)`` quantile bins (stored
    in ``{indic}_bin`` columns). For every column in ``futur_cols`` the
    function prints the mean / count per bin pair and the share of positive
    values per bin pair, the latter also formatted as a Python dict literal.
    Nothing is returned; ``df`` receives the bin columns and one
    ``{col}_is_up`` column per future column.

    NOTE(review): ``pd.set_option`` below changes pandas display options globally.

    :param df: frame holding the indicator and future-return columns; modified in place.
    :param futur_cols: names of the future-return columns to analyse.
    :param indic_1: first indicator column (columns of the printed tables).
    :param indic_2: second indicator column (rows of the printed tables).
    """
    # # Bin edges for the derivatives
    # bins_deriv = [-np.inf, -0.05, -0.01, 0.01, 0.05, np.inf]
    # labels = ['forte baisse', 'légère baisse', 'neutre', 'légère hausse', 'forte hausse']
    #
    # # Add the bin columns (categorisation)
    # df[f"{indic_1}_bin"] = pd.cut(df['mid_smooth_1h_deriv1'], bins=bins_deriv, labels=labels)
    # df[f"{indic_2}_bin"] = pd.cut(df['mid_smooth_12_deriv1'], bins=bins_deriv, labels=labels)
    #
    # # Future price columns to analyse
    # futur_cols = ['futur_percent_1h', 'futur_percent_2h', 'futur_percent_3h', 'futur_percent_4h', 'futur_percent_5h']
    #
    # # Means and counts
    # grouped = df.groupby([f"{indic_2}_bin", f"{indic_1}_bin"])[futur_cols].agg(['mean', 'count'])
    #
    # pd.set_option('display.width', 200)  # max display width
    # pd.set_option('display.max_columns', None)
    pd.set_option('display.max_columns', None)
    pd.set_option('display.width', 300)  # max display width
    # cleanup
    # series = df[f"{indic_2}"].dropna()
    # unique_vals = df[f"{indic_2}"].nunique()
    # print(unique_vals)
    # print(df[f"{indic_2}"])
    n = len(self.labels)
    # NOTE(review): duplicates='drop' can yield fewer bins than labels - confirm qcut accepts that here
    df[f"{indic_1}_bin"], bins_1h = pd.qcut(df[f"{indic_1}"], q=n, labels=self.labels, retbins=True,
                                            duplicates='drop')
    df[f"{indic_2}_bin"], bins_1d = pd.qcut(df[f"{indic_2}"], q=n, labels=self.labels, retbins=True,
                                            duplicates='drop')
    # Print the bin edges formatted so they can be pasted into Python code
    print(f"Bornes des quantiles pour {indic_1} : [{', '.join([f'{b:.4f}' for b in bins_1h])}]")
    print(f"Bornes des quantiles pour {indic_2} : [{', '.join([f'{b:.4f}' for b in bins_1d])}]")
    # Aggregation
    grouped = df.groupby([f"{indic_2}_bin", f"{indic_1}_bin"], observed=True)[futur_cols].agg(['mean', 'count'])
    # Display
    with pd.option_context('display.max_rows', None, 'display.max_columns', None):
        print(grouped.round(4))
    # Add the probabilities of a rise
    for col in futur_cols:
        df[f"{col}_is_up"] = df[col] > 0
        # Probability of a rise per (indic_2 bin, indic_1 bin)
        proba_up = df.groupby([f"{indic_2}_bin", f"{indic_1}_bin"], observed=True)[f"{col}_is_up"].mean().unstack()
        print(f"\nProbabilité de hausse pour {col} (en %):")
        with pd.option_context('display.max_rows', None, 'display.max_columns', None):
            print((proba_up * 100).round(1))
        # Print the values formatted as a Python table
        with pd.option_context('display.max_rows', None, 'display.max_columns', None):
            df_formatted = (proba_up * 100).round(1)
            print("data = {")
            for index, row in df_formatted.iterrows():
                row_values = ", ".join([f"{val:.1f}" for val in row])
                print(f"'{index}': [{row_values}], ")
            print("}")
def populate_sell_trend(self, dataframe: DataFrame, metadata: dict) -> DataFrame:
    """Return ``dataframe`` unchanged: no indicator-based exit signal is emitted here.

    A rule that used to live here (disabled in the source) set ``sell`` /
    ``exit_long`` to ``(1, 'sell_sma5_pct_1h')`` on rows where
    ``mid_smooth_12_deriv1`` equals 0 while its previous value was positive.

    Parameters:
        dataframe: analyzed candles for the pair.
        metadata: pair metadata; not used.

    Returns:
        The very same ``dataframe`` object.
    """
    return dataframe
def adjust_trade_position(self, trade: Trade, current_time: datetime,
                          current_rate: float, current_profit: float, min_stake: float,
                          max_stake: float, **kwargs):
    """Decide whether to add stake to an open trade whose price has dropped.

    A re-buy is requested when the last candle's ``sma5_deriv1`` is positive and
    the reference drawdown ``pct_max`` is below a limit that gets deeper with the
    number of successful entries. On a re-buy the per-pair bookkeeping in
    ``self.pairs`` is updated and the decision is reported through ``log_trade``.

    Parameters:
        trade: open trade being evaluated.
        current_time: evaluation time; converted to UTC before it is used.
        current_rate: current price, stored as the pair's ``last_buy`` on re-buy.
        current_profit: current profit ratio of the trade.
        min_stake: not used by this implementation.
        max_stake: not used by this implementation.
        **kwargs: ignored.

    Returns:
        The additional stake amount when a re-buy is requested, ``0`` when the
        wallet reports a negative available stake, otherwise ``None``.
    """
    # Do nothing while an order is already pending for this trade.
    if trade.has_open_orders:
        return None

    if self.wallets.get_available_stake_amount() < 0:
        return 0

    pair = trade.pair
    dataframe, _ = self.dp.get_analyzed_dataframe(pair, self.timeframe)
    # Guard BEFORE any positional access: indexing an empty frame raises IndexError.
    if len(dataframe) < 1:
        return None
    last_candle = dataframe.iloc[-1].squeeze()

    # Prepare the inputs.
    current_time = current_time.astimezone(timezone.utc)
    dispo = round(self.wallets.get_available_stake_amount())
    count_of_buys = trade.nr_of_successful_entries
    pair_state = self.pairs[pair]

    pct_first = 0
    if pair_state['first_buy']:
        pct_first = self.getPctFirstBuy(pair, last_candle)

    # Reference drawdown: trade profit after a single entry, otherwise the move
    # since the last recorded buy (or -1.2% when no last buy is recorded).
    pct = 0.012
    if count_of_buys == 1:
        pct_max = current_profit
    elif pair_state['last_buy']:
        pct_max = self.getPctLastBuy(pair, last_candle)
    else:
        pct_max = -pct

    # Required drawdown before re-buying; it deepens with every entry.
    if pair in ('BTC/USDT', 'BTC/USDC') or count_of_buys <= 2:
        lim = -pct - (count_of_buys * 0.001)
    else:
        lim = -0.05 - (count_of_buys * 0.0025)

    if not self.should_enter_trade(pair, last_candle, current_time):
        return None

    if last_candle['sma5_deriv1'] > 0 and pct_max < lim:
        try:
            if pair_state['has_gain']:
                pair_state['force_sell'] = True
            # Result unused. NOTE(review): call kept in case getLastLost has side
            # effects (defined outside this block) - confirm, then drop.
            self.getLastLost(last_candle, pair)

            max_amount = self.config.get('stake_amount') * 2.5
            available = self.wallets.get_available_stake_amount()
            stake_amount = min(
                max_amount,
                available,
                self.adjust_stake_amount(pair, last_candle)
                - 10 * pct_first / self.mise_factor_buy.value,
            )
            if available > stake_amount:
                trade_type = last_candle['enter_tag'] if last_candle['enter_long'] == 1 else 'pct48'
                pair_state['count_of_buys'] += 1
                pair_state['total_amount'] += stake_amount
                self.log_trade(
                    last_candle=last_candle,
                    date=current_time,
                    action="🟧 Loss -",
                    dispo=dispo,
                    pair=pair,
                    rate=current_rate,
                    trade_type=trade_type,
                    profit=round(current_profit * trade.stake_amount, 1),
                    buys=trade.nr_of_successful_entries + 1,
                    stake=round(stake_amount, 2)
                )
                pair_state['last_buy'] = current_rate
                pair_state['max_touch'] = last_candle['close']
                pair_state['last_candle'] = last_candle
                return stake_amount
            return None
        except Exception:
            # Best effort: a failure must not break the bot loop, but it is no
            # longer swallowed silently.
            logging.getLogger(__name__).exception(
                "adjust_trade_position failed for %s", pair)
            return None

    # Result unused. NOTE(review): see the remark on getLastLost above.
    self.getLastLost(last_candle, pair)
    # The former "Gain +" top-up (stake = first_amount / 4) was guarded by a
    # hard-coded False and could never run; it has been removed.
    return None
def getPctFirstBuy(self, pair, last_candle):
    """Relative move of the candle's close versus the pair's recorded first buy.

    Returns ``(close - first_buy) / first_buy`` rounded to 3 decimals, where
    ``first_buy`` is read from ``self.pairs[pair]``.
    """
    first_buy = self.pairs[pair]['first_buy']
    change = (last_candle['close'] - first_buy) / first_buy
    return round(change, 3)
def getPctLastBuy(self, pair, last_candle):
    """Relative move of the candle's close versus the pair's most recent buy.

    Returns ``(close - last_buy) / last_buy`` rounded to 4 decimals, where
    ``last_buy`` is read from ``self.pairs[pair]``.
    """
    last_buy = self.pairs[pair]['last_buy']
    change = (last_candle['close'] - last_buy) / last_buy
    return round(change, 4)
def getPct60D(self, pair, last_candle):
    """Width of the ``min60``..``max60`` range as a fraction of ``max60`` (4 decimals).

    ``pair`` is accepted but not used.
    """
    high = last_candle['max60']
    spread = high - last_candle['min60']
    return round(spread / high, 4)
def getPctClose60D(self, pair, last_candle):
    """Position of the close inside the ``min12``..``max12`` range, clamped to 0..1.

    Returns 1 when the close is above ``max12``, 0 when it is below ``min12``,
    otherwise the ratio rounded to 4 decimals. ``pair`` is not used.

    NOTE(review): despite the "60D" name the bounds read are ``max12``/``min12``;
    when ``max12 == min12 == close`` the ratio is 0/0 - confirm whether that can occur.
    """
    close = last_candle['close']
    upper = last_candle['max12']
    if close > upper:
        return 1
    lower = last_candle['min12']
    if close < lower:
        return 0
    return round((close - lower) / (upper - lower), 4)
def getLimitBuy(self, pair, last_candle, first_pct):
    """Drawdown limit (negative ratio) for the next re-buy of ``pair``.

    Starts at ``-first_pct`` and goes 0.1% deeper per recorded buy, scaled by the
    60-period range from ``getPct60D`` relative to 5%. Ranges under 5% are
    treated as exactly 5%, so the scale factor is never below 1.
    """
    buys = self.pairs[pair]['count_of_buys']
    spread = self.getPct60D(pair, last_candle)  # e.g. 0.3 for 30%
    effective_spread = 0.05 if spread < 0.05 else spread
    return - first_pct - (buys * 0.001 * effective_spread / 0.05)
def getProbaHausseSma5d(self, last_candle):
    """Look up the table value for the candle's ``sma5_deriv1`` / ``sma5_deriv2`` bins.

    Each derivative is mapped to a bin label with ``getValuesFromTable``; the
    second-derivative label selects the row and the first-derivative label the
    column passed to ``approx_val_from_bins``.
    """
    col_label = self.getValuesFromTable(self.sma5_deriv1, last_candle['sma5_deriv1'])
    row_label = self.getValuesFromTable(self.sma5_deriv2, last_candle['sma5_deriv2'])
    return self.approx_val_from_bins(
        matrice=self.sma5_derive1_2_matrice_df,
        numeric_matrice=self.sma5_derive1_2_numeric_matrice,
        row_label=row_label,
        col_label=col_label,
    )
def adjust_stake_amount(self, pair: str, last_candle: DataFrame):
    """Return the stake to use for ``pair`` and remember the first-entry amount.

    While the pair has no recorded buy (``count_of_buys == 0``) the configured
    ``stake_amount`` is scaled by a factor interpolated from ``getDistMax`` over
    ``self.thresholds`` / ``self.factors``, floored at one fifth of the base
    stake, and stored as the pair's ``first_amount``. Once the pair has buys,
    the stored ``first_amount`` is returned unchanged.

    The alternative sizing based on ``first_buy`` / ``max12`` sat behind
    ``if True: ... else:`` and could never run; it has been removed.

    Parameters:
        pair: key into ``self.pairs``.
        last_candle: latest candle, forwarded to ``getDistMax``
            (annotated ``DataFrame`` in the original signature).

    Returns:
        The stake amount for the next order on ``pair``.
    """
    pair_state = self.pairs[pair]
    if pair_state['count_of_buys'] != 0:
        return pair_state['first_amount']

    base_stake_amount = self.config.get('stake_amount')  # configured base stake
    dist_max = self.getDistMax(last_candle, pair)
    factor = self.multi_step_interpolate(dist_max, self.thresholds, self.factors)
    adjusted_stake_amount = max(base_stake_amount / 5, base_stake_amount * factor)
    pair_state['first_amount'] = adjusted_stake_amount
    return adjusted_stake_amount
def expectedProfit(self, pair: str, last_candle: DataFrame):
    """Return the target profit ratio for ``pair``.

    The target is 0.5% for ``BTC/USDT`` and ``BTC/USDC`` and 1% for every other
    pair. A target that grew with the number of buys used to be computed here
    but was never returned; that dead computation (and its ``self.pairs``
    lookup) has been removed.

    Parameters:
        pair: traded pair.
        last_candle: not used.

    Returns:
        The expected profit as a ratio.
    """
    if pair in ("BTC/USDT", "BTC/USDC"):
        return 0.005
    return 0.01
def calculateUpDownPct(self, dataframe, key):
    """Percentage change of ``close`` over a per-row look-back given by ``dataframe[key]``.

    For row ``i`` the look-back is ``abs(int(dataframe[key][i]))`` rows and the
    result is ``100 * (close[i] - close[i - shift]) / close[i - shift]``.
    Rows whose reference index is not greater than 1 stay NaN.

    Rows where the look-back is NaN or infinite now yield NaN instead of raising
    (``int(NaN)`` raises ``ValueError``).

    NOTE(review): the ``> 1`` bound is kept from the original, so rows 0 and 1
    are never used as a reference - confirm whether ``>= 0`` was intended.

    Returns:
        A float numpy array with one value per row of ``dataframe``.
    """
    n_rows = len(dataframe)
    result = np.full(n_rows, np.nan)
    if n_rows == 0:
        return result

    close = dataframe['close'].to_numpy(dtype=float)
    raw_shift = dataframe[key].to_numpy(dtype=float)

    usable = np.isfinite(raw_shift)
    shift = np.zeros(n_rows, dtype=np.int64)
    # abs(int(x)) truncates toward zero, i.e. floor(|x|).
    shift[usable] = np.floor(np.abs(raw_shift[usable])).astype(np.int64)

    reference = np.arange(n_rows) - shift
    usable &= reference > 1

    reference_close = close[reference[usable]]
    result[usable] = 100 * (close[usable] - reference_close) / reference_close
    return result
# ✅ First derivative (variation, or slope)
# Positive: the curve is rising → bullish trend.
# Negative: the curve is falling → bearish trend.
# Close to 0: the curve is flat → stable market or a transition.
#
# Applications:
# Detect turning points (trend changes) where it crosses zero.
# Gauge the speed of a move (the larger it is, the more impulsive the move).
#
# ✅ Second derivative (acceleration, or concavity)
# Positive: the slope is increasing → the rise accelerates or the fall slows down.
# Negative: the slope is decreasing → the fall accelerates or the rise slows down.
# Sign change: often signals a change of curvature, useful to anticipate reversals.
#
# Examples:
# 🟢 Derivative 1 > 0 and derivative 2 > 0: bullish trend that is accelerating.
# 🟡 Derivative 1 > 0 and derivative 2 < 0: bullish trend that is slowing → possible exhaustion.
# 🔴 Derivative 1 < 0 and derivative 2 < 0: bearish trend that is accelerating.
# 🟠 Derivative 1 < 0 and derivative 2 > 0: bearish trend that is slowing → possible bottom.
#
# Signal filtering: only take a bullish signal when derivative 1 > 0 and derivative 2 > 0.
# Reversal zones: when derivative 1 ≈ 0 and derivative 2 changes sign.
def calculateRegression(self,
                        dataframe: DataFrame,
                        column='close',
                        window=50,
                        degree=3,
                        future_offset: int = 10  # projection n candles ahead
                        ) -> DataFrame:
    """Rolling polynomial fit of ``column`` with a short forward projection.

    For every row ``i >= window`` a degree-``degree`` polynomial is fitted on the
    ``window`` values that precede row ``i`` (row ``i`` itself is excluded). Four
    columns are added to a copy of ``dataframe``:

    * ``{column}_regression``: the fit evaluated at the last window point;
    * ``{column}_regression_deriv1``: ``100 * diff / value`` of that fit, 4 decimals;
    * ``{column}_regression_deriv2``: ``10 *`` the difference of the rolling mean
      (``window / 4`` rows) of deriv1, 4 decimals;
    * ``{column}_future_{future_offset}``: the fit extrapolated ``future_offset``
      candles ahead, capped by the number of rows remaining after ``i``.

    Changes versus the previous version:
    the forward step now matches the real spacing of ``linspace(-1, 1, window)``
    (``2 / (window - 1)``, previously ``2 / window``); windows containing NaN
    yield NaN instead of reaching ``polyfit``; ``window < 2`` yields all-NaN fits.

    Parameters:
        dataframe: input candles; not modified.
        column: name of the column to fit.
        window: number of past rows used for each fit.
        degree: polynomial degree.
        future_offset: projection horizon in candles.

    Returns:
        A copy of ``dataframe`` with the four columns added.
    """
    df = dataframe.copy()
    n_rows = len(df)
    values = df[column].to_numpy(dtype=float)
    regression_fit = np.full(n_rows, np.nan)
    regression_future_fit = np.full(n_rows, np.nan)

    if window >= 2:
        # x is centred on [-1, 1] for numerical stability; it is the same for
        # every window, so it is built once.
        x = np.linspace(-1, 1, window)
        x_now = x[-1]
        step = 2.0 / (window - 1)  # distance between two consecutive candles on x

        for i in range(window, n_rows):
            y = values[i - window:i]
            if np.isnan(y).any():
                continue
            poly = np.poly1d(np.polyfit(x, y, degree))
            regression_fit[i] = poly(x_now)

            # Do not project further than the rows that remain after i.
            remaining = n_rows - i - 1
            effective_offset = min(future_offset, remaining)
            regression_future_fit[i] = poly(x_now + effective_offset * step)

    reg_col = f"{column}_regression"
    deriv1_col = f"{column}_regression_deriv1"
    df[reg_col] = regression_fit
    # First derivative: relative change between two successive candles.
    df[deriv1_col] = (100 * df[reg_col].diff() / df[reg_col]).round(4)
    # Second derivative: change of the smoothed first derivative.
    df[f"{column}_regression_deriv2"] = (
        10 * df[deriv1_col].rolling(int(window / 4)).mean().diff()
    ).round(4)
    df[f"{column}_future_{future_offset}"] = regression_future_fit
    return df
def getValuesFromTable(self, values, value):
    """Map ``value`` to the label of the bin it falls into.

    ``values`` holds the bin edges; bin ``i`` covers ``values[i] <= value <
    values[i + 1]`` and maps to ``self.labels[i]``.

    A value below the first edge now maps to the first label; previously it fell
    through the loop and received the last label. Values at or above the last
    edge (and values that match no bin, such as NaN) still map to the last label.

    NOTE(review): assumes ``values`` is sorted ascending - confirm at call sites.
    """
    if len(values) > 0 and value < values[0]:
        return self.labels[0]
    for position, (low, high) in enumerate(zip(values[:-1], values[1:])):
        if low <= value < high:
            return self.labels[position]
    return self.labels[-1]  # upper-bound edge case
def approx_val_from_bins(self, matrice, numeric_matrice, row_label, col_label):
    """Read the table value stored for a (row bin, column bin) pair of labels.

    No interpolation is performed: the labels are translated to positions
    through ``self.label_to_index`` and the cell is read directly.

    Parameters:
        matrice: labelled table; only its ``index`` and ``columns`` are used,
            to check that both labels exist.
        numeric_matrice: the values, indexed as ``numeric_matrice[row, col]``.
        row_label: label of the row (e.g. ``'B3'``).
        col_label: label of the column (e.g. ``'H2'``).

    Returns:
        The stored value, or NaN when a label is missing from ``matrice`` or
        from ``self.label_to_index``. The second case used to index with
        ``None``.
    """
    if row_label not in matrice.index or col_label not in matrice.columns:
        return np.nan

    row_idx = self.label_to_index.get(row_label)
    col_idx = self.label_to_index.get(col_label)
    if row_idx is None or col_idx is None:
        return np.nan

    return numeric_matrice[row_idx, col_idx]
@property
def protections(self):
    """Protection rules for this strategy: currently an empty list.

    Rules that were tried and are disabled in the source, kept for reference:

    * ``CooldownPeriod``: ``stop_duration_candles`` 12
    * ``MaxDrawdown``: driven by the ``lookback``, ``trade_limit``,
      ``protection_stop`` and ``protection_max_allowed_dd`` parameters,
      ``only_per_pair`` False
    * ``StoplossGuard``: ``lookback_period_candles`` 24, ``trade_limit`` 4,
      ``stop_duration_candles`` from ``protection_stoploss_stop`` (or 2),
      ``only_per_pair`` False
    * ``LowProfitPairs``: (lookback 6, limit 2, stop 60, ``required_profit`` 0.02)
      and (lookback 24, limit 4, stop 2, ``required_profit`` 0.01)
    """
    active_rules = []
    return active_rules
def conditional_smoothing(self, series, threshold=0.002):
    """Hold the previous value until the series moves by at least ``threshold``.

    Starting from the first element, each output value repeats the last held
    value unless the new value differs from it by at least ``threshold`` in
    relative terms, in which case the new value becomes the held one.

    The relative test is written ``|val - held| >= threshold * |held|``, which
    matches the former ``|val - held| / held >= threshold`` for positive held
    values and also works when the held value is 0 or negative (the former
    divided by zero or could never update). An empty series is returned as an
    empty copy instead of raising.

    Parameters:
        series: pandas Series to smooth.
        threshold: minimum relative change that updates the held value.

    Returns:
        A Series with the same index as ``series``.
    """
    if len(series) == 0:
        return series.copy()

    held = series.iloc[0]
    smoothed = [held]
    for val in series.iloc[1:]:
        if abs(val - held) >= threshold * abs(held):
            held = val
        smoothed.append(held)
    return pd.Series(smoothed, index=series.index)
def smooth_and_derivatives(self, series, window=25, polyorder=3):
    """Smooth ``series`` with ``causal_savgol`` and return it with two differences.

    Missing values are forward- then backward-filled on a copy before
    smoothing. The first difference of the smoothed curve and the difference of
    that difference are computed with ``np.diff``; each is padded at the front
    with its own first element so all outputs keep the input length.

    The smoothed curve is now handled as a plain array: the former
    ``smooth[0]`` was a label lookup on a Series and failed for indexes that do
    not contain the label 0. An empty input returns three empty Series.

    Parameters:
        series: pandas Series to smooth; not modified.
        window: forwarded to ``causal_savgol``.
        polyorder: forwarded to ``causal_savgol``.

    Returns:
        ``(smooth, deriv1, deriv2)`` as Series sharing the index of ``series``.
    """
    series = series.copy()
    if series.isna().any():
        series = series.ffill().bfill()  # avoid NaN inside the fitting windows

    smooth = np.asarray(
        self.causal_savgol(series, window=window, polyorder=polyorder), dtype=float)
    if smooth.size == 0:
        return (pd.Series(smooth, index=series.index),
                pd.Series(smooth.copy(), index=series.index),
                pd.Series(smooth.copy(), index=series.index))

    deriv1 = np.diff(smooth, prepend=smooth[0])
    deriv2 = np.diff(deriv1, prepend=deriv1[0])
    return (pd.Series(smooth, index=series.index),
            pd.Series(deriv1, index=series.index),
            pd.Series(deriv2, index=series.index))
def causal_savgol(self, series, window=25, polyorder=3):
    """Backward-looking polynomial smoother.

    For each position ``i >= window`` a polynomial of degree ``polyorder`` is
    fitted on the ``window`` values that precede position ``i`` and evaluated at
    the last point of that window. Positions without a full window, or whose
    window contains NaN, yield NaN.

    Returns:
        A Series with the same index as ``series``.
    """
    lookback = window  # the whole window lies in the past

    def fit_at(position):
        # Not enough history yet.
        if position < lookback:
            return np.nan
        past_values = series[position - lookback:position]
        if past_values.isna().any():
            return np.nan
        trend = np.poly1d(np.polyfit(range(window), past_values, polyorder))
        return trend(window - 1)

    fitted = [fit_at(position) for position in range(len(series))]
    return pd.Series(fitted, index=series.index)
def polynomial_forecast(self, series: pd.Series, window: int = 20, degree: int = 2, steps=(12, 24, 36)):
    """Fit a polynomial on the last ``window`` values of ``series`` and extrapolate it.

    The fit uses x = 0 .. ``window - 1``. Two kinds of extrapolation are returned:

    * ``x_future`` / ``y_future``: the ``len(steps)`` consecutive x positions that
      follow the window (``window``, ``window + 1``, ...) and the fitted values there;
    * ``count``: how many of the horizons in ``steps`` (evaluated at
      ``window - 1 + step``) give a strictly positive prediction.

    NOTE(review): ``x_future`` does not use the values in ``steps``, only their
    number - confirm that callers expect consecutive positions.

    The default for ``steps`` is now an immutable tuple with the same values.

    :param series: pandas Series (e.g. ``dataframe['close']``)
    :param window: number of most recent values used for the fit
    :param degree: polynomial degree (e.g. 2 for quadratic)
    :param steps: horizons, in candles after the last value, used for ``count``
    :return: tuple ``(poly, x_future, y_future, count)``
    :raises ValueError: when ``series`` holds fewer than ``window`` values
    """
    if len(series) < window:
        raise ValueError("La série est trop courte pour la fenêtre spécifiée.")

    recent_y = series.iloc[-window:].values
    x = np.arange(window)
    poly = np.poly1d(np.polyfit(x, recent_y, degree))

    x_future = np.arange(window, window + len(steps))
    y_future = poly(x_future)

    # Number of requested horizons whose prediction is strictly positive.
    count = sum(1 for future_step in steps if poly(window - 1 + future_step) > 0)
    return poly, x_future, y_future, count
def calculateStats(self, df, index, target):
    """Cross-tabulate the mean of ``target`` by quantile bins of ``index`` and ``target``.

    Both columns are cut into up to 11 quantile bins (duplicate edges are
    dropped). The bins are written to ``df`` as ``indice_tranche`` and
    ``valeur_tranche``, so ``df`` is modified in place.

    The pivot table is now returned; previously it was computed and discarded.
    ``observed=False`` is passed explicitly so empty bin combinations stay in
    the table.

    Parameters:
        df: DataFrame holding both columns; receives the two bin columns.
        index: name of the column binned along the rows.
        target: name of the column binned along the columns and averaged.

    Returns:
        DataFrame of mean ``target`` values, rows = ``indice_tranche``,
        columns = ``valeur_tranche``.
    """
    # Number of bins (adjustable).
    n_bins_indice = 11
    n_bins_valeur = 11

    df['indice_tranche'] = pd.qcut(df[index], q=n_bins_indice, duplicates='drop')
    df['valeur_tranche'] = pd.qcut(df[target], q=n_bins_valeur, duplicates='drop')

    pivot_mean = df.pivot_table(
        index='indice_tranche',
        columns='valeur_tranche',
        values=target,
        aggfunc='mean',
        observed=False,
    )
    return pivot_mean
def should_enter_trade(self, pair: str, last_candle, current_time) -> bool:
    """Tell whether a new entry (or re-buy) on ``pair`` is allowed: always ``True``.

    The method returned ``True`` on its first statement, which made every rule
    written after it unreachable. That dead code has been removed; the rules it
    contained are summarised here so they can be restored deliberately:

    * a per-pair ``stop`` flag, raised (with a "STOP" log) when ``sma5_deriv1``
      < -0.2 or ``sma5_deriv2`` < -3, and cleared (with a "RESTART" log) when
      ``mid_smooth_5_deriv1`` > -0.9 and both ``sma5_deriv1`` and
      ``sma5_deriv2`` are positive; entries were refused while it was set;
    * a refusal when ``getProbaHausseSma5d(last_candle)`` was below 15;
    * a cap favouring the non-BTC pair with the most buys once it exceeded 3 buys.

    Parameters:
        pair: traded pair; not used.
        last_candle: latest candle; not used.
        current_time: evaluation time; not used.

    Returns:
        ``True``.
    """
    return True
def calculePlateaux(self, informative: pd.DataFrame, plateau_duration, plateau_tolerance) -> pd.DataFrame:
    """Detect flat price ranges ("plateaux") and classify each close against the last one.

    Columns added to ``informative`` (modified in place and returned):

    * ``rolling_min`` / ``rolling_max``: extremes of ``close`` over
      ``plateau_duration`` rows;
    * ``plateau_amplitude``: ``(rolling_max - rolling_min) / rolling_min``;
    * ``plateau``: amplitude below ``plateau_tolerance``;
    * ``plateau_end``: row is a plateau and the next row is not;
    * ``breakout_status``: ``'up'`` / ``'down'`` / ``'inside'`` relative to the
      bounds of the most recent plateau end, ``None`` before the first one;
    * ``breakout_distance``: relative distance beyond the broken bound, 0 when
      inside, NaN before the first plateau end.

    The per-row ``iterrows`` loop has been replaced by vectorised operations.
    ``breakout_distance`` is now always a float column, including when no
    plateau end exists.

    NOTE(review): ``plateau_end`` reads the NEXT row (``shift(-1)``), so on
    historical data it depends on a candle that is not yet known at that row,
    and the newest row counts as an end whenever it is a plateau. Behaviour
    kept as is - confirm this is acceptable for backtests.

    Parameters:
        informative: DataFrame with a ``close`` column.
        plateau_duration: rolling window length, in rows.
        plateau_tolerance: maximum relative amplitude of a plateau.

    Returns:
        ``informative`` with the columns above.
    """
    # 1. Plateau detection.
    close = informative['close']
    informative['rolling_min'] = close.rolling(plateau_duration).min()
    informative['rolling_max'] = close.rolling(plateau_duration).max()
    informative['plateau_amplitude'] = (
        (informative['rolling_max'] - informative['rolling_min']) / informative['rolling_min']
    )
    informative['plateau'] = informative['plateau_amplitude'] < plateau_tolerance

    # 2. End of plateau: the following row is no longer flat (last row: no follower).
    next_plateau = informative['plateau'].shift(-1, fill_value=False)
    informative['plateau_end'] = informative['plateau'] & ~next_plateau

    # 3. Bounds of the most recent finished plateau, carried forward.
    ends = informative['plateau_end']
    last_min = informative['rolling_min'].where(ends).ffill().to_numpy(dtype=float)
    last_max = informative['rolling_max'].where(ends).ffill().to_numpy(dtype=float)
    price = close.to_numpy(dtype=float)

    known = ~np.isnan(last_min) & ~np.isnan(last_max)
    with np.errstate(invalid='ignore', divide='ignore'):
        above = known & (price > last_max)
        below = known & ~above & (price < last_min)
        distance = np.where(
            above, (price - last_max) / last_max,
            np.where(below, (last_min - price) / last_min, 0.0))
    distance[~known] = np.nan

    status = np.full(len(informative), None, dtype=object)
    status[known] = "inside"
    status[above] = "up"
    status[below] = "down"

    informative['breakout_status'] = status
    informative['breakout_distance'] = distance
    return informative