Files
Freqtrade/Zeus_8_3_2_B_4_2.py
2025-05-24 16:52:39 +02:00

1730 lines
82 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
# Zeus Strategy: First Generation of GodStra Strategy with maximum
# AVG/MID profit in USDT
# Author: @Mablue (Masoud Azizi)
# github: https://github.com/mablue/
# IMPORTANT: INSTALL TA BEFOUR RUN(pip install ta)
# freqtrade hyperopt --hyperopt-loss SharpeHyperOptLoss --spaces buy sell roi --strategy Zeus
# --- Do not remove these libs ---
from datetime import timedelta, datetime
from freqtrade.persistence import Trade
from freqtrade.strategy import (BooleanParameter, CategoricalParameter, DecimalParameter, stoploss_from_open,
IntParameter, IStrategy, merge_informative_pair, informative, stoploss_from_absolute)
import pandas as pd
import numpy as np
from pandas import DataFrame
from typing import Optional, Union, Tuple
import logging
import configparser
from technical import pivots_points
# --------------------------------
# Add your lib to import here test git
import ta
import talib.abstract as talib
import freqtrade.vendor.qtpylib.indicators as qtpylib
import requests
from datetime import timezone, timedelta
from scipy.signal import savgol_filter
logger = logging.getLogger(__name__)
from tabulate import tabulate
def pprint_df(dframe):
print(tabulate(dframe, headers='keys', tablefmt='psql', showindex=False))
def normalize(df):
df = (df - df.min()) / (df.max() - df.min())
return df
class Zeus_8_3_2_B_4_2(IStrategy):
levels = [1, 2, 2, 4, 6, 8, 10, 12, 14, 16, 18, 20]
startup_candle_count = 12 * 24 * 2
# ROI table:
minimal_roi = {
"0": 0.564,
"567": 0.273,
"2814": 0.12,
"7675": 0
}
# Stoploss:
stoploss = -1 # 0.256
# Custom stoploss
use_custom_stoploss = True
# Buy hypers
timeframe = '5m'
max_open_trades = 5
max_amount = 40
# DCA config
position_adjustment_enable = True
plot_config = {
"main_plot": {
"sma5_1h": {
"color": "white"
},
"sma5_1d": {
"color": "blue"
},
"sma20": {
"color": "yellow"
},
"bb_lowerband": {
"color": "#da59a6"},
"bb_upperband": {
"color": "#da59a6",
},
"sma10": {
"color": "blue"
},
"min12_1d": {
"color": "red"
},
"max12_1d": {
"color": 'red'
},
"min50": {
"color": 'green'
},
"max50": {
"color": 'green'
}
},
"subplots": {
"Pct": {
"sma20_deriv1": {
'color': "green"
},
"down_pct": {
"color": "blue"
},
"down_pct_1h": {
"color": "red"
},
"down_pct_1d": {
"color": "red"
}
},
"Rsi": {
"rsi": {
"color": "pink"
},
"rsi_1h": {
"color": "red"
},
"rsi_1d": {
"color": "blue"
}
},
"Rsi_deriv": {
"rsi_deriv1_1h": {
"color": "red"
},
"rsi_deriv1_1d": {
"color": "blue"
},
},
"Down": {
"down_count_1h": {
"color": "green"
},
"up_count_1h": {
"color": "blue"
}
},
# "Diff": {
# "sma10_deriv1": {
# "color": "#74effc"
# }
# },
"smooth": {
'mid_smooth_deriv1_1d': {
"color": "blue"
},
'mid_smooth_1h_deriv1': {
"color": "red"
},
'mid_smooth_deriv2_1d': {
"color": "pink"
},
'mid_smooth_1h_deriv2': {
"color": "#da59a6"
}
}
}
}
columns_logged = False
pairs = {
pair: {
"first_buy": 0,
"last_max": 0,
"trade_info": {},
"max_touch": 0.0,
"last_sell": 0.0,
"last_buy": 0.0,
'count_of_buys': 0,
'current_profit': 0,
'expected_profit': 0,
"last_candle": {},
"last_trade": None,
"last_count_of_buys": 0,
'base_stake_amount': 0,
'stop_buy': False,
'last_date': 0,
'stop': False,
'max_profit': 0,
'last_palier_index': -1
}
for pair in ["BTC/USDC", "ETH/USDC", "DOGE/USDC", "XRP/USDC", "SOL/USDC",
"BTC/USDT", "ETH/USDT", "DOGE/USDT", "XRP/USDT", "SOL/USDT"]
}
# 20 20 40 60 100 160 260 420
# 50 50 100 300 500
# fibo = [1, 1, 2, 3, 5, 8, 13, 21]
# my fibo
# 50 50 50 100 100 150 200 250 350 450 600 1050
fibo = [1, 1, 1, 2, 2, 3, 4, 5, 7, 9, 12, 16, 21]
baisse = [1, 2, 3, 5, 7, 10, 14, 19, 26, 35, 47, 63, 84]
# Ma suite 1 1 1 2 2 3 4 5 7 9 12 16 21
# Mise 50 50 50 100 100 150 200 250 350 450 600 800 1050
# Somme Mises 50 100 150 250 350 500 700 950 1300 1750 2350 3150 4200
# baisse 1 2 3 5 7 10 14 19 26 35 47 63 84
trades = list()
max_profit_pairs = {}
protection_percent_buy_lost = IntParameter(1, 10, default=5, space='protection')
protection_fibo = IntParameter(1, 10, default=2, space='protection')
sell_allow_decrease = DecimalParameter(0.005, 0.02, default=0.2, decimals=2, space='sell', optimize=True, load=True)
# Probabilité de hausse pour futur_percent_3h (en %):
# mid_smooth_1h_deriv1_bin B5 B4 B3 B2 B1 N0 H1 H2 H3 H4 H5
# sma24_deriv1_1h_bin
# B5 41.0 47.2 48.1 45.6 74.0 65.9 66.5 83.8 77.8 72.1 81.0
# B4 41.2 35.8 48.4 46.5 59.9 60.2 75.8 79.4 84.6 83.0 78.5
# B3 34.1 39.7 42.8 47.0 63.3 64.5 71.5 80.4 82.0 86.6 76.6
# B2 27.5 27.9 32.3 33.2 61.9 67.1 70.8 79.5 81.3 73.6 81.9
# B1 35.0 26.5 24.4 34.9 50.0 59.2 69.4 72.8 79.8 77.4 69.5
# N0 30.6 19.9 23.6 30.8 41.9 59.2 67.5 70.6 74.0 63.0 75.0
# H1 25.2 28.7 28.6 25.8 35.9 44.2 60.1 68.8 67.7 69.6 80.9
# H2 29.8 20.8 23.9 30.4 34.4 37.5 52.7 66.1 69.8 67.5 62.9
# H3 25.7 29.4 22.7 29.8 37.7 47.1 59.9 68.5 66.5 68.6 66.4
# H4 30.6 27.5 25.1 22.6 30.8 34.1 50.9 59.8 57.0 68.6 63.7
# H5 14.8 21.6 22.2 35.3 19.3 31.6 38.3 59.6 65.2 56.8 59.6
labels = ['B5', 'B4', 'B3', 'B2', 'B1', 'N0', 'H1', 'H2', 'H3', 'H4', 'H5']
index_labels = ['B5', 'B4', 'B3', 'B2', 'B1', 'N0', 'H1', 'H2', 'H3', 'H4', 'H5']
# Récupération des labels ordonnés
ordered_labels = ['B5', 'B4', 'B3', 'B2', 'B1', 'N0', 'H1', 'H2', 'H3', 'H4', 'H5']
label_to_index = {label: i for i, label in enumerate(ordered_labels)}
# Données sous forme de dictionnaire
# Bornes des quantiles pour
mid_smooth_24_deriv1_bins = [-37.4852, -0.7541, -0.4233, -0.2510, -0.1338, -0.0389, 0.0496, 0.1464, 0.2660, 0.4384, 0.7697, 48.2985]
sma144_deriv1_bins = [-0.2592, -0.0166, -0.0091, -0.0051, -0.0025, -0.0005, 0.0012, 0.0034, 0.0062, 0.0105, 0.0183, 0.2436]
smooth24_sma144_deriv1_matrice = {
'B5': [8.2, 4.1, 3.1, 3.4, 3.5, 3.0, 2.9, 2.8, 2.5, 3.0, 4.1],
'B4': [24.9, 13.5, 11.8, 11.0, 9.0, 9.3, 9.1, 8.9, 8.1, 7.8, 11.4],
'B3': [39.8, 24.7, 20.4, 18.8, 17.4, 16.0, 16.2, 14.7, 15.4, 15.5, 15.9],
'B2': [54.8, 40.6, 32.7, 28.3, 25.9, 24.3, 23.1, 24.0, 23.4, 24.2, 21.1],
'B1': [65.1, 52.9, 46.6, 44.7, 38.8, 37.7, 35.4, 33.6, 32.2, 33.1, 27.4],
'N0': [73.1, 62.9, 61.1, 59.0, 56.1, 52.4, 49.5, 48.5, 42.7, 39.9, 35.3],
'H1': [79.7, 72.5, 73.1, 72.6, 71.6, 69.8, 66.9, 63.8, 58.0, 53.2, 41.9],
'H2': [81.7, 79.8, 79.6, 80.8, 79.3, 80.1, 78.1, 76.7, 72.5, 65.7, 52.8],
'H3': [86.1, 87.7, 87.4, 87.8, 87.2, 86.5, 84.5, 84.4, 82.7, 78.8, 65.4],
'H4': [92.6, 93.4, 94.0, 93.4, 94.3, 94.0, 93.8, 93.7, 92.7, 89.6, 79.9],
'H5': [97.1, 97.5, 97.9, 98.2, 97.5, 98.2, 98.1, 98.2, 97.7, 97.4, 93.5],
}
smooth24_sma144_deriv1_matrice_df = pd.DataFrame(smooth24_sma144_deriv1_matrice, index=index_labels)
# Extraction de la matrice numérique
smooth24_sma144_deriv1_numeric_matrice = smooth24_sma144_deriv1_matrice_df.reindex(index=ordered_labels, columns=ordered_labels).values
# Bornes des quantiles pour
mid_smooth_deriv2_24_bins = [-10.2968, -0.2061, -0.0996, -0.0559, -0.0292, -0.0093, 0.0083, 0.0281, 0.0550, 0.0999, 0.2072, 10.2252]
# =========================================================================
# variables pour probabilité 144 bougies
mid_smooth_1h_bins = [-2.0622, -0.1618, -0.0717, -0.0353, -0.0135, 0.0, 0.0085, 0.0276, 0.0521, 0.0923, 0.1742, 2.3286]
sma24_deriv1_1h_bins = [-0.84253877, -0.13177195, -0.07485074, -0.04293497, -0.02033502, -0.00215711,
0.01411933, 0.03308264, 0.05661652, 0.09362708, 0.14898214, 0.50579505]
smooth_smadiff_matrice = {
"B5": [41.0, 41.2, 34.1, 27.5, 35.0, 30.6, 25.2, 29.8, 25.7, 30.6, 14.8],
"B4": [47.2, 35.8, 39.7, 27.9, 26.5, 19.9, 28.7, 20.8, 29.4, 27.5, 21.6],
"B3": [48.1, 48.4, 42.8, 32.3, 24.4, 23.6, 28.6, 23.9, 22.7, 25.1, 22.2],
"B2": [45.6, 46.5, 47.0, 33.2, 34.9, 30.8, 25.8, 30.4, 29.8, 22.6, 35.3],
"B1": [74.0, 59.9, 63.3, 61.9, 50.0, 41.9, 35.9, 34.4, 37.7, 30.8, 19.3],
"N0": [65.9, 60.2, 64.5, 67.1, 59.2, 59.2, 44.2, 37.5, 47.1, 34.1, 31.6],
"H1": [66.5, 75.8, 71.5, 70.8, 69.4, 67.5, 60.1, 52.7, 59.9, 50.9, 38.3],
"H2": [83.8, 79.4, 80.4, 79.5, 72.8, 70.6, 68.8, 66.1, 68.5, 59.8, 59.6],
"H3": [77.8, 84.6, 82.0, 81.3, 79.8, 74.0, 67.7, 69.8, 66.5, 57.0, 65.2],
"H4": [72.1, 83.0, 86.6, 73.6, 77.4, 63.0, 69.6, 67.5, 68.6, 68.6, 56.8],
"H5": [81.0, 78.5, 76.6, 81.9, 69.5, 75.0, 80.9, 62.9, 66.4, 63.7, 59.6]
}
smooth_smadiff_matrice_df = pd.DataFrame(smooth_smadiff_matrice, index=index_labels)
# Extraction de la matrice numérique
smooth_smadiff_numeric_matrice = smooth_smadiff_matrice_df.reindex(index=ordered_labels, columns=ordered_labels).values
# =========================================================================
# variables pour probabilité
smooth_pct_max_hour_matrice = {
'B5': [43.5, 52.7, 62.3, 65.5, 86.9, 63.1, 81.5, 86.7, 90.2, 90.1, 93.0],
'B4': [34.9, 46.3, 53.6, 60.4, 75.8, 83.3, 81.5, 83.0, 86.4, 86.9, 91.1],
'B3': [20.5, 35.4, 43.7, 54.5, 69.7, 71.6, 80.4, 84.7, 86.7, 84.9, 85.9],
'B2': [11.5, 25.4, 36.4, 47.9, 62.3, 65.7, 76.5, 82.0, 81.8, 82.8, 77.7],
'B1': [3.6, 14.9, 26.8, 41.1, 55.6, 71.4, 74.3, 79.8, 80.8, 82.3, 75.1],
'N0': [0.0, 6.9, 18.3, 32.0, 47.2, 62.1, 69.1, 74.8, 78.3, 76.6, 71.6],
'H1': [0.7, 3.8, 9.4, 24.2, 40.6, 59.7, 67.8, 70.9, 73.4, 72.1, 70.0],
'H2': [0.0, 0.6, 6.5, 13.6, 33.6, 51.7, 64.9, 70.2, 68.4, 67.8, 65.8],
'H3': [1.4, 0.6, 2.6, 6.6, 23.3, 50.2, 56.2, 63.6, 65.7, 64.5, 64.7],
'H4': [1.6, 0.3, 3.0, 3.2, 11.4, 32.7, 44.0, 54.9, 61.7, 60.6, 63.6],
'H5': [1.8, 2.6, 0.6, 1.1, 9.7, 12.9, 26.2, 44.5, 52.6, 54.5, 56.2],
}
smooth_pct_max_hour_matrice_df = pd.DataFrame(smooth_pct_max_hour_matrice, index=index_labels)
# Extraction de la matrice numérique
smooth_pct_max_hour_numeric_matrice = smooth_pct_max_hour_matrice_df.reindex(index=ordered_labels, columns=ordered_labels).values
# =========================================================================
# variables pour probabilité 144 bougies
# Données sous forme de dictionnaire
smooth_sma_24_diff_matrice = {
"B5":[40.3, 52.1, 60.2, 68.6, 86.3, 76.5, 75.1, 83.5, 88.7, 96.3, 91.6],
"B4":[26.6, 39.4, 48.1, 57.0, 76.7, 82.4, 79.6, 82.4, 91.8, 86.6, 87.8],
"B3":[21.5, 27.7, 42.7, 53.2, 70.9, 76.6, 80.8, 79.4, 88.3, 88.0, 87.8],
"B2":[15.1, 20.8, 32.9, 46.9, 59.1, 79.6, 82.5, 79.6, 80.8, 87.0, 85.5],
"B1":[15.7, 15.4, 21.9, 29.4, 48.3, 66.6, 76.4, 77.8, 80.8, 83.5, 81.4],
"N0":[15.0, 10.5, 20.1, 24.5, 36.9, 59.9, 68.8, 74.1, 77.7, 83.0, 75.7],
"H1":[14.8, 10.7, 15.1, 21.0, 30.1, 47.3, 59.2, 70.4, 76.1, 82.7, 82.6],
"H2":[7.9, 8.6, 13.6, 20.6, 27.0, 39.5, 55.2, 68.9, 69.0, 78.4, 83.4],
"H3":[9.2, 6.2, 12.6, 21.7, 23.6, 33.1, 42.3, 57.8, 66.0, 71.9, 81.9],
"H4":[4.8, 13.1, 16.3, 14.5, 19.5, 26.4, 35.6, 49.2, 63.2, 68.2, 71.6],
"H5":[17.9, 25.7, 20.8, 17.8, 8.7, 18.5, 32.3, 37.7, 49.3, 59.8, 61.7]
}
smooth_sma_24_diff_matrice_df = pd.DataFrame(smooth_smadiff_matrice, index=index_labels)
# Extraction de la matrice numérique
smooth_sma_24_diff_numeric_matrice = smooth_sma_24_diff_matrice_df.reindex(index=ordered_labels, columns=ordered_labels).values
# Bornes des quantiles pour
mid_smooth_1h_deriv1 = [-11.5091, -0.4887, -0.1902, -0.0823, -0.0281, -0.0008, 0.0110, 0.0439, 0.1066, 0.2349, 0.5440, 14.7943]
# Bornes des quantiles pour
mid_smooth_1h_deriv2 = [-6.2109, -0.2093, -0.0900, -0.0416, -0.0171, -0.0035, 0.0033, 0.0168, 0.0413, 0.0904, 0.2099, 6.2109]
# =========================================================================
# variables pour probabilité jour
# Bornes des quantiles pour
mid_smooth_1h_deriv1_1d_bins = [-11.5091, -0.4887, -0.1902, -0.0823, -0.0281, -0.0008, 0.0110, 0.0439, 0.1066, 0.2349, 0.5440, 14.7943]
# Bornes des quantiles pour
sma24_deriv1_1h_1d_bins = [-2.1101, -0.1413, -0.0768, -0.0433, -0.0196, -0.0028, 0.0120, 0.0304, 0.0560, 0.0933, 0.1568, 0.7793]
smooth_1d_sma_2_diff_1d_matrice = {
'B5': [42.5, 47.8, 52.7, 48.5, 54.2, 64.6, 70.8, 69.2, 72.3, 71.2, 79.9],
'B4': [34.1, 43.5, 45.7, 53.7, 52.6, 67.3, 63.9, 70.8, 73.5, 67.9, 82.9],
'B3': [33.7, 42.7, 45.8, 49.6, 49.0, 57.8, 64.7, 68.7, 70.7, 72.6, 87.1],
'B2': [30.0, 36.6, 40.5, 42.3, 51.2, 62.0, 64.4, 65.2, 69.8, 74.3, 84.9],
'B1': [21.4, 29.8, 33.6, 39.9, 49.4, 56.1, 59.9, 63.9, 71.0, 72.8, 79.6],
'N0': [19.8, 30.4, 34.5, 41.5, 42.2, 48.1, 61.7, 64.5, 73.7, 69.3, 79.4],
'H1': [22.7, 27.0, 36.9, 34.8, 46.3, 50.2, 58.9, 63.1, 65.8, 66.5, 80.0],
'H2': [23.1, 34.3, 32.2, 31.0, 38.8, 54.3, 53.6, 55.1, 60.3, 63.3, 77.4],
'H3': [17.0, 32.6, 37.4, 31.0, 35.1, 36.7, 45.2, 53.0, 55.4, 58.6, 71.8],
'H4': [22.7, 31.9, 28.0, 35.8, 36.3, 46.9, 53.9, 53.8, 58.8, 58.0, 67.6],
'H5': [18.8, 27.0, 32.1, 36.0, 41.9, 48.1, 49.8, 53.6, 57.2, 62.2, 65.2],
}
smooth_1d_sma_2_diff_1d_matrice_df = pd.DataFrame(smooth_smadiff_matrice, index=index_labels)
# Extraction de la matrice numérique
smooth_1d_sma_2_diff_1d_numeric_matrice = smooth_1d_sma_2_diff_1d_matrice_df.reindex(index=ordered_labels, columns=ordered_labels).values
paliers = {}
# =========================================================================
# Parameters hyperopt
# buy_mid_smooth_3_deriv1 = DecimalParameter(-0.1, 0.1, decimals=2, default=-0.06, space='buy')
# buy_mid_smooth_24_deriv1 = DecimalParameter(-0.6, 0, decimals=2, default=-0.03, space='buy')
buy_horizon_predict_1h = IntParameter(1, 12, default=2, space='buy')
buy_level_predict_1h = IntParameter(2, 5, default=4, space='buy')
def confirm_trade_entry(self, pair: str, order_type: str, amount: float, rate: float, time_in_force: str,
current_time: datetime, entry_tag: Optional[str], **kwargs) -> bool:
minutes = 0
if self.pairs[pair]['last_date'] != 0:
minutes = round(int((current_time - self.pairs[pair]['last_date']).total_seconds() / 60))
dataframe, _ = self.dp.get_analyzed_dataframe(pair, self.timeframe)
last_candle = dataframe.iloc[-1].squeeze()
last_candle_2 = dataframe.iloc[-2].squeeze()
last_candle_3 = dataframe.iloc[-3].squeeze()
# val = self.getProbaHausse144(last_candle)
# allow_to_buy = True #(not self.stop_all) #& (not self.all_down)
allow_to_buy = not self.pairs[pair]['stop'] #and val > self.buy_val.value #not last_candle['tendency'] in ('B-', 'B--') # (rate <= float(limit)) | (entry_tag == 'force_entry')
if allow_to_buy:
self.trades = list()
self.pairs[pair]['first_buy'] = rate
self.pairs[pair]['last_buy'] = rate
self.pairs[pair]['max_touch'] = last_candle['close']
self.pairs[pair]['last_candle'] = last_candle
self.pairs[pair]['count_of_buys'] = 1
self.pairs[pair]['current_profit'] = 0
self.pairs[pair]['last_palier_index'] = -1
dispo = round(self.wallets.get_available_stake_amount())
self.printLineLog()
stake_amount = self.adjust_stake_amount(pair, last_candle)
self.log_trade(
last_candle=last_candle,
date=current_time,
action=("Buy" if allow_to_buy else "Canceled") + " " + str(minutes),
pair=pair,
rate=rate,
dispo=dispo,
profit=0,
trade_type=entry_tag,
buys=1,
stake=round(stake_amount, 2)
)
return allow_to_buy
def confirm_trade_exit(self, pair: str, trade: Trade, order_type: str, amount: float, rate: float,
time_in_force: str,
exit_reason: str, current_time, **kwargs, ) -> bool:
# allow_to_sell = (minutes > 30)
dataframe, _ = self.dp.get_analyzed_dataframe(pair, self.timeframe)
last_candle = dataframe.iloc[-1].squeeze()
allow_to_sell = (last_candle['percent'] < 0)
minutes = int(round((current_time - trade.date_last_filled_utc).total_seconds() / 60, 0))
if allow_to_sell:
self.trades = list()
self.pairs[pair]['last_count_of_buys'] = trade.nr_of_successful_entries #self.pairs[pair]['count_of_buys']
self.pairs[pair]['last_sell'] = rate
self.pairs[pair]['last_trade'] = trade
self.pairs[pair]['last_candle'] = last_candle
self.pairs[pair]['max_profit'] = 0
self.trades = list()
dispo= round(self.wallets.get_available_stake_amount())
# print(f"Sell {pair} {current_time} {exit_reason} dispo={dispo} amount={amount} rate={rate} open_rate={trade.open_rate}")
self.log_trade(
last_candle=last_candle,
date=current_time,
action="Sell " + str(minutes),
pair=pair,
trade_type=exit_reason,
rate=last_candle['close'],
dispo=dispo,
profit=round(trade.calc_profit(rate, amount), 2)
)
self.pairs[pair]['max_touch'] = 0
self.pairs[pair]['last_buy'] = 0
self.pairs[pair]['last_date'] = current_time
self.pairs[pair]['last_palier_index'] = -1
return (allow_to_sell) | (exit_reason == 'force_exit')
def custom_stake_amount(self, pair: str, current_time: datetime, current_rate: float,
proposed_stake: float, min_stake: float, max_stake: float,
**kwargs) -> float:
dataframe, _ = self.dp.get_analyzed_dataframe(pair=pair, timeframe=self.timeframe)
current_candle = dataframe.iloc[-1].squeeze()
adjusted_stake_amount = self.adjust_stake_amount(pair, current_candle)
# print(f"{pair} adjusted_stake_amount{adjusted_stake_amount}")
# Use default stake amount.
return adjusted_stake_amount
def custom_exit(self, pair: str, trade: Trade, current_time, current_rate, current_profit, **kwargs):
dataframe, _ = self.dp.get_analyzed_dataframe(pair, self.timeframe)
last_candle = dataframe.iloc[-1].squeeze()
last_candle_1h = dataframe.iloc[-13].squeeze()
before_last_candle = dataframe.iloc[-2].squeeze()
before_last_candle_2 = dataframe.iloc[-3].squeeze()
expected_profit = self.expectedProfit(pair, last_candle)
# print(f"current_time={current_time} current_profit={current_profit} expected_profit={expected_profit}")
max_touch_before = self.pairs[pair]['max_touch']
self.pairs[pair]['last_max'] = max(last_candle['haclose'], self.pairs[pair]['last_max'])
count_of_buys = trade.nr_of_successful_entries
baisse = self.pairs[pair]['max_profit'] - current_profit
mx = self.pairs[pair]['max_profit'] / 5
self.pairs[pair]['count_of_buys'] = count_of_buys
self.pairs[pair]['current_profit'] = current_profit
self.pairs[pair]['max_profit'] = max(self.pairs[pair]['max_profit'], current_profit)
# if (last_candle['mid_smooth_deriv1'] >= 0):
# return None
# if (last_candle['tendency'] in ('H++', 'H+')) and (last_candle['rsi'] < 80):
# return None
#
# if (last_candle['sma20_deriv1'] < 0 and before_last_candle['sma20_deriv1'] >= 0) and (current_profit > expected_profit):
# return 'Drv_' + str(count_of_buys)
if ((before_last_candle_2['mid_smooth_3_deriv1'] <= before_last_candle['mid_smooth_3_deriv1'])
& (before_last_candle['mid_smooth_3_deriv1'] >= last_candle['mid_smooth_3_deriv1'])) \
and (current_profit > expected_profit):
return 'Drv_' + str(count_of_buys)
# if (baisse > mx) & (current_profit > expected_profit):
# self.trades = list()
# return 'mx_' + str(count_of_buys)
# if (last_candle['percent12'] <= -0.01) & (current_profit >= expected_profit):
# self.trades = list()
# return 'pft_' + str(count_of_buys)
self.pairs[pair]['max_touch'] = max(last_candle['haclose'], self.pairs[pair]['max_touch'])
def informative_pairs(self):
# get access to all pairs available in whitelist.
pairs = self.dp.current_whitelist()
informative_pairs = [(pair, '1d') for pair in pairs]
informative_pairs += [(pair, '1h') for pair in pairs]
return informative_pairs
from typing import List
def multi_step_interpolate(self, pct: float, thresholds: List[float], factors: List[float]) -> float:
if pct <= thresholds[0]:
return factors[0]
if pct >= thresholds[-1]:
return factors[-1]
for i in range(1, len(thresholds)):
if pct <= thresholds[i]:
# interpolation linéaire entre thresholds[i-1] et thresholds[i]
return factors[i - 1] + (pct - thresholds[i - 1]) * (factors[i] - factors[i - 1]) / (
thresholds[i] - thresholds[i - 1])
# Juste au cas où (devrait jamais arriver)
return factors[-1]
def interpolate_factor(self, pct: float, start_pct: float = 5, end_pct: float = 30,
start_factor: float = 1.0, end_factor: float = 2.0) -> float:
if pct <= start_pct:
return start_factor
if pct >= end_pct:
return end_factor
# interpolation linéaire
return start_factor + (pct - start_pct) * (end_factor - start_factor) / (end_pct - start_pct)
def log_trade(self, action, pair, date, trade_type=None, rate=None, dispo=None, profit=None, buys=None, stake=None,
last_candle=None):
# Afficher les colonnes une seule fois
if self.config.get('runmode') == 'hyperopt':
return
if self.columns_logged % 30 == 0:
self.printLog(
f"| {'Date':<16} | {'Action':<10} |{'Pair':<5}| {'Trade Type':<18} |{'Rate':>8} | {'Dispo':>6} | {'Profit':>8} | {'Pct':>6} | {'max_touch':>11} | {'last_lost':>12} | {'last_max':>7}|{'Buys':>4}| {'Stake':>5} |"
f"Tdc|Tdh|Tdd| drv1 |drv_1h|drv_1d| drv2 |drv_2h|drv_2d|val144|val1h |"
)
self.printLineLog()
self.columns_logged += 1
date = str(date)[:16] if date else "-"
limit = None
# if buys is not None:
# limit = round(last_rate * (1 - self.fibo[buys] / 100), 4)
rsi = ''
rsi_pct = ''
# if last_candle is not None:
# if (not np.isnan(last_candle['rsi_1d'])) and (not np.isnan(last_candle['rsi_1h'])):
# rsi = str(int(last_candle['rsi_1d'])) + " " + str(int(last_candle['rsi_1h']))
# if (not np.isnan(last_candle['rsi_pct_1d'])) and (not np.isnan(last_candle['rsi_pct_1h'])):
# rsi_pct = str(int(10000 * last_candle['bb_mid_pct_1d'])) + " " + str(
# int(last_candle['rsi_pct_1d'])) + " " + str(int(last_candle['rsi_pct_1h']))
# first_rate = self.percent_threshold.value
# last_rate = self.threshold.value
# action = self.color_line(action, action)
sma5_1d = ''
sma5_1h = ''
sma5 = str(sma5_1d) + ' ' + str(sma5_1h)
last_lost = round((last_candle['haclose'] - self.pairs[pair]['max_touch']) / self.pairs[pair]['max_touch'], 3)
max_touch = '' #round(last_candle['max12_1d'], 1) #round(self.pairs[pair]['max_touch'], 1)
pct_max = round((last_candle['close'] - self.pairs[pair]['first_buy']) / self.pairs[pair]['first_buy'], 3) # round(100 * self.pairs[pair]['current_profit'], 1)
# if trade_type is not None:
# if np.isnan(last_candle['rsi_1d']):
# string = ' '
# else:
# string = (str(int(last_candle['rsi_1d']))) + " " + str(int(last_candle['rsi_deriv1_1d']))
# trade_type = trade_type \
# + " " + string \
# + " " + str(int(last_candle['rsi_1h'])) \
# + " " + str(int(last_candle['rsi_deriv1_1h']))
# val144 = self.getProbaHausse144(last_candle)
# val1h = self.getProbaHausse1h(last_candle)
self.printLog(
f"| {date:<16} | {action:<10} | {pair[0:3]:<3} | {trade_type or '-':<18} |{rate or '-':>9}| {dispo or '-':>6} "
f"| {profit or '-':>8} | {pct_max or '-':>6} | {round(self.pairs[pair]['max_touch'], 2) or '-':>11} | {last_lost or '-':>12} "
f"| {int(self.pairs[pair]['last_max']) or '-':>7} |{buys or '-':>2}-{self.pairs[pair]['last_palier_index'] or '-':>2}|{stake or '-':>7}"
f"|{last_candle['tendency_12'] or '-':>3}|" #{last_candle['tendency_1h'] or '-':>3}|{last_candle['tendency_1d'] or '-':>3}"
# f"|{round(last_candle['mid_smooth_24_deriv1'],3) or '-':>6}|{round(last_candle['mid_smooth_1h_deriv1'],3) or '-':>6}|{round(last_candle['mid_smooth_deriv1_1d'],3) or '-' :>6}|"
# f"{round(last_candle['mid_smooth_24_deriv2'],3) or '-' :>6}|{round(last_candle['mid_smooth_1h_deriv2'],3) or '-':>6}|{round(last_candle['mid_smooth_deriv2_1d'],3) or '-':>6}|"
# f"{round(val144, 1) or '-' :>6}|{round(val1h, 1) or '-':>6}|"
f"{round(last_candle['sma20_deriv1'], 4) or '-' :>6}|{round(last_candle['sma5_deriv1_1d'], 4) or '-' :>6}"
)
def printLineLog(self):
# f"sum1h|sum1d|Tdc|Tdh|Tdd| drv1 |drv_1h|drv_1d|"
self.printLog(
f"+{'-' * 18}+{'-' * 12}+{'-' * 5}+{'-' * 20}+{'-' * 9}+{'-' * 8}+{'-' * 10}+{'-' * 8}+{'-' * 13}+{'-' * 14}+{'-' * 9}+{'-' * 4}+{'-' * 7}+"
f"{'-' * 3}"
#"+{'-' * 3}+{'-' * 3}
# f"+{'-' * 6}+{'-' * 6}+{'-' * 6}+{'-' * 6}+{'-' * 6}+{'-' * 6}+"
)
def printLog(self, str):
if not self.dp.runmode.value in ('backtest', 'hyperopt'):
logger.info(str)
else:
if not self.dp.runmode.value in ('hyperopt'):
print(str)
def add_tendency_column(self, dataframe: pd.DataFrame, suffixe='') -> pd.DataFrame:
def tag_by_derivatives(row):
d1 = row[f"mid_smooth{suffixe}_deriv1"]
d2 = row[f"mid_smooth{suffixe}_deriv2"]
d1_lim_inf = -0.01
d1_lim_sup = 0.01
if d1 >= d1_lim_inf and d1 <= d1_lim_sup: # and d2 >= d2_lim_inf and d2 <= d2_lim_sup:
return 'P' # Palier
if d1 == 0.0:
return 'DH' if d2 > 0 else 'DB' #Depart Hausse / Départ Baisse
if d1 > d1_lim_sup:
return 'H++' if d2 > 0 else 'H+' #Acceleration Hausse / Ralentissement Hausse
if d1 < d1_lim_inf:
return 'B--' if d2 < 0 else 'B-' # Accéleration Baisse / Ralentissement Baisse
return 'Mid'
dataframe[f"tendency{suffixe}"] = dataframe.apply(tag_by_derivatives, axis=1)
return dataframe
def populate_indicators(self, dataframe: DataFrame, metadata: dict) -> DataFrame:
# Add all ta features
pair = metadata['pair']
heikinashi = qtpylib.heikinashi(dataframe)
dataframe['haopen'] = heikinashi['open']
dataframe['haclose'] = heikinashi['close']
dataframe['hapercent'] = (dataframe['haclose'] - dataframe['haopen']) / dataframe['haclose']
dataframe['min'] = talib.MIN(dataframe['close'], timeperiod=200)
dataframe['min12'] = talib.MIN(dataframe['close'], timeperiod=12)
dataframe['min50'] = talib.MIN(dataframe['close'], timeperiod=50)
dataframe['min200'] = talib.MIN(dataframe['close'], timeperiod=200)
dataframe['max200'] = talib.MAX(dataframe['close'], timeperiod=200)
dataframe['max50'] = talib.MAX(dataframe['close'], timeperiod=50)
dataframe['max200_diff'] = (dataframe['max200'] - dataframe['close']) / dataframe['close']
dataframe['sma5'] = talib.SMA(dataframe, timeperiod=5)
dataframe['sma10'] = talib.SMA(dataframe, timeperiod=10)
self.calculeDerivees(dataframe, 'sma10')
dataframe['sma20'] = talib.SMA(dataframe, timeperiod=20)
self.calculeDerivees(dataframe, 'sma20')
dataframe['sma144'] = talib.SMA(dataframe, timeperiod=144)
self.calculeDerivees(dataframe, 'sma144')
dataframe["percent"] = (dataframe["close"] - dataframe["open"]) / dataframe["open"]
dataframe["percent3"] = (dataframe["close"] - dataframe["open"].shift(3)) / dataframe["open"].shift(3)
dataframe["percent5"] = (dataframe["close"] - dataframe["open"].shift(5)) / dataframe["open"].shift(5)
dataframe["percent12"] = (dataframe["close"] - dataframe["open"].shift(12)) / dataframe["open"].shift(12)
dataframe = self.calculateDerivation(dataframe, window=3, suffixe="_3")
dataframe["mid_re_smooth_3"] = self.conditional_smoothing(dataframe['mid_smooth_3'].dropna(), threshold=0.0005).dropna()
self.calculeDerivees(dataframe, "mid_re_smooth_3")
dataframe = self.calculateDerivation(dataframe, window=12, suffixe="_12")
dataframe = self.calculateDerivation(dataframe, window=24, suffixe="_24", factor_1=1000, factor_2=10)
# print(metadata['pair'])
dataframe['rsi'] = talib.RSI(dataframe['close'], timeperiod=14)
self.calculeDerivees(dataframe, 'rsi')
# Bollinger Bands
bollinger = qtpylib.bollinger_bands(qtpylib.typical_price(dataframe), window=20, stds=2)
dataframe['bb_lowerband'] = bollinger['lower']
dataframe['bb_middleband'] = bollinger['mid']
dataframe['bb_upperband'] = bollinger['upper']
dataframe["bb_percent"] = (
(dataframe["close"] - dataframe["bb_lowerband"]) /
(dataframe["bb_upperband"] - dataframe["bb_lowerband"])
)
dataframe["bb_width"] = (
(dataframe["bb_upperband"] - dataframe["bb_lowerband"]) / dataframe["bb_upperband"]
)
# Normalization
# Compter les baisses consécutives
self.calculateDownAndUp(dataframe, limit=0.0001)
# dataframe = self.calculateRegression(dataframe, column='mid_smooth', window=24, degree=4, future_offset=12)
# dataframe = self.calculateRegression(dataframe, column='mid_smooth_24', window=24, degree=4, future_offset=12)
################### INFORMATIVE 1h
informative = self.dp.get_pair_dataframe(pair=metadata['pair'], timeframe="1h")
heikinashi = qtpylib.heikinashi(informative)
informative['haopen'] = heikinashi['open']
informative['haclose'] = heikinashi['close']
informative['hapercent'] = (informative['haclose'] - informative['haopen']) / informative['haclose']
# informative = self.calculateDerivation(informative, window=12)
# informative = self.apply_regression_derivatives(informative, column='mid', window=5, degree=4)
# informative['volatility'] = talib.STDDEV(informative['close'], timeperiod=14) / informative['close']
# informative['atr'] = (talib.ATR(informative['high'], informative['low'], informative['close'], timeperiod=14)) / informative['close']
informative['rsi'] = talib.RSI(informative['close']) #, timeperiod=7)
informative['max12'] = talib.MAX(informative['close'], timeperiod=12)
informative['min12'] = talib.MIN(informative['close'], timeperiod=12)
informative['sma5'] = talib.SMA(informative, timeperiod=5)
informative['sma24'] = talib.SMA(informative, timeperiod=24)
self.calculeDerivees(informative, 'sma24')
# self.calculateDownAndUp(informative, limit=0.0012)
dataframe = merge_informative_pair(dataframe, informative, self.timeframe, "1h", ffill=True)
################### INFORMATIVE 1d
informative = self.dp.get_pair_dataframe(pair=metadata['pair'], timeframe="1d")
# informative = self.calculateDerivation(informative, window=5, factor_1=10000, factor_2=1000)
# informative['volatility'] = talib.STDDEV(informative['close'], timeperiod=14) / informative['close']
# informative['atr'] = (talib.ATR(informative['high'], informative['low'], informative['close'], timeperiod=14)) / informative['close']
# informative = self.apply_regression_derivatives(informative, column='mid', window=5, degree=4)
informative['max12'] = talib.MAX(informative['close'], timeperiod=12)
informative['min12'] = talib.MIN(informative['close'], timeperiod=12)
informative['max3'] = talib.MAX(informative['close'], timeperiod=3)
informative['min3'] = talib.MIN(informative['close'], timeperiod=3)
# informative['rsi'] = talib.RSI(informative['close']) #, timeperiod=7)
# self.calculeDerivees(informative, 'rsi')
#
informative['sma5'] = talib.SMA(informative, timeperiod=5)
self.calculeDerivees(informative, 'sma5', factor_1=10, factor_2=1)
# informative['close_smooth'] = self.conditional_smoothing(informative['mid'].dropna(), threshold=0.0015).dropna()
# informative['smooth'], informative['deriv1'], informative['deriv2'] = self.smooth_and_derivatives(informative['close_smooth'])
# informative['deriv1'] = 100 * informative['deriv1'] / informative['mid']
# informative['deriv2'] = 1000 * informative['deriv2'] / informative['mid']
# poly_func, x_future, y_future, count = self.polynomial_forecast(informative['sma5_deriv1_1d'], window=24, degree=4)
dataframe = merge_informative_pair(dataframe, informative, self.timeframe, "1d", ffill=True)
dataframe['last_price'] = dataframe['close']
dataframe['first_price'] = dataframe['close']
# dataframe['mid_price'] = (dataframe['last_price'] + dataframe['first_price']) / 2
# dataframe['close01'] = dataframe.iloc[-1]['close'] * 1.01
# dataframe['limit'] = dataframe['close']
count_buys = 0
if self.dp:
if self.dp.runmode.value in ('live', 'dry_run'):
self.getOpenTrades()
for trade in self.trades:
if trade.pair != pair:
continue
filled_buys = trade.select_filled_orders('buy')
count = 0
amount = 0
for buy in filled_buys:
if count == 0:
dataframe['first_price'] = buy.price
# dataframe['close01'] = buy.price * 1.01
# Order(id=2396, trade=1019, order_id=29870026652, side=buy, filled=0.00078, price=63921.01,
# status=closed, date=2024-08-26 02:20:11)
dataframe['last_price'] = buy.price
count = count + 1
amount += buy.price * buy.filled
# dataframe['mid_price'] = (dataframe['last_price'] + dataframe['first_price']) / 2
count_buys = count
# dataframe['limit'] = dataframe['last_price'] * (1 - self.baisse[count] / 100)
# dataframe['amount'] = amount
# dataframe['mid_smooth_tag'] = qtpylib.crossed_below(dataframe['mid_smooth_24_deriv1'], dataframe['mid_smooth_deriv2_24'])
# ===============================
# lissage des valeurs horaires
dataframe['mid_smooth_1h'] = dataframe['mid'].rolling(window=6).mean()
dataframe["mid_smooth_1h_deriv1"] = 100 * dataframe["mid_smooth_1h"].diff() / dataframe['mid_smooth_1h']
dataframe["mid_smooth_1h_deriv2"] = 10 * dataframe["mid_smooth_1h_deriv1"].diff()
# dataframe['close_smooth_1h'] = self.conditional_smoothing(dataframe['mid'].rolling(window=3).mean().dropna(), threshold=0.0005)
# dataframe['smooth_1h'], dataframe['deriv1_1h'], dataframe['deriv2_1h'] = self.smooth_and_derivatives(dataframe['close_smooth_1h'])
# dataframe['deriv1_1h'] = 100 * dataframe['deriv1_1h'] / dataframe['mid_smooth_1h']
# dataframe['deriv2_1h'] = 1000 * dataframe['deriv2_1h'] / dataframe['mid_smooth_1h']
# dataframe['sma5_1h'] = dataframe['sma5_1h'].rolling(window=horizon_h).mean()
# dataframe['sma5_deriv1_1h'] = dataframe['sma5_deriv1_1h'].rolling(window=horizon_h).mean()
# dataframe['sma24_1h'] = dataframe['sma24_1h'].rolling(window=horizon_h).mean()
# dataframe['sma24_deriv1_1h'] = dataframe['sma24_deriv1_1h'].rolling(window=horizon_h).mean()
# dataframe = self.calculateRegression(dataframe, column='mid_smooth_1h', window=horizon_h * 12, degree=4, future_offset=24)
# Suppose que df['close'] est ton prix de clôture
# dataframe['close_smooth_24'] = self.conditional_smoothing(dataframe['mid'].rolling(24).mean().dropna(), threshold=0.0015)
# dataframe['smooth_24'], dataframe['smooth_24_deriv1'], dataframe['smooth_24_deriv2'] = self.smooth_and_derivatives(dataframe['close_smooth_24'])
# dataframe['smooth_24_deriv1'] = 100 * dataframe['smooth_24_deriv1'] / dataframe['mid_smooth_24']
# dataframe['smooth_24_deriv2'] = 100 * dataframe['smooth_24_deriv2'] / dataframe['mid_smooth_24']
dataframe['close_smooth'] = self.conditional_smoothing(dataframe['mid'].rolling(3).mean().dropna(), threshold=0.001)
dataframe['smooth'], dataframe['deriv1'], dataframe['deriv2'] = self.smooth_and_derivatives(dataframe['close_smooth'])
dataframe['deriv1'] = 100 * dataframe['deriv1'] / dataframe['mid']
dataframe['deriv2'] = 100 * dataframe['deriv2'] / dataframe['mid']
# ===============================
# Lissage des valeurs Journalières
# horizon_d = 24 * 5
# dataframe['mid_smooth_1d'] = dataframe['mid_smooth_1d'].rolling(window=horizon_d * 5).mean()
# dataframe["mid_smooth_deriv1_1d"] = dataframe["mid_smooth_1d"].rolling(horizon_d).mean().diff() / horizon_d
# dataframe["mid_smooth_deriv2_1d"] = horizon_d * dataframe["mid_smooth_deriv1_1d"].rolling(horizon_d).mean().diff()
#
# dataframe['sma5_1d'] = dataframe['sma5_1d'].rolling(window=horizon_d * 5).mean()
# dataframe['sma5_deriv1_1d'] = dataframe['sma5_deriv1_1d'].rolling(window=horizon_d).mean()
# dataframe['sma24_1d'] = dataframe['sma24_1d'].rolling(window=horizon_d).mean()
# dataframe['sma24_deriv1_1d'] = dataframe['sma24_deriv1_1d'].rolling(window=horizon_d).mean()
# dataframe = self.calculateRegression(dataframe, column='mid_smooth_1d', window=24, degree=4, future_offset=12)
# dataframe['percent_with_previous_day'] = 100 * (dataframe['close'] - dataframe['close_1d']) / dataframe['close']
# dataframe['percent_with_max_hour'] = 100 * (dataframe['close'] - dataframe['max12_1h']) / dataframe['close']
#
# horizon_h = 24 * 5
# dataframe['futur_percent_1h'] = 100 * ((dataframe['mid_smooth_1h'].shift(-12) - dataframe['mid_smooth_1h']) / dataframe['mid_smooth_1h']).rolling(horizon_h).mean()
# dataframe['futur_percent_3h'] = 100 * ((dataframe['mid_smooth_1h'].shift(-36) - dataframe['mid_smooth_1h']) / dataframe['mid_smooth_1h']).rolling(horizon_h).mean()
# dataframe['futur_percent_5h'] = 100 * ((dataframe['mid_smooth_1h'].shift(-60) - dataframe['mid_smooth_1h']) / dataframe['mid_smooth_1h']).rolling(horizon_h).mean()
# dataframe['futur_percent_12h'] = 100 * ((dataframe['mid_smooth_1h'].shift(-144) - dataframe['mid_smooth_1h']) / dataframe['mid_smooth_1h']).rolling(horizon_h).mean()
#
# dataframe['futur_percent_1d'] = 100 * (dataframe['close'].shift(-1) - dataframe['close']) / dataframe['close']
# dataframe['futur_percent_3d'] = 100 * (dataframe['close'].shift(-3) - dataframe['close']) / dataframe['close']
#
# self.calculateProbabilite2Index(dataframe, ['futur_percent_1d'], 'sma24_deriv1_1h', 'sma5_1d')
return dataframe
def calculeDerivees(self, dataframe, indic, factor_1=100, factor_2=10):
dataframe[f"{indic}_deriv1"] = factor_1 * dataframe[f"{indic}"].diff() / dataframe[f"{indic}"]
dataframe[f"{indic}_deriv2"] = factor_2 * dataframe[f"{indic}_deriv1"].diff()
def calculateDownAndUp(self, dataframe, limit=0.0001):
dataframe['down'] = dataframe['hapercent'] <= limit
dataframe['up'] = dataframe['hapercent'] >= limit
dataframe['down_count'] = - dataframe['down'].astype(int) * (
dataframe['down'].groupby((dataframe['down'] != dataframe['down'].shift()).cumsum()).cumcount() + 1)
dataframe['up_count'] = dataframe['up'].astype(int) * (
dataframe['up'].groupby((dataframe['up'] != dataframe['up'].shift()).cumsum()).cumcount() + 1)
# Créer une colonne vide
dataframe['down_pct'] = self.calculateUpDownPct(dataframe, 'down_count')
dataframe['up_pct'] = self.calculateUpDownPct(dataframe, 'up_count')
def calculateDerivation(self, dataframe, window=12, suffixe='', factor_1=100, factor_2=10):
dataframe['mid'] = dataframe['haopen'] + (dataframe['haclose'] - dataframe['haopen']) / 2
# 1. Calcul du lissage par moyenne mobile médiane
dataframe[f"mid_smooth{suffixe}"] = dataframe['haclose'].rolling(window=window).mean()
# 2. Dérivée première = différence entre deux bougies successives
dataframe[f"mid_smooth{suffixe}_deriv1"] = round(factor_1 * dataframe[f"mid_smooth{suffixe}"].rolling(window=3).mean().diff() / dataframe[f"mid_smooth{suffixe}"], 4)
# 3. Dérivée seconde = différence de la dérivée première
dataframe[f"mid_smooth{suffixe}_deriv2"] = round(factor_2 * dataframe[f"mid_smooth{suffixe}_deriv1"].rolling(window=3).mean().diff(), 4)
dataframe = self.add_tendency_column(dataframe, suffixe)
return dataframe
def getOpenTrades(self):
# if len(self.trades) == 0:
self.trades = Trade.get_open_trades()
return self.trades
def populate_buy_trend(self, dataframe: DataFrame, metadata: dict) -> DataFrame:
pair = metadata['pair']
# self.getOpenTrades()
expected_profit = self.expectedProfit(pair, dataframe.iloc[-1])
# self.getBinanceOrderBook(pair, dataframe)
last_candle = dataframe.iloc[-1].squeeze()
# dataframe.loc[
# (
# (dataframe['percent'] > 0)
# & (dataframe['mid_smooth_deriv1'] >= dataframe['mid_smooth_deriv1'].shift(1))
# ), ['enter_long', 'enter_tag']] = (1, 'down')
dataframe.loc[
(
# (dataframe['deriv2_1h'].shift(2) >= dataframe['deriv2_1h'].shift(1))
# & (dataframe['deriv2_1h'].shift(1) <= dataframe['deriv2_1h'])
# (dataframe['deriv1_1h'] >= -0.01)
# & (dataframe['deriv2_1h'] >= -0.00)
(dataframe['mid_smooth_3_deriv1'].shift(2) >= dataframe['mid_smooth_3_deriv1'].shift(1))
& (dataframe['mid_smooth_3_deriv1'].shift(1) <= dataframe['mid_smooth_3_deriv1'])
#
#
# (dataframe['mid_smooth_1h_deriv1'] >= 0)
# & (dataframe['mid_smooth_1h_deriv1'] >= 0)
# & (dataframe['mid_smooth_1h_deriv1'].shift(1) <= 0)
# & (dataframe['mid_smooth_1h_deriv1'] >= dataframe['mid_smooth_1h_deriv1'].shift(1))
), ['enter_long', 'enter_tag']] = (1, 'smth')
dataframe['test'] = np.where(dataframe['enter_long'] == 1, dataframe['close'] * 1.01, np.nan)
self.paliers = self.get_dca_stakes()
if self.dp.runmode.value in ('backtest'):
today = datetime.now().strftime("%Y-%m-%d-%H:%M:%S")
dataframe.to_feather(f"user_data/data/binance/{today}-{metadata['pair'].replace('/', '_')}_df.feather")
#
# df = dataframe
#
# # Colonnes à traiter
# # futur_cols = ['futur_percent_1h', 'futur_percent_3h', 'futur_percent_5h', 'futur_percent_12h']
# futur_cols = ['futur_percent_1h']
#
# # Tranches équitables par quantiles
#
# indic_1 = 'mid_smooth_24_deriv1'
# indic_2 = 'sma144_deriv1'
# #indic_2 = 'percent_with_max_hour'
# # indic_1 = 'mid_smooth_1h_deriv1'
# # indic_2 = 'sma5_deriv1_1d'
#
# self.calculateProbabilite2Index(df, futur_cols, indic_1, indic_2)
return dataframe
def calculateProbabilite2Index(self, df, futur_cols, indic_1, indic_2):
# # Définition des tranches pour les dérivées
# bins_deriv = [-np.inf, -0.05, -0.01, 0.01, 0.05, np.inf]
# labels = ['forte baisse', 'légère baisse', 'neutre', 'légère hausse', 'forte hausse']
#
# # Ajout des colonnes bin (catégorisation)
# df[f"{indic_1}_bin"] = pd.cut(df['mid_smooth_1h_deriv1'], bins=bins_deriv, labels=labels)
# df[f"{indic_2}_bin"] = pd.cut(df['mid_smooth_deriv1_1d'], bins=bins_deriv, labels=labels)
#
# # Colonnes de prix futur à analyser
# futur_cols = ['futur_percent_1h', 'futur_percent_2h', 'futur_percent_3h', 'futur_percent_4h', 'futur_percent_5h']
#
# # Calcul des moyennes et des effectifs
# grouped = df.groupby([f"{indic_2}_bin", f"{indic_1}_bin"])[futur_cols].agg(['mean', 'count'])
#
# pd.set_option('display.width', 200) # largeur max affichage
# pd.set_option('display.max_columns', None)
pd.set_option('display.max_columns', None)
pd.set_option('display.width', 300) # largeur max affichage
# nettoyage
# series = df[f"{indic_2}"].dropna()
# unique_vals = df[f"{indic_2}"].nunique()
# print(unique_vals)
# print(df[f"{indic_2}"])
n = len(self.labels)
df[f"{indic_1}_bin"], bins_1h = pd.qcut(df[f"{indic_1}"], q=n, labels=self.labels, retbins=True,
duplicates='drop')
df[f"{indic_2}_bin"], bins_1d = pd.qcut(df[f"{indic_2}"], q=n, labels=self.labels, retbins=True,
duplicates='drop')
# Affichage formaté pour code Python
print(f"Bornes des quantiles pour {indic_1} : [{', '.join([f'{b:.4f}' for b in bins_1h])}]")
print(f"Bornes des quantiles pour {indic_2} : [{', '.join([f'{b:.4f}' for b in bins_1d])}]")
# Agrégation
grouped = df.groupby([f"{indic_2}_bin", f"{indic_1}_bin"], observed=True)[futur_cols].agg(['mean', 'count'])
# Affichage
with pd.option_context('display.max_rows', None, 'display.max_columns', None):
print(grouped.round(4))
# Ajout des probabilités de hausse
for col in futur_cols:
df[f"{col}_is_up"] = df[col] > 0
# Calcul de la proba de hausse
proba_up = df.groupby([f"{indic_2}_bin", f"{indic_1}_bin"], observed=True)[f"{col}_is_up"].mean().unstack()
print(f"\nProbabilité de hausse pour {col} (en %):")
with pd.option_context('display.max_rows', None, 'display.max_columns', None):
print((proba_up * 100).round(1))
# Affichage formaté des valeurs comme tableau Python
with pd.option_context('display.max_rows', None, 'display.max_columns', None):
df_formatted = (proba_up * 100).round(1)
print("data = {")
for index, row in df_formatted.iterrows():
row_values = ", ".join([f"{val:.1f}" for val in row])
print(f"'{index}': [{row_values}], ")
print("}")
def populate_sell_trend(self, dataframe: DataFrame, metadata: dict) -> DataFrame:
# dataframe.loc[
# (
# (dataframe['mid_smooth_deriv1'] == 0)
# & (dataframe['mid_smooth_deriv1'].shift(1) > 0)
# ), ['sell', 'exit_long']] = (1, 'sell_sma5_pct_1h')
return dataframe
def adjust_trade_position(self, trade: Trade, current_time: datetime,
current_rate: float, current_profit: float, min_stake: float,
max_stake: float, **kwargs):
# ne rien faire si ordre deja en cours
if trade.has_open_orders:
print("skip open orders")
return None
if (self.wallets.get_available_stake_amount() < 50): # or trade.stake_amount >= max_stake:
return 0
dataframe, _ = self.dp.get_analyzed_dataframe(trade.pair, self.timeframe)
last_candle = dataframe.iloc[-1].squeeze()
before_last_candle = dataframe.iloc[-2].squeeze()
last_candle_2 = dataframe.iloc[-3].squeeze()
last_candle_3 = dataframe.iloc[-4].squeeze()
last_candle_previous_1h = dataframe.iloc[-13].squeeze()
# prépare les données
current_time = current_time.astimezone(timezone.utc)
open_date = trade.open_date.astimezone(timezone.utc)
dispo = round(self.wallets.get_available_stake_amount())
hours_since_first_buy = (current_time - trade.open_date_utc).seconds / 3600.0
days_since_first_buy = (current_time - trade.open_date_utc).days
hours = (current_time - trade.date_last_filled_utc).total_seconds() / 3600.0
if (len(dataframe) < 1):
print("skip dataframe")
return None
pair = trade.pair
if self.dp.runmode.value in ('dry_run'):
if pair not in ('BTC/USDT', 'BTC/USDC', 'XRP/USDT', 'XRP/USDC', 'ETH/USDT', 'ETH/USDC'):
# print(f"skip pair {pair}")
return None
else:
if pair not in ('BTC/USDT', 'BTC/USDC'):
# print(f"skip pair {pair}")
return None
# # déclenche un achat si bougie rouge importante
# stake_amount = self.config.get('stake_amount', 100)
# stake_amount = min(stake_amount, self.wallets.get_available_stake_amount())
# current_time = current_time.astimezone(timezone.utc)
# seconds_since_filled = (current_time - trade.date_last_filled_utc).total_seconds()
# pct = (last_candle['close'] - last_candle['open']) / (last_candle['open']) * 100
# if (
# stake_amount
# and pct <= - 1.10 #self.red_candle_pct
# and min_stake < stake_amount < max_stake
# and seconds_since_filled > (60 * 5)
# # and (last_candle["sma24_deriv1_1h"] > - 0.02)
# # and seconds_since_filled > (1 * 3600)
# # and count_of_entries < 10
# ):
# trade_type = last_candle['enter_tag'] if last_candle['enter_long'] == 1 else 'pct48'
# self.log_trade(
# last_candle=last_candle,
# date=current_time,
# action="Adjust 1",
# dispo=dispo,
# pair=trade.pair,
# rate=current_rate,
# trade_type=trade_type,
# profit=round(current_profit, 4), # round(current_profit * trade.stake_amount, 2),
# buys=trade.nr_of_successful_entries + 1,
# stake=round(stake_amount, 2)
# )
#
# self.pairs[trade.pair]['last_buy'] = current_rate
# self.pairs[trade.pair]['max_touch'] = last_candle['close']
# self.pairs[trade.pair]['last_candle'] = last_candle
# return stake_amount
#
# # déclenche un achat en conditions d'achat standard
# if (
# stake_amount
# and last_candle['close'] < last_candle['sma20']
# and last_candle['close'] < last_candle['open']
# and min_stake < stake_amount < max_stake
# and (last_candle["sma24_deriv1_1h"] > - 0.02)
# and seconds_since_filled > 23 * 3600 #self.staking_delay * 3600
# ):
# stake_amount = stake_amount * seconds_since_filled / (23 * 3600)
# trade_type = last_candle['enter_tag'] if last_candle['enter_long'] == 1 else 'pct48'
# self.log_trade(
# last_candle=last_candle,
# date=current_time,
# action="Adjust 2",
# dispo=dispo,
# pair=trade.pair,
# rate=current_rate,
# trade_type=trade_type,
# profit=round(current_profit, 4), # round(current_profit * trade.stake_amount, 2),
# buys=trade.nr_of_successful_entries + 1,
# stake=round(stake_amount, 2)
# )
#
# self.pairs[trade.pair]['last_buy'] = current_rate
# self.pairs[trade.pair]['max_touch'] = last_candle['close']
# self.pairs[trade.pair]['last_candle'] = last_candle
# return stake_amount
#
# return None
count_of_buys = trade.nr_of_successful_entries
current_time_utc = current_time.astimezone(timezone.utc)
open_date = trade.open_date.astimezone(timezone.utc)
days_since_open = (current_time_utc - open_date).days
pct_first = 0
if self.pairs[pair]['first_buy']:
pct_first = round((last_candle['close'] - self.pairs[pair]['first_buy']) / self.pairs[pair]['first_buy'], 3)
pct = 0.012
if count_of_buys == 1:
pct_max = current_profit
else:
if self.pairs[trade.pair]['last_buy']:
pct_max = round((last_candle['close'] - self.pairs[trade.pair]['last_buy']) / self.pairs[trade.pair]['last_buy'], 4)
else:
pct_max = - pct
lim = - pct - (count_of_buys * 0.001)
# index = self.get_palier_index(pct_first)
# if index is None:
# return None
# index = index -1
# lim, stake_amount = self.paliers[index] #- pct - (count_of_buys * 0.001)
# self.get_active_stake()
# val144 = self.getProbaHausse144(last_candle)
# val1h = self.getProbaHausse1h(last_candle)
# val = self.getProbaHausse144(last_candle)
# buy = False
# previous = 0
# # current_profit=-0.001998 count_of_buys=1 pct_first=0.000 pct_palier=-0.629 pct_max=-0.002 lim=0.000
#
# for pct_palier, stake_amount in self.paliers:
# if abs(pct_palier) > abs(pct_first):
# lim = pct_palier
# break
# previous = pct_palier
# print(f"{trade.pair} current_profit={current_profit} count_of_buys={count_of_buys} pct_first={pct_first:.3f} pct_palier={pct_palier:.3f} pct_max={pct_max:.3f} lim={lim:.3f} ")
# if (days_since_open > count_of_buys) & (0 < count_of_buys <= max_buys) & (current_rate <= limit) & (last_candle['enter_long'] == 1):
# ['mid_smooth_1h_deriv1']
# sans cdt° ==>Avg. stake amount 276.516 USDT │ Total trade volume 175760.342 USDT 315 │ 1.17 │ 1204.156 │ 60.21│ 1 day, 13:45:00 │ 314 0 1 99.7 │ 0.787 USDT 0.02% │
# > - 0.03 ==>Avg. stake amount 259.702 USDT │ Total trade volume 149302.88 USDT 285 │ 1.19 │ 974.542 │ 48.73│ 1 day, 17:45:00 │ 284 0 1 99.6 │ 0.787 USDT 0.03% │
# > - 0.03 ==>Avg. stake amount 253.535 USDT │ Total trade volume 145312.936 USDT 284 │ 1.19 │ 1014.898 │ 50.74| 1 day, 17:54:00 │ 283 0 1 99.6 │ 0.684 USDT 0.02% │
# > - 0.015 ==>Avg. stake amount 249.107 USDT │ Total trade volume 138186.861 USDT 275 │ 1.20 │ 901.976 │ 45.1 │ 1 day, 19:17:00 │ 274 0 1 99.6 │ 0.684 USDT 0.02%
condition = True #last_candle['mid_smooth_1h_deriv1'] > - 0.05 #(last_candle['mid_smooth_3_deriv1'] > self.buy_mid_smooth_3_deriv1.value) and (last_candle['mid_smooth_24_deriv1'] > self.buy_mid_smooth_24_deriv1.value)
# (last_candle['enter_long'] == 1 & (count_of_buys < 3)) \
# or ((before_last_candle['mid_re_smooth_3_deriv1'] <= 0) & (last_candle['mid_re_smooth_3_deriv1'] >= 0) & (3 <= count_of_buys < 6)) \
# or ((before_last_candle['mid_smooth_1h_deriv1'] <= 0) & (last_candle['mid_smooth_1h_deriv1'] >= 0) & (6 <= count_of_buys))
limit_buy = 20
if (count_of_buys < limit_buy) and condition and (pct_max < lim): # and val > self.buy_val_adjust.value and last_candle['mid_smooth_deriv1_1d'] > - 1):
try:
# print(f"{trade.pair} current_profit={current_profit} count_of_buys={count_of_buys} pct_first={pct_first:.3f} pct_max={pct_max:.3f} lim={lim:.3f} index={index}")
# self.pairs[trade.pair]['last_palier_index'] = index
# Appel de la fonction
poly_func, x_future, y_future, count = self.polynomial_forecast(
dataframe['sma24_deriv1_1h'],
window=self.buy_horizon_predict_1h.value * 12,
degree=self.buy_level_predict_1h.value,
n_future=3)
if count < 3:
return None
max_amount = self.config.get('stake_amount', 100) * 2.5
# stake_amount = min(stake_amount, self.wallets.get_available_stake_amount())
stake_amount = min(min(max_amount, self.wallets.get_available_stake_amount()),
self.adjust_stake_amount(pair, last_candle) - 10 * pct_first / pct) # min(200, self.adjust_stake_amount(pair, last_candle) * self.fibo[count_of_buys])
trade_type = last_candle['enter_tag'] if last_candle['enter_long'] == 1 else 'pct48'
self.log_trade(
last_candle=last_candle,
date=current_time,
action="Loss -",
dispo=dispo,
pair=trade.pair,
rate=current_rate,
trade_type=trade_type,
profit=round(current_profit, 4), # round(current_profit * trade.stake_amount, 2),
buys=trade.nr_of_successful_entries + 1,
stake=round(stake_amount, 2)
)
self.pairs[trade.pair]['last_buy'] = current_rate
self.pairs[trade.pair]['max_touch'] = last_candle['close']
self.pairs[trade.pair]['last_candle'] = last_candle
return stake_amount
except Exception as exception:
print(exception)
return None
# if (count_of_buys < limit_buy and pct_max > pct and current_profit > 0.004) \
# and (last_candle['rsi_deriv1_1h'] >= -5) \
# and (last_candle['tendency'] in ('P', 'H++', 'DH', 'H+')) \
# and (last_candle['mid_smooth_deriv1'] > 0.015):
# try:
# trade_type = last_candle['enter_tag'] if last_candle['enter_long'] == 1 else 'pct48'
# self.log_trade(
# last_candle=last_candle,
# date=current_time,
# action="Gain +",
# dispo=dispo,
# pair=trade.pair,
# rate=current_rate,
# trade_type=trade_type,
# profit=round(current_profit, 4), # round(current_profit * trade.stake_amount, 2),
# buys=trade.nr_of_successful_entries + 1,
# stake=round(stake_amount, 2)
# )
# self.pairs[trade.pair]['last_buy'] = current_rate
# self.pairs[trade.pair]['max_touch'] = last_candle['close']
# self.pairs[trade.pair]['last_candle'] = last_candle
# return stake_amount
# except Exception as exception:
# print(exception)
# return None
return None
# def getProbaHausse144(self, last_candle):
# value_1 = self.getValuesFromTable(self.mid_smooth_24_deriv1_bins, last_candle['mid_smooth_24_deriv1'])
# value_2 = self.getValuesFromTable(self.sma144_deriv1_bins, last_candle['sma144_deriv1'])
#
# val = self.approx_val_from_bins(
# matrice=self.smooth24_sma144_deriv1_matrice_df,
# numeric_matrice=self.smooth24_sma144_deriv1_numeric_matrice,
# row_label=value_2,
# col_label=value_1)
# return val
#
# def getProbaHausse1h(self, last_candle):
# value_1 = self.getValuesFromTable(self.mid_smooth_1h_bins, last_candle['mid_smooth_1h_deriv1'])
# value_2 = self.getValuesFromTable(self.sma24_deriv1_1h_bins, last_candle['sma24_deriv1_1h'])
#
# val = self.approx_val_from_bins(matrice=self.smooth_smadiff_matrice_df, numeric_matrice=self.smooth_smadiff_numeric_matrice,
# row_label=value_2,
# col_label=value_1)
# return val
#
# def getProbaHausse1d(self, last_candle):
# value_1 = self.getValuesFromTable(self.mid_smooth_1h_bins, last_candle['mid_smooth_deriv1_1d'])
# value_2 = self.getValuesFromTable(self.sma24_deriv1_1h_bins, last_candle['sma5_deriv1_1d'])
#
# val = self.approx_val_from_bins(matrice=self.smooth_smadiff_matrice_df, numeric_matrice=self.smooth_smadiff_numeric_matrice, row_label=value_2,
# col_label=value_1)
# return val
def adjust_stake_amount(self, pair: str, last_candle: DataFrame):
# Calculer le minimum des 14 derniers jours
base_stake_amount = self.config.get('stake_amount', 100) # Montant de base configuré
first_price = self.pairs[pair]['first_buy']
if (first_price == 0):
first_price = last_candle['close']
last_max = last_candle['max12_1d']
pct = 5
if last_max > 0:
pct = 100 * (last_max - first_price) / last_max
thresholds = [2, 5, 10, 20]
factors = [1, 1.25, 1.5, 2.0]
factor = self.multi_step_interpolate(pct, thresholds, factors)
adjusted_stake_amount = base_stake_amount * factor #max(base_stake_amount, min(100, base_stake_amount * percent_4))
return adjusted_stake_amount
def expectedProfit(self, pair: str, last_candle: DataFrame):
expected_profit = 0.004 #min(0.01, first_max)
# print(
# f"Expected profit price={current_price:.4f} min_max={min_max:.4f} min_14={min_14_days:.4f} max_14={max_14_days:.4f} percent={percent:.4f} expected_profit={expected_profit:.4f}")
return expected_profit
def calculateUpDownPct(self, dataframe, key):
down_pct_values = np.full(len(dataframe), np.nan)
# Remplir la colonne avec les bons calculs
for i in range(len(dataframe)):
shift_value = abs(int(dataframe[key].iloc[i])) # Récupérer le shift actuel
if i - shift_value > 1: # Vérifier que le shift ne dépasse pas l'index
down_pct_values[i] = 100 * (dataframe['close'].iloc[i] - dataframe['close'].iloc[i - shift_value]) / \
dataframe['close'].iloc[i - shift_value]
return down_pct_values
# ✅ Première dérivée(variation ou pente)
# Positive: la courbe est croissante → tendance haussière.
# Négative: la courbe est décroissante → tendance baissière.
# Proche de 0: la courbe est plate → marché stable ou en transition.
#
# Applications:
# Détecter les points dinflexion(changement de tendance) quand elle sannule.\
# Analyser la vitesse dun mouvement(plus elle est forte, plus le mouvement est impulsif).
#
# ✅ Seconde dérivée(accélération ou concavité)
# Positive: la pente augmente → accélération de la hausse ou ralentissement de la baisse.
# Négative: la pente diminue → accélération de la baisse ou ralentissement de la hausse.
# Changement de signe: indique souvent un changement de courbure, utile pour prévoir des retournements.
#
# Exemples:
# 🟢 Dérivée 1 > 0 et dérivée 2 > 0: tendance haussière qui saccélère.
# 🟡 Dérivée 1 > 0 et dérivée 2 < 0: tendance haussière qui ralentit → essoufflement potentiel.
# 🔴 Dérivée 1 < 0 et dérivée 2 < 0: tendance baissière qui saccélère.
# 🟠 Dérivée 1 < 0 et dérivée 2 > 0: tendance baissière qui ralentit → possible bottom.
#
# Filtrer les signaux: ne prendre un signal haussier que si dérivée1 > 0 et dérivée2 > 0.
# Détecter les zones de retournement: quand dérivée1 ≈ 0 et que dérivée2 change de signe.
def calculateRegression(self,
dataframe: DataFrame,
column= 'close',
window= 50,
degree=3,
future_offset: int = 10 # projection à n bougies après
) -> DataFrame:
df = dataframe.copy()
regression_fit = []
regression_future_fit = []
regression_fit = []
regression_future_fit = []
for i in range(len(df)):
if i < window:
regression_fit.append(np.nan)
regression_future_fit.append(np.nan)
continue
# Fin de la fenêtre dapprentissage
end_index = i
start_index = i - window
y = df[column].iloc[start_index:end_index].values
# Si les données sont insuffisantes (juste par précaution)
if len(y) < window:
regression_fit.append(np.nan)
regression_future_fit.append(np.nan)
continue
# x centré pour meilleure stabilité numérique
x = np.linspace(-1, 1, window)
coeffs = np.polyfit(x, y, degree)
poly = np.poly1d(coeffs)
# Calcul point présent (dernier de la fenêtre)
x_now = x[-1]
regression_fit.append(poly(x_now))
# Calcul point futur, en ajustant si on dépasse la fin
remaining = len(df) - i - 1
effective_offset = min(future_offset, remaining)
x_future = x_now + (effective_offset / window) * 2 # respect du même pas
regression_future_fit.append(poly(x_future))
df[f"{column}_regression"] = regression_fit
# 2. Dérivée première = différence entre deux bougies successives
df[f"{column}_regression_deriv1"] = round(100 * df[f"{column}_regression"].diff() / df[f"{column}_regression"], 4)
# 3. Dérivée seconde = différence de la dérivée première
df[f"{column}_regression_deriv2"] = round(10 * df[f"{column}_regression_deriv1"].rolling(int(window / 4)).mean().diff(), 4)
df[f"{column}_future_{future_offset}"] = regression_future_fit
# # 2. Dérivée première = différence entre deux bougies successives
# df[f"{column}_future_{future_offset}_deriv1"] = round(100 * df[f"{column}_future_{future_offset}"].diff() / df[f"{column}_future_{future_offset}"], 4)
#
# # 3. Dérivée seconde = différence de la dérivée première
# df[f"{column}_future_{future_offset}_deriv2"] = round(10 * df[f"{column}_future_{future_offset}_deriv1"].rolling(int(window / 4)).mean().diff(), 4)
return df
def getValuesFromTable(self, values, value):
for i in range(len(values) - 1):
if values[i] <= value < values[i + 1]:
return self.labels[i]
return self.labels[-1] # cas limite pour la borne max
def interpolated_val_from_bins(self, row_pos, col_pos):
"""
Renvoie une approximation interpolée (bilinéaire) d'une valeur dans la matrice
à partir de positions flottantes dans l'index (ligne) et les colonnes.
Parameters:
matrix_df (pd.DataFrame): Matrice des probabilités (index/colonnes = labels).
row_pos (float): Position réelle de la ligne (0 = B5, 10 = H5).
col_pos (float): Position réelle de la colonne (0 = B5, 10 = H5).
Returns:
float: Valeur interpolée, ou NaN si en dehors des bornes.
"""
# Labels ordonnés
n = len(self.labels)
# Vérification des limites
if not (0 <= row_pos <= n - 1) or not (0 <= col_pos <= n - 1):
return np.nan
# Conversion des labels -> matrice
matrix = self.smooth_smadiff_matrice_df.reindex(index=self.labels, columns=self.labels).values
# Coordonnées entières (inférieures)
i = int(np.floor(row_pos))
j = int(np.floor(col_pos))
# Coefficients pour interpolation
dx = row_pos - i
dy = col_pos - j
# Précautions sur les bords
if i >= n - 1: i = n - 2; dx = 1.0
if j >= n - 1: j = n - 2; dy = 1.0
# Récupération des 4 valeurs voisines
v00 = matrix[i][j]
v10 = matrix[i + 1][j]
v01 = matrix[i][j + 1]
v11 = matrix[i + 1][j + 1]
# Interpolation bilinéaire
interpolated = (
(1 - dx) * (1 - dy) * v00 +
dx * (1 - dy) * v10 +
(1 - dx) * dy * v01 +
dx * dy * v11
)
return interpolated
def approx_val_from_bins(self, matrice, numeric_matrice, row_label, col_label):
"""
Renvoie une approximation de la valeur à partir des labels binaires (e.g. B5, H1)
en utilisant une interpolation simple basée sur les indices.
Parameters:
matrix_df (pd.DataFrame): Matrice avec les labels binaires en index et colonnes.
row_label (str): Label de la ligne (ex: 'B3').
col_label (str): Label de la colonne (ex: 'H2').
Returns:
float: Valeur approchée si possible, sinon NaN.
"""
# Vérification des labels
if row_label not in matrice.index or col_label not in matrice.columns:
return np.nan
# Index correspondant
row_idx = self.label_to_index.get(row_label)
col_idx = self.label_to_index.get(col_label)
# Approximation directe (aucune interpolation complexe ici, juste une lecture)
return numeric_matrice[row_idx, col_idx]
# @property
# def protections(self):
# return [
# {
# "method": "CooldownPeriod",
# "stop_duration_candles": 12
# }
# # {
# # "method": "MaxDrawdown",
# # "lookback_period_candles": self.lookback.value,
# # "trade_limit": self.trade_limit.value,
# # "stop_duration_candles": self.protection_stop.value,
# # "max_allowed_drawdown": self.protection_max_allowed_dd.value,
# # "only_per_pair": False
# # },
# # {
# # "method": "StoplossGuard",
# # "lookback_period_candles": 24,
# # "trade_limit": 4,
# # "stop_duration_candles": self.protection_stoploss_stop.value,
# # "only_per_pair": False
# # },
# # {
# # "method": "StoplossGuard",
# # "lookback_period_candles": 24,
# # "trade_limit": 4,
# # "stop_duration_candles": 2,
# # "only_per_pair": False
# # },
# # {
# # "method": "LowProfitPairs",
# # "lookback_period_candles": 6,
# # "trade_limit": 2,
# # "stop_duration_candles": 60,
# # "required_profit": 0.02
# # },
# # {
# # "method": "LowProfitPairs",
# # "lookback_period_candles": 24,
# # "trade_limit": 4,
# # "stop_duration_candles": 2,
# # "required_profit": 0.01
# # }
# ]
def conditional_smoothing(self, series, threshold=0.002):
smoothed = [series.iloc[0]]
for val in series.iloc[1:]:
last = smoothed[-1]
if abs(val - last) / last >= threshold:
smoothed.append(val)
else:
smoothed.append(last)
return pd.Series(smoothed, index=series.index)
def smooth_and_derivatives(self, series, window=25, polyorder=3):
series = series.copy()
if series.isna().sum() > 0:
series = series.fillna(method='ffill').fillna(method='bfill') # Si tu veux éviter toute NaN
smooth = self.causal_savgol(series, window=window, polyorder=polyorder)
deriv1 = np.diff(smooth, prepend=smooth[0])
deriv2 = np.diff(deriv1, prepend=deriv1[0])
return pd.Series(smooth, index=series.index), pd.Series(deriv1, index=series.index), pd.Series(deriv2, index=series.index)
def causal_savgol(self, series, window=25, polyorder=3):
result = []
half_window = window # Fenêtre complète dans le passé
for i in range(len(series)):
if i < half_window:
result.append(np.nan)
continue
window_series = series[i - half_window:i]
if window_series.isna().any():
result.append(np.nan)
continue
coeffs = np.polyfit(range(window), window_series, polyorder)
poly = np.poly1d(coeffs)
result.append(poly(window - 1))
return pd.Series(result, index=series.index)
def get_stake_from_drawdown(self, pct: float, base_stake: float = 100.0, step: float = 0.04, growth: float = 1.15,
max_stake: float = 1000.0) -> float:
"""
Calcule la mise à allouer en fonction du drawdown.
:param pct: Drawdown en pourcentage (ex: -0.12 pour -12%)
:param base_stake: Mise de base (niveau 0)
:param step: Espacement entre paliers (ex: tous les -4%)
:param growth: Facteur de croissance par palier (ex: 1.15 pour +15%)
:param max_stake: Mise maximale à ne pas dépasser
:return: Montant à miser
"""
if pct >= 0:
return base_stake
level = int(abs(pct) / step)
stake = base_stake * (growth ** level)
return min(stake, max_stake)
def compute_adaptive_paliers(self, max_drawdown: float = 0.65, first_steps: list[float] = [0.01, 0.01, 0.015, 0.02], growth: float = 1.2) -> list[float]:
"""
Génère une liste de drawdowns négatifs avec des paliers plus rapprochés au début.
:param max_drawdown: Drawdown max (ex: 0.65 pour -65%)
:param first_steps: Liste des premiers paliers fixes en % (ex: [0.01, 0.01, 0.015])
:param growth: Facteur multiplicatif pour espacer les paliers suivants
:return: Liste de drawdowns négatifs (croissants)
"""
paliers = []
cumulated = 0.0
# Étapes initiales rapprochées
for step in first_steps:
cumulated += step
paliers.append(round(-cumulated, 4))
# Étapes suivantes plus espacées
step = first_steps[-1]
while cumulated < max_drawdown:
step *= growth
cumulated += step
if cumulated >= max_drawdown:
break
paliers.append(round(-cumulated, 4))
return paliers
def get_dca_stakes(self,
max_drawdown: float = 0.65,
base_stake: float = 100.0,
first_steps: list[float] = [0.01, 0.01, 0.015, 0.015],
growth: float = 1.2,
stake_growth: float = 1.15
) -> list[tuple[float, float]]:
"""
Génère les paliers de drawdown et leurs stakes associés.
:param max_drawdown: Maximum drawdown (ex: 0.65 pour -65%)
:param base_stake: Mise initiale
:param first_steps: Paliers de départ (plus resserrés)
:param growth: Multiplicateur d'espacement des paliers
:param stake_growth: Croissance multiplicative des mises
:return: Liste de tuples (palier_pct, stake)
[(-0.01, 100.0), (-0.02, 115.0), (-0.035, 132.25), (-0.05, 152.09), (-0.068, 174.9),
(-0.0896, 201.14), (-0.1155, 231.31), (-0.1466, 266.0), (-0.1839, 305.9), (-0.2287, 351.79),
(-0.2825, 404.56), (-0.347, 465.24), (-0.4244, 535.03), (-0.5173, 615.28), (-0.6287, 707.57)]
"""
paliers = [
(-0.01, 100.0), (-0.02, 115.0), (-0.035, 130), (-0.05, 150), (-0.07, 150),
(-0.10, 150), (-0.15, 150), (-0.20, 150), (-0.25, 150),
(-0.30, 200), (-0.40, 200),
(-0.50, 300), (-0.60, 400), (-0.70, 500), (-0.80, 1000)
]
# cumulated = 0.0
# stake = base_stake
#
# # Étapes initiales
# for step in first_steps:
# cumulated += step
# paliers.append((round(-cumulated, 4), round(stake, 2)))
# stake *= stake_growth
#
# # Étapes suivantes
# step = first_steps[-1]
# while cumulated < max_drawdown:
# step *= growth
# cumulated += step
# if cumulated >= max_drawdown:
# break
# paliers.append((round(-cumulated, 4), round(stake, 2)))
# stake *= stake_growth
return paliers
def get_active_stake(self, pct: float) -> float:
"""
Renvoie la mise correspondant au drawdown `pct`.
:param pct: drawdown courant (négatif, ex: -0.043)
:param paliers: liste de tuples (drawdown, stake)
:return: stake correspondant
"""
abs_pct = abs(pct)
stake = self.paliers[0][1] # stake par défaut
for palier, s in self.paliers:
if abs_pct >= abs(palier):
stake = s
else:
break
return stake
def get_palier_index(self, pct):
"""
Retourne l'index du palier franchi pour un pourcentage de baisse donné (pct).
On cherche le palier le plus profond atteint (dernier franchi).
"""
for i in reversed(range(len(self.paliers))):
seuil, _ = self.paliers[i]
#print(f"pct={pct} seuil={seuil}")
if pct <= seuil:
# print(pct)
return i
return None # Aucun palier atteint
# def poly_regression_predictions(self, series: pd.Series, window: int = 20, degree: int = 2, n_future: int = 3) -> pd.DataFrame:
# """
# Renvoie une DataFrame avec `n_future` colonnes contenant les extrapolations des n prochains points
# selon une régression polynomiale ajustée sur les `window` dernières valeurs.
# """
# result = pd.DataFrame(index=series.index)
# x = np.arange(window)
#
# for future_step in range(1, n_future + 1):
# result[f'poly_pred_t+{future_step}'] = np.nan
#
# for i in range(window - 1, len(series)):
# y = series.iloc[i - window + 1 : i + 1].values
#
# if np.any(pd.isna(y)):
# continue
#
# coeffs = np.polyfit(x, y, degree)
# poly = np.poly1d(coeffs)
#
# for future_step in range(1, n_future + 1):
# future_x = window - 1 + future_step # Extrapolation point
# result.loc[series.index[i], f'poly_pred_t+{future_step}'] = poly(future_x)
#
# return result
def polynomial_forecast(self, series: pd.Series, window: int = 20, degree: int = 2, n_future: int = 3):
"""
Calcule une régression polynomiale sur les `window` dernières valeurs de la série,
puis prédit les `n_future` prochaines valeurs.
:param series: Série pandas (ex: dataframe['close'])
:param window: Nombre de valeurs récentes utilisées pour ajuster le polynôme
:param degree: Degré du polynôme (ex: 2 pour quadratique)
:param n_future: Nombre de valeurs futures à prédire
:return: tuple (poly_function, x_vals, y_pred), où y_pred contient les prédictions futures
"""
if len(series) < window:
raise ValueError("La série est trop courte pour la fenêtre spécifiée.")
recent_y = series.iloc[-window:].values
x = np.arange(window)
coeffs = np.polyfit(x, recent_y, degree)
poly = np.poly1d(coeffs)
x_future = np.arange(window, window + n_future)
y_future = poly(x_future)
# Affichage de la fonction
# print("Fonction polynomiale trouvée :")
# print(poly_func)
count = 0
for future_step in [12, 24, 36]: #range(1, n_future + 1)
future_x = window - 1 + future_step
prediction = poly(future_x)
# result.loc[series.index[i], f'poly_pred_t+{future_step}'] = prediction
# Afficher les prédictions
# print(f" → t+{future_step}: x={future_x}, y={prediction:.2f}")
if prediction > 0:
count += 1
return poly, x_future, y_future, count