# Zeus Strategy: First Generation of GodStra Strategy with maximum # AVG/MID profit in USDT # Author: @Mablue (Masoud Azizi) # github: https://github.com/mablue/ # IMPORTANT: INSTALL TA BEFOUR RUN(pip install ta) # freqtrade hyperopt --hyperopt-loss SharpeHyperOptLoss --spaces buy sell roi --strategy Zeus # --- Do not remove these libs --- from datetime import timedelta, datetime from freqtrade.persistence import Trade from freqtrade.strategy import (BooleanParameter, CategoricalParameter, DecimalParameter, stoploss_from_open, IntParameter, IStrategy, merge_informative_pair, informative, stoploss_from_absolute) import pandas as pd import numpy as np import os import json from pandas import DataFrame from typing import Optional, Union, Tuple import math import logging import configparser from technical import pivots_points from pathlib import Path # -------------------------------- # Add your lib to import here test git import ta import talib.abstract as talib import freqtrade.vendor.qtpylib.indicators as qtpylib import requests from datetime import timezone, timedelta from scipy.signal import savgol_filter from ta.trend import SMAIndicator, EMAIndicator, MACD, ADXIndicator from collections import Counter logger = logging.getLogger(__name__) # Machine Learning from sklearn.ensemble import RandomForestClassifier,RandomForestRegressor from sklearn.model_selection import train_test_split from sklearn.metrics import r2_score, mean_absolute_error, mean_squared_error from sklearn.metrics import accuracy_score import joblib import matplotlib.pyplot as plt from sklearn.metrics import ( classification_report, confusion_matrix, accuracy_score, roc_auc_score, roc_curve, precision_score, recall_score, precision_recall_curve, f1_score ) from sklearn.tree import export_text import inspect from sklearn.feature_selection import mutual_info_classif from sklearn.inspection import permutation_importance from lightgbm import LGBMClassifier from sklearn.calibration import CalibratedClassifierCV from sklearn.feature_selection import SelectFromModel from tabulate import tabulate from sklearn.model_selection import GridSearchCV from sklearn.feature_selection import VarianceThreshold import seaborn as sns import optuna import shap from optuna.visualization import plot_optimization_history from optuna.visualization import plot_parallel_coordinate from optuna.visualization import plot_param_importances from optuna.visualization import plot_slice from pandas import DataFrame from sklearn.calibration import CalibratedClassifierCV from sklearn.feature_selection import SelectFromModel from sklearn.feature_selection import VarianceThreshold from sklearn.inspection import PartialDependenceDisplay from sklearn.inspection import permutation_importance from sklearn.linear_model import LogisticRegression from sklearn.metrics import brier_score_loss, roc_auc_score from sklearn.metrics import ( classification_report, confusion_matrix, accuracy_score, roc_curve, precision_score, recall_score ) from sklearn.metrics import f1_score from sklearn.model_selection import train_test_split from sklearn.pipeline import Pipeline from sklearn.preprocessing import StandardScaler from sklearn.tree import export_text from xgboost import XGBClassifier # Couleurs ANSI de base RED = "\033[31m" GREEN = "\033[32m" YELLOW = "\033[33m" BLUE = "\033[34m" MAGENTA = "\033[35m" CYAN = "\033[36m" RESET = "\033[0m" def pprint_df(dframe): print(tabulate(dframe, headers='keys', tablefmt='psql', showindex=False)) def normalize(df): df = (df - df.min()) / (df.max() - df.min()) return df def generate_state_params(states, mises): params = {} for s in states: prefix = "bm" if s < 0 else "bp" name = f"{prefix}{abs(s)}" params[name] = CategoricalParameter(mises, default=200, space='buy', optimize=True, load=True) return params class Zeus_8_3_2_B_4_2(IStrategy): # Machine Learning model_indicators = [ ] levels = [1, 2, 2, 4, 6, 8, 10, 12, 14, 16, 18, 20] startup_candle_count = 12 * 24 * 5 # ROI table: minimal_roi = { "1440": 0 } stakes = 40 # Stoploss: stoploss = -1 # 0.256 # Custom stoploss use_custom_stoploss = True trailing_stop = True trailing_stop_positive = 0.15 trailing_stop_positive_offset = 0.20 trailing_only_offset_is_reached = True # Buy hypers timeframe = '5m' max_open_trades = 5 max_amount = 40 parameters = {} # DCA config position_adjustment_enable = True plot_config = { "main_plot": { "sma24_1h": { "color": "pink" }, "sma5_1d": { "color": "blue" }, # "sma24": { # "color": "yellow" # }, "sma60": { "color": "green" }, "bb_lowerband": { "color": "#da59a6"}, "bb_upperband": { "color": "#da59a6", }, # "sma12": { # "color": "blue" # }, "mid_smooth_3_1h": { "color": "blue" } }, "subplots": { "Rsi": { "max_rsi_24": { "color": "blue" }, "max_rsi_24_1h": { "color": "pink" }, # "rsi_1h": { # "color": "red" # }, # "rsi_1d": { # "color": "blue" # } }, "Rsi_deriv1": { "sma24_deriv1_1h": { "color": "pink" }, "sma24_deriv1": { "color": "yellow" }, "sma5_deriv1_1d": { "color": "blue" }, "sma60_deriv1": { "color": "green" } }, "Rsi_deriv2": { "sma24_deriv2_1h": { "color": "pink" }, "sma24_deriv2": { "color": "yellow" }, "sma5_deriv2_1d": { "color": "blue" }, "sma60_deriv2": { "color": "green" } }, "States": { "tdc_macd_1h": { "color": "cyan" }, "sma24_state_1h": { "color": "pink" }, "sma24_state": { "color": "yellow" }, "sma5_state_1d": { "color": "blue" }, "sma60_state": { "color": "green" } }, 'Macd': { "macd_rel_1d": { "color": "cyan" }, "macdsignal_rel_1d": { "color": "pink" }, "macdhist_rel_1d": { "color": "yellow" } } } } columns_logged = False pairs = { pair: { 'first_amount': 0, "first_buy": 0, "last_buy": 0.0, "last_min": 999999999999999.5, "last_max": 0, "trade_info": {}, "max_touch": 0.0, "last_sell": 0.0, 'count_of_buys': 0, 'current_profit': 0, 'expected_profit': 0, 'previous_profit': 0, "last_candle": {}, "last_count_of_buys": 0, 'base_stake_amount': 0, 'stop_buy': False, 'last_date': 0, 'stop': False, 'max_profit': 0, 'total_amount': 0, 'has_gain': 0, 'force_sell': False, 'force_buy': False } for pair in ["BTC/USDC", "ETH/USDC", "DOGE/USDC", "XRP/USDC", "SOL/USDC", "BTC/USDT", "ETH/USDT", "DOGE/USDT", "XRP/USDT", "SOL/USDT"] } # 20 20 40 60 100 160 260 420 # 50 50 100 300 500 # fibo = [1, 1, 2, 3, 5, 8, 13, 21] # my fibo # 50 50 50 100 100 150 200 250 350 450 600 1050 fibo = [1, 1, 1, 2, 2, 3, 4, 5, 7, 9, 12, 16, 21] baisse = [1, 2, 3, 5, 7, 10, 14, 19, 26, 35, 47, 63, 84] # Ma suite 1 1 1 2 2 3 4 5 7 9 12 16 21 # Mise 50 50 50 100 100 150 200 250 350 450 600 800 1050 # Somme Mises 50 100 150 250 350 500 700 950 1300 1750 2350 3150 4200 # baisse 1 2 3 5 7 10 14 19 26 35 47 63 84 # factors = [1, 1.1, 1.25, 1.5, 2.0, 3] # thresholds = [2, 5, 10, 20, 30, 50] factors = [0.5, 0.75, 1, 1.25, 1.5, 2] thresholds = [0, 2, 5, 10, 30, 45] trades = list() max_profit_pairs = {} mise_factor_buy = DecimalParameter(0.01, 0.1, default=0.05, decimals=2, space='buy', optimize=False, load=True) indicators = {'sma5', 'sma12', 'sma24', 'sma60'} indicators_percent = {'percent', 'percent5', 'percent12', 'percent24', 'percent_1h', 'percent5_1h', 'percent12_1h', 'percent24_1h'} pct = DecimalParameter(0.005, 0.05, default=0.012, decimals=3, space='buy', optimize=False, load=True) pct_inc = DecimalParameter(0.0001, 0.003, default=0.0022, decimals=4, space='buy', optimize=False, load=True) mises = [0,200,400,600,800,1000] states = [-4, -3, -2, -1, 0, 1, 2, 3, 4] locals().update(generate_state_params(states, mises)) labels = ['B3', 'B2', 'B1', 'N0', 'H1', 'H2', 'H3'] index_labels = ['B3', 'B2', 'B1', 'N0', 'H1', 'H2', 'H3'] ordered_labels = ['B3', 'B2', 'B1', 'N0', 'H1', 'H2', 'H3'] label_to_index = {label: i for i, label in enumerate(ordered_labels)} # ========================================================================= # paliers dérivées jour sma5 sma5_deriv1 = [-1.1726, -0.2131, -0.1012, -0.0330, 0.0169, 0.0815, 0.2000, 4.0335] sma5_deriv2 = [-1.9190, -0.1388, -0.0644, -0.0202, 0.0209, 0.0646, 0.1377, 4.2987] sma5_derive1_2_matrice = { 'B3': [8.6, 10.8, 34.6, 35.0, 58.8, 61.9, 91.2], 'B2': [0.0, 12.5, 9.1, 57.1, 63.3, 79.3, 89.5], 'B1': [6.1, 12.5, 22.0, 46.8, 61.5, 70.0, 100.0], 'N0': [0.0, 10.7, 37.0, 43.5, 75.0, 75.9, 100.0], 'H1': [0.0, 18.5, 32.4, 35.9, 76.8, 82.9, 92.0], 'H2': [0.0, 21.9, 16.0, 39.5, 69.7, 83.3, 100.0], 'H3': [9.5, 29.2, 41.2, 57.9, 53.8, 86.8, 92.3], } sma5_derive1_2_matrice_df = pd.DataFrame(sma5_derive1_2_matrice, index=index_labels) # Extraction de la matrice numérique sma5_derive1_2_numeric_matrice = sma5_derive1_2_matrice_df.reindex(index=ordered_labels, columns=ordered_labels).values should_enter_trade_count = 0 def confirm_trade_entry(self, pair: str, order_type: str, amount: float, rate: float, time_in_force: str, current_time: datetime, entry_tag: Optional[str], **kwargs) -> bool: minutes = 0 if self.pairs[pair]['last_date'] != 0: minutes = round(int((current_time - self.pairs[pair]['last_date']).total_seconds() / 60)) dataframe, _ = self.dp.get_analyzed_dataframe(pair, self.timeframe) last_candle = dataframe.iloc[-1].squeeze() last_candle_2 = dataframe.iloc[-2].squeeze() last_candle_3 = dataframe.iloc[-3].squeeze() condition = True #(last_candle[f"{indic_5m}_deriv1"] >= indic_deriv1_5m) and (last_candle[f"{indic_5m}_deriv2"] >= indic_deriv2_5m) # allow_to_buy = True #(not self.stop_all) #& (not self.all_down) # and val > self.buy_val.value #not last_candle['tendency'] in (-1, -2) # (rate <= float(limit)) | (entry_tag == 'force_entry') self.should_enter_trade(pair, last_candle, current_time) allow_to_buy = (condition and not self.pairs[pair]['stop']) | (entry_tag == 'force_entry') # if allow_to_buy: # poly_func, x_future, y_future, count = self.polynomial_forecast( # dataframe['mid_smooth_12'], # window=self.buy_horizon_predict_1h.value * 12, # degree=4, # n_future=3) # # if count < 3: # allow_to_buy = False force = self.pairs[pair]['force_buy'] if self.pairs[pair]['force_buy']: self.pairs[pair]['force_buy'] = False allow_to_buy = True else: if not self.should_enter_trade(pair, last_candle, current_time): allow_to_buy = False # # On récupère le dernier trade ouvert (toutes paires) # last_date = self.pairs[pair]['last_date'] # # if not last_date: # last_date = dataframe.iloc[-1]["date"] # self.pairs[pair]['last_date'] = last_date # # now = dataframe.iloc[-1]["date"] # # if now - last_date >= timedelta(hours=24): # allow_to_buy = True if allow_to_buy: self.trades = list() self.pairs[pair]['first_buy'] = rate self.pairs[pair]['last_buy'] = rate self.pairs[pair]['max_touch'] = last_candle['close'] self.pairs[pair]['last_candle'] = last_candle self.pairs[pair]['count_of_buys'] = 1 self.pairs[pair]['current_profit'] = 0 self.pairs[pair]['last_max'] = max(last_candle['close'], self.pairs[pair]['last_max']) self.pairs[pair]['last_min'] = min(last_candle['close'], self.pairs[pair]['last_min']) self.pairs[pair]['last_date'] = current_time dispo = round(self.wallets.get_available_stake_amount()) self.printLineLog() stake_amount = self.adjust_stake_amount(pair, last_candle) self.pairs[pair]['total_amount'] = stake_amount self.log_trade( last_candle=last_candle, date=current_time, action=("🟩Buy" if allow_to_buy else "Canceled") + " " + str(minutes), pair=pair, rate=rate, dispo=dispo, profit=0, trade_type=entry_tag, buys=1, stake=round(stake_amount, 2) ) return allow_to_buy def confirm_trade_exit(self, pair: str, trade: Trade, order_type: str, amount: float, rate: float, time_in_force: str, exit_reason: str, current_time, **kwargs, ) -> bool: # allow_to_sell = (minutes > 30) dataframe, _ = self.dp.get_analyzed_dataframe(pair, self.timeframe) last_candle = dataframe.iloc[-1].squeeze() profit =trade.calc_profit(rate) force = self.pairs[pair]['force_sell'] allow_to_sell = (last_candle['hapercent'] < 0 and profit > 0) or force or (exit_reason == 'sma48') or (exit_reason == 'force_exit') or (exit_reason == 'stop_loss') minutes = int(round((current_time - trade.date_last_filled_utc).total_seconds() / 60, 0)) if allow_to_sell: self.trades = list() self.pairs[pair]['last_count_of_buys'] = trade.nr_of_successful_entries # self.pairs[pair]['count_of_buys'] self.pairs[pair]['last_sell'] = rate self.pairs[pair]['last_candle'] = last_candle self.pairs[pair]['max_profit'] = 0 self.pairs[pair]['previous_profit'] = 0 self.trades = list() dispo = round(self.wallets.get_available_stake_amount()) # print(f"Sell {pair} {current_time} {exit_reason} dispo={dispo} amount={amount} rate={rate} open_rate={trade.open_rate}") self.log_trade( last_candle=last_candle, date=current_time, action="🟥Sell " + str(minutes), pair=pair, trade_type=exit_reason, rate=last_candle['close'], dispo=dispo, profit=round(profit, 2) ) self.pairs[pair]['force_sell'] = False self.pairs[pair]['has_gain'] = 0 self.pairs[pair]['current_profit'] = 0 self.pairs[pair]['total_amount'] = 0 self.pairs[pair]['count_of_buys'] = 0 self.pairs[pair]['max_touch'] = 0 self.pairs[pair]['last_buy'] = 0 self.pairs[pair]['last_date'] = current_time self.pairs[pair]['current_trade'] = None # else: # print(f"STOP triggered for {pair} ({exit_reason}) but condition blocked", "warning") return (allow_to_sell) | (exit_reason == 'force_exit') | (exit_reason == 'stop_loss') | force def custom_stake_amount(self, pair: str, current_time: datetime, current_rate: float, proposed_stake: float, min_stake: float, max_stake: float, **kwargs) -> float: dataframe, _ = self.dp.get_analyzed_dataframe(pair=pair, timeframe=self.timeframe) current_candle = dataframe.iloc[-1].squeeze() adjusted_stake_amount = self.adjust_stake_amount(pair, current_candle) # print(f"{pair} adjusted_stake_amount{adjusted_stake_amount}") # Use default stake amount. return adjusted_stake_amount def custom_exit(self, pair: str, trade: Trade, current_time, current_rate, current_profit, **kwargs): dataframe, _ = self.dp.get_analyzed_dataframe(pair, self.timeframe) last_candle = dataframe.iloc[-1].squeeze() last_candle_1h = dataframe.iloc[-13].squeeze() before_last_candle = dataframe.iloc[-2].squeeze() before_last_candle_2 = dataframe.iloc[-3].squeeze() before_last_candle_12 = dataframe.iloc[-13].squeeze() before_last_candle_24 = dataframe.iloc[-25].squeeze() expected_profit = self.expectedProfit(pair, last_candle) # print(f"current_time={current_time} current_profit={current_profit} expected_profit={expected_profit}") max_touch_before = self.pairs[pair]['max_touch'] self.pairs[pair]['last_max'] = max(last_candle['close'], self.pairs[pair]['last_max']) self.pairs[pair]['last_min'] = min(last_candle['close'], self.pairs[pair]['last_min']) self.pairs[pair]['current_trade'] = trade count_of_buys = trade.nr_of_successful_entries profit = trade.calc_profit(current_rate) #round(current_profit * trade.stake_amount, 1) self.pairs[pair]['max_profit'] = max(self.pairs[pair]['max_profit'], profit) max_profit = self.pairs[pair]['max_profit'] baisse = 0 if profit > 0: baisse = 1 - (profit / max_profit) mx = max_profit / 5 self.pairs[pair]['count_of_buys'] = count_of_buys self.pairs[pair]['current_profit'] = profit dispo = round(self.wallets.get_available_stake_amount()) hours_since_first_buy = round((current_time - trade.open_date_utc).seconds / 3600.0, 1) days_since_first_buy = (current_time - trade.open_date_utc).days hours = (current_time - trade.date_last_filled_utc).total_seconds() / 3600.0 if hours % 4 == 0: self.log_trade( last_candle=last_candle, date=current_time, action=("🔴 NOW" if self.pairs[pair]['stop'] else "🟢 NOW ") + str(hours_since_first_buy), dispo=dispo, pair=pair, rate=last_candle['close'], trade_type='', profit=round(profit, 2), buys=count_of_buys, stake=0 ) # if (last_candle['mid_smooth_deriv1'] >= 0): # return None # if (last_candle['tendency'] in (2, 1)) and (last_candle['rsi'] < 80): # return None # # if (last_candle['sma24_deriv1'] < 0 and before_last_candle['sma24_deriv1'] >= 0) and (current_profit > expected_profit): # return 'Drv_' + str(count_of_buys) pair_name = self.getShortName(pair) if current_profit < - 0.02 and last_candle['sma48'] < before_last_candle['sma48'] - 10 and last_candle['sma60_deriv2'] < - 10 and (last_candle['hapercent3'] < -0.0005) and (last_candle['percent'] < 0): self.pairs[pair]['stop'] = True self.pairs[pair]['force_sell'] = True self.pairs[pair]['force_buy'] = False #(self.pairs[pair]['count_of_buys'] - self.pairs[pair]['has_gain'] > 3) return str(count_of_buys) + '_' + 'stop48_' + pair_name + '_' + str(self.pairs[pair]['has_gain']) # if current_profit < - 0.005 and last_candle['sma5_1h'] < before_last_candle_12['sma5_1h'] and hours > 12: # self.pairs[pair]['force_sell'] = True # self.pairs[pair]['force_buy'] = False #(self.pairs[pair]['count_of_buys'] - self.pairs[pair]['has_gain'] > 3) # return str(count_of_buys) + '_' + '5hinv_' + pair_name + '_' + str(self.pairs[pair]['has_gain']) # if last_candle['stop_buying']: # # self.pairs[pair]['stop'] = True # self.pairs[pair]['force_sell'] = True # self.pairs[pair]['force_buy'] = False #(self.pairs[pair]['count_of_buys'] - self.pairs[pair]['has_gain'] > 3) # return str(count_of_buys) + '_' + 'stopbuy_' + pair_name + '_' + str(self.pairs[pair]['has_gain']) # if hours_since_first_buy >= 23.9: # self.pairs[pair]['force_sell'] = True # self.pairs[pair]['force_buy'] = True #(self.pairs[pair]['count_of_buys'] - self.pairs[pair]['has_gain'] > 3) # return str(count_of_buys) + '_' + 'hours_' + pair_name + '_' + str(self.pairs[pair]['has_gain']) if last_candle['max_rsi_24'] > 70 and profit > max(5, expected_profit) and (last_candle['hapercent3'] < -0.0005) and (last_candle['percent'] < 0): # and last_candle['sma60_deriv1'] < 0.05: self.pairs[pair]['force_sell'] = True self.pairs[pair]['force_buy'] = False #(self.pairs[pair]['count_of_buys'] - self.pairs[pair]['has_gain'] > 3) return str(count_of_buys) + '_' + 'Rsi85_' + pair_name + '_' + str(self.pairs[pair]['has_gain']) if self.pairs[pair]['force_sell']: self.pairs[pair]['force_sell'] = False self.pairs[pair]['force_buy'] = (self.pairs[pair]['count_of_buys'] - self.pairs[pair]['has_gain'] > 3) return str(count_of_buys) + '_' + 'Frc_' + pair_name + '_' + str(self.pairs[pair]['has_gain']) # if profit > max(5, expected_profit) and baisse > 0.30: # self.pairs[pair]['force_sell'] = True # self.pairs[pair]['force_buy'] = (self.pairs[pair]['count_of_buys'] - self.pairs[pair]['has_gain'] > 3) # return str(count_of_buys) + '_' + 'B30_' + pair_name + '_' + str(self.pairs[pair]['has_gain']) # # if (max_profit > 0.5 * count_of_buys or hours > 12) and baisse > 0.15 and last_candle['sma12_state'] <= 0 and last_candle['sma60_state'] <= - 1: # self.pairs[pair]['force_sell'] = True # self.pairs[pair]['force_buy'] = (self.pairs[pair]['count_of_buys'] - self.pairs[pair]['has_gain'] > 3) # return str(count_of_buys) + '_' + 'B15_' + pair_name + '_' + str(self.pairs[pair]['has_gain']) if profit > max(5, expected_profit) and last_candle['sma5_inv_bas_1h'] == - 1 and (last_candle['hapercent3'] < -0.0005) and (last_candle['percent'] < 0): self.pairs[pair]['force_sell'] = True self.pairs[pair]['force_buy'] = (self.pairs[pair]['count_of_buys'] - self.pairs[pair]['has_gain'] > 3) return str(count_of_buys) + '_' + 'SMA5_' + pair_name + '_' + str(self.pairs[pair]['has_gain']) if profit < 0 and last_candle['sma5_inv_hau_1h']: self.pairs[pair]['stop'] = True self.pairs[pair]['force_sell'] = True self.pairs[pair]['force_buy'] = (self.pairs[pair]['count_of_buys'] - self.pairs[pair]['has_gain'] > 3) return str(count_of_buys) + '_' + 'SMA24_' + pair_name + '_' + str(self.pairs[pair]['has_gain']) # if (last_candle['sma48_deriv1'] < -0.1 and last_candle['sma48_deriv2'] < -10): # self.pairs[pair]['force_sell'] = True # return str(count_of_buys) + '_' + 'B48D1_' + pair_name + '_' + str(self.pairs[pair]['has_gain']) self.pairs[pair]['max_touch'] = max(last_candle['close'], self.pairs[pair]['max_touch']) def getShortName(self, pair): return pair.replace("/USDT", '').replace("/USDC", '').replace("_USDC", '').replace("_USDT", '') def informative_pairs(self): # get access to all pairs available in whitelist. pairs = self.dp.current_whitelist() informative_pairs = [(pair, '1d') for pair in pairs] informative_pairs += [(pair, '1h') for pair in pairs] return informative_pairs from typing import List def multi_step_interpolate(self, pct: float, thresholds: List[float], factors: List[float]) -> float: if pct <= thresholds[0]: return factors[0] if pct >= thresholds[-1]: return factors[-1] for i in range(1, len(thresholds)): if pct <= thresholds[i]: # interpolation linéaire entre thresholds[i-1] et thresholds[i] return factors[i - 1] + (pct - thresholds[i - 1]) * (factors[i] - factors[i - 1]) / ( thresholds[i] - thresholds[i - 1]) # Juste au cas où (devrait jamais arriver) return factors[-1] # def interpolate_factor(self, pct: float, start_pct: float = 5, end_pct: float = 30, # start_factor: float = 1.0, end_factor: float = 2.0) -> float: # if pct <= start_pct: # return start_factor # if pct >= end_pct: # return end_factor # # interpolation linéaire # return start_factor + (pct - start_pct) * (end_factor - start_factor) / (end_pct - start_pct) def log_trade(self, action, pair, date, trade_type=None, rate=None, dispo=None, profit=None, buys=None, stake=None, last_candle=None): # Afficher les colonnes une seule fois if self.config.get('runmode') == 'hyperopt' or self.dp.runmode.value in ('hyperopt'): return if self.columns_logged % 10 == 0: self.printLog( f"| {'Date':<16} | {'Action':<10} |{'Pair':<5}| {'Trade Type':<18} |{'Rate':>8} | {'Dispo':>6} | {'Profit':>8} " f"| {'Pct':>6} | {'max_touch':>11} | {'last_lost':>12} | {'last_max':>7}| {'last_max':>7}|{'Buys':>5}| {'Stake':>5} |" f"{'rsi':>6}|Distmax|s201d|s5_1d|s5_2d|s51h|s52h|smt1h|smt2h|tdc1d|tdc1h" ) self.printLineLog() df = pd.DataFrame.from_dict(self.pairs, orient='index') colonnes_a_exclure = ['last_candle', 'trade_info', 'last_date', 'last_count_of_buys', 'base_stake_amount', 'stop_buy'] df_filtered = df[df['count_of_buys'] > 0].drop(columns=colonnes_a_exclure) # df_filtered = df_filtered["first_buy", "last_max", "max_touch", "last_sell","last_buy", 'count_of_buys', 'current_profit'] print(df_filtered) self.columns_logged += 1 date = str(date)[:16] if date else "-" limit = None # if buys is not None: # limit = round(last_rate * (1 - self.fibo[buys] / 100), 4) rsi = '' rsi_pct = '' # if last_candle is not None: # if (not np.isnan(last_candle['rsi_1d'])) and (not np.isnan(last_candle['rsi_1h'])): # rsi = str(int(last_candle['rsi_1d'])) + " " + str(int(last_candle['rsi_1h'])) # if (not np.isnan(last_candle['rsi_pct_1d'])) and (not np.isnan(last_candle['rsi_pct_1h'])): # rsi_pct = str(int(10000 * last_candle['bb_mid_pct_1d'])) + " " + str( # int(last_candle['rsi_pct_1d'])) + " " + str(int(last_candle['rsi_pct_1h'])) # first_rate = self.percent_threshold.value # last_rate = self.threshold.value # action = self.color_line(action, action) sma5_1d = '' sma5_1h = '' sma5 = str(sma5_1d) + ' ' + str(sma5_1h) last_lost = self.getLastLost(last_candle, pair) if buys is None: buys = '' max_touch = '' # round(last_candle['max12_1d'], 1) #round(self.pairs[pair]['max_touch'], 1) pct_max = self.getPctFirstBuy(pair, last_candle) total_counts = str(buys) + '/' + str(sum(pair_data['count_of_buys'] for pair_data in self.pairs.values())) dist_max = self.getDistMax(last_candle, pair) # if trade_type is not None: # if np.isnan(last_candle['rsi_1d']): # string = ' ' # else: # string = (str(int(last_candle['rsi_1d']))) + " " + str(int(last_candle['rsi_deriv1_1d'])) # trade_type = trade_type \ # + " " + string \ # + " " + str(int(last_candle['rsi_1h'])) \ # + " " + str(int(last_candle['rsi_deriv1_1h'])) # val144 = self.getProbaHausse144(last_candle) # val1h = self.getProbaHausse1h(last_candle) val = self.getProbaHausseSma5d(last_candle) pct60 = round(100 * self.getPct60D(pair, last_candle), 2) color = GREEN if profit > 0 else RED color_sma24 = GREEN if last_candle['sma24_deriv1_1d'] > 0 else RED color_sma24_2 = GREEN if last_candle['sma24_deriv2_1d'] > 0 else RED color_sma5 = GREEN if last_candle['mid_smooth_5_deriv1_1d'] > 0 else RED color_sma5_2 = GREEN if last_candle['mid_smooth_5_deriv2_1d'] > 0 else RED color_sma5_1h = GREEN if last_candle['sma60_deriv1'] > 0 else RED color_sma5_2h = GREEN if last_candle['sma60_deriv2'] > 0 else RED color_smooth_1h = GREEN if last_candle['mid_smooth_1h_deriv1'] > 0 else RED color_smooth2_1h = GREEN if last_candle['mid_smooth_1h_deriv2'] > 0 else RED last_max = int(self.pairs[pair]['last_max']) if self.pairs[pair]['last_max'] > 1 else round( self.pairs[pair]['last_max'], 3) last_min = int(self.pairs[pair]['last_min']) if self.pairs[pair]['last_min'] > 1 else round( self.pairs[pair]['last_min'], 3) profit = str(profit) + '/' + str(round(self.pairs[pair]['max_profit'], 2)) # 🟢 Dérivée 1 > 0 et dérivée 2 > 0: tendance haussière qui s’accélère. # 🟡 Dérivée 1 > 0 et dérivée 2 < 0: tendance haussière qui ralentit → essoufflement potentiel. # 🔴 Dérivée 1 < 0 et dérivée 2 < 0: tendance baissière qui s’accélère. # 🟠 Dérivée 1 < 0 et dérivée 2 > 0: tendance baissière qui ralentit → possible bottom. self.printLog( f"| {date:<16} |{action:<10} | {pair[0:3]:<3} | {trade_type or '-':<18} |{rate or '-':>9}| {dispo or '-':>6} " f"|{color}{profit or '-':>10}{RESET}| {pct_max or '-':>6} | {round(self.pairs[pair]['max_touch'], 2) or '-':>11} | {last_lost or '-':>12} " f"| {last_max or '-':>7} | {last_min or '-':>7} |{total_counts or '-':>5}|{stake or '-':>7}" # f"|{round(last_candle['mid_smooth_24_deriv1'],3) or '-':>6}|{round(last_candle['mid_smooth_1h_deriv1'],3) or '-':>6}|{round(last_candle['mid_smooth_deriv1_1d'],3) or '-' :>6}|" # f"{round(last_candle['mid_smooth_24_deriv2'],3) or '-' :>6}|{round(last_candle['mid_smooth_1h_deriv2'],3) or '-':>6}|{round(last_candle['mid_smooth_deriv2_1d'],3) or '-':>6}|" f"{round(last_candle['max_rsi_24'], 1) or '-' :>6}|" f"{dist_max:>7}" #|s201d f"|{color_sma24}{round(last_candle['sma24_deriv1_1d'], 2):>5}{RESET}" #|s5_1d|s5_2d f"|{color_sma5}{round(last_candle['mid_smooth_5_deriv1_1d'], 2):>5}{RESET}|{color_sma5_2}{round(last_candle['mid_smooth_5_deriv2_1d'], 1):>5}{RESET}" #|s51h|s52h f"|{color_sma5_1h}{round(last_candle['sma60_deriv1'], 2):>5}{RESET}|{color_sma5_2h}{round(last_candle['sma60_deriv2'], 1):>5}{RESET}" #|smt1h|smt2h f"|{color_smooth_1h}{round(last_candle['mid_smooth_1h_deriv1'], 2):>5}{RESET}|{color_smooth2_1h}{round(last_candle['mid_smooth_1h_deriv2'], 1):>5}{RESET}" #|tdc1d|tdc1h f"|{last_candle['mid_smooth_5_state_1d'] or '-':>3}|{last_candle['mid_smooth_24_state_1h'] or '-':>3}|{last_candle['mid_smooth_5_state_1h'] or '-':>3}|{last_candle['mid_smooth_5_state'] or '-':>3}" f"|{last_candle['trend_class_1d']:>5} {last_candle['trend_class_1h']:>5}" ) def getLastLost(self, last_candle, pair): last_lost = round((last_candle['close'] - self.pairs[pair]['max_touch']) / self.pairs[pair]['max_touch'], 3) return last_lost def getDistMax(self, last_candle, pair): mx = last_candle['max12_1d'] dist_max = round(100 * (mx - last_candle['close']) / mx, 0) return dist_max def printLineLog(self): # f"sum1h|sum1d|Tdc|Tdh|Tdd| drv1 |drv_1h|drv_1d|" self.printLog( f"+{'-' * 18}+{'-' * 12}+{'-' * 5}+{'-' * 20}+{'-' * 9}+{'-' * 8}+{'-' * 12}+{'-' * 8}+{'-' * 13}+{'-' * 14}+{'-' * 9}{'-' * 9}+{'-' * 5}+{'-' * 7}+" f"+{'-' * 6}+{'-' * 7}+{'-' * 5}+{'-' * 5}+{'-' * 5}+{'-' * 5}+{'-' * 5}+{'-' * 5}+" ) def printLog(self, str): if self.config.get('runmode') == 'hyperopt' or self.dp.runmode.value in ('hyperopt'): return; if not self.dp.runmode.value in ('backtest', 'hyperopt', 'lookahead-analysis'): logger.info(str) else: if not self.dp.runmode.value in ('hyperopt'): print(str) def add_tendency_column(self, dataframe: pd.DataFrame, name: str, suffixe: str = '', eps: float = 1e-3, d1_lim_inf: float = -0.01, d1_lim_sup: float = 0.01) -> pd.DataFrame: """ Ajoute une colonne 'tendency' basée sur les dérivées 1 et 2 lissées et normalisées. eps permet de définir un seuil proche de zéro. suffixe permet de gérer plusieurs indicateurs. """ def tag_by_derivatives(row): d1 = row[f"{name}{suffixe}_deriv1"] d2 = row[f"{name}{suffixe}_deriv2"] # On considère les petites valeurs comme zéro if abs(d1) < eps: return 0 # Palier / neutre if d1 > d1_lim_sup: return 2 if d2 > eps else 1 # Acceleration Hausse / Ralentissement Hausse if d1 < d1_lim_inf: return -2 if d2 < -eps else -1 # Acceleration Baisse / Ralentissement Baisse if abs(d1) < eps: return 'DH' if d2 > eps else 'DB' # Depart Hausse / Depart Baisse return 'Mid' print(f"{name}_tdc{suffixe}") dataframe[f"{name}_tdc{suffixe}"] = dataframe.apply(tag_by_derivatives, axis=1) return dataframe # def add_tendency_column(self, dataframe: pd.DataFrame, name, suffixe='') -> pd.DataFrame: # def tag_by_derivatives(row): # d1 = row[f"{name}{suffixe}_deriv1"] # d2 = row[f"{name}{suffixe}_deriv2"] # d1_lim_inf = -0.01 # d1_lim_sup = 0.01 # if d1 >= d1_lim_inf and d1 <= d1_lim_sup: # and d2 >= d2_lim_inf and d2 <= d2_lim_sup: # return 0 # Palier # if d1 == 0.0: # return 'DH' if d2 > 0 else 'DB' # Depart Hausse / Départ Baisse # if d1 > d1_lim_sup: # return 2 if d2 > 0 else 1 # Acceleration Hausse / Ralentissement Hausse # if d1 < d1_lim_inf: # return -2 if d2 < 0 else -1 # Accéleration Baisse / Ralentissement Baisse # return 'Mid' # # dataframe[f"tendency{suffixe}"] = dataframe.apply(tag_by_derivatives, axis=1) # return dataframe def populate_indicators(self, dataframe: DataFrame, metadata: dict) -> DataFrame: # Add all ta features pair = metadata['pair'] short_pair = self.getShortName(pair) name= type(self).__name__ self.path = f"user_data/strategies/plots/{name}/{short_pair}/" # + ("valide/" if not self.dp.runmode.value in ('backtest') else '') dataframe = self.populateDataframe(dataframe, timeframe='5m') # dataframe = self.calculateRegression(dataframe, column='mid_smooth', window=24, degree=4, future_offset=12) # dataframe = self.calculateRegression(dataframe, column='mid_smooth_24', window=24, degree=4, future_offset=12) ################### INFORMATIVE 1h informative = self.dp.get_pair_dataframe(pair=metadata['pair'], timeframe="1h") informative = self.populateDataframe(informative, timeframe='1h') # informative['target_value'] = informative['sma5'].shift(-6).rolling(5).max() - informative['sma5'] * 1.005 # if self.dp.runmode.value in ('backtest'): # self.trainModel(informative, metadata) # # # Préparer les features pour la prédiction # features = informative[self.model_indicators].fillna(0) # # # Prédiction : probabilité que le prix monte # probs = self.model.predict_proba(features)[:, 1] # # # Sauvegarder la probabilité pour l’analyse # informative['ml_prob'] = probs dataframe = merge_informative_pair(dataframe, informative, self.timeframe, "1h", ffill=True) ################### INFORMATIVE 1d informative = self.dp.get_pair_dataframe(pair=metadata['pair'], timeframe="1d") informative = self.populateDataframe(informative, timeframe='1d') dataframe = merge_informative_pair(dataframe, informative, self.timeframe, "1d", ffill=True) dataframe['last_price'] = dataframe['close'] dataframe['first_price'] = dataframe['close'] # dataframe['mid_price'] = (dataframe['last_price'] + dataframe['first_price']) / 2 # dataframe['close01'] = dataframe.iloc[-1]['close'] * 1.01 # dataframe['limit'] = dataframe['close'] count_buys = 0 if self.dp: if self.dp.runmode.value in ('live', 'dry_run'): self.getOpenTrades() for trade in self.trades: if trade.pair != pair: continue filled_buys = trade.select_filled_orders('buy') count = 0 amount = 0 for buy in filled_buys: if count == 0: dataframe['first_price'] = buy.price self.pairs[pair]['first_buy'] = buy.price self.pairs[pair]['first_amount'] = buy.price * buy.filled # dataframe['close01'] = buy.price * 1.01 # Order(id=2396, trade=1019, order_id=29870026652, side=buy, filled=0.00078, price=63921.01, # status=closed, date=2024-08-26 02:20:11) dataframe['last_price'] = buy.price self.pairs[pair]['last_buy'] = buy.price count = count + 1 amount += buy.price * buy.filled # dataframe['mid_price'] = (dataframe['last_price'] + dataframe['first_price']) / 2 count_buys = count # dataframe['limit'] = dataframe['last_price'] * (1 - self.baisse[count] / 100) self.pairs[pair]['total_amount'] = amount # dataframe['mid_smooth_tag'] = qtpylib.crossed_below(dataframe['mid_smooth_24_deriv1'], dataframe['mid_smooth_deriv2_24']) # =============================== # lissage des valeurs horaires dataframe['mid_smooth_1h'] = dataframe['mid'].rolling(window=6).mean() dataframe["mid_smooth_1h_deriv1"] = 100 * dataframe["mid_smooth_1h"].diff().rolling(window=6).mean() / \ dataframe['mid_smooth_1h'] dataframe["mid_smooth_1h_deriv2"] = 100 * dataframe["mid_smooth_1h_deriv1"].diff().rolling(window=6).mean() dataframe['mid_smooth_5h'] = talib.EMA(dataframe, timeperiod=60) # dataframe['mid'].rolling(window=60).mean() dataframe["mid_smooth_5h_deriv1"] = 100 * dataframe["mid_smooth_5h"].diff().rolling(window=60).mean() / \ dataframe['mid_smooth_5h'] dataframe["mid_smooth_5h_deriv2"] = 100 * dataframe["mid_smooth_5h_deriv1"].diff().rolling(window=60).mean() # =============================== # Lissage des valeurs Journalières horizon_d = 12 * 5 * 24 dataframe['stop_buying_deb'] = (dataframe['sma48_deriv1'] < - 0.2) & (dataframe['sma5_1h'] < dataframe['sma5_1h'].shift(13)) dataframe['stop_buying_end'] = (dataframe['sma48_deriv1'] > - 0.2) & (dataframe['sma5_1h'] > dataframe['sma5_1h'].shift(13)) latched = np.zeros(len(dataframe), dtype=bool) for i in range(1, len(dataframe)): if dataframe['stop_buying_deb'].iloc[i]: latched[i] = True elif dataframe['stop_buying_end'].iloc[i]: latched[i] = False else: latched[i] = latched[i - 1] dataframe['stop_buying'] = latched self.model_indicators = self.listUsableColumns(dataframe) # if False and self.dp.runmode.value in ('backtest'): # self.trainModel(dataframe, metadata) # # short_pair = self.getShortName(pair) # # self.model = joblib.load(f"{self.path}/{short_pair}_rf_model.pkl") # # # Préparer les features pour la prédiction # features = dataframe[self.model_indicators].fillna(0) # # # Prédiction : probabilité que le prix monte # # # Affichage des colonnes intérressantes dans le model # features_pruned, kept_features = self.prune_features( # model=self.model, # dataframe=dataframe, # feature_columns=self.model_indicators, # importance_threshold=0.005 # enlever features < % importance # ) # # probs = self.model.predict_proba(features)[:, 1] # # # Sauvegarder la probabilité pour l’analyse # dataframe['ml_prob'] = probs # # if False and self.dp.runmode.value in ('backtest'): # self.inspect_model(self.model) return dataframe def prune_features(self, model, dataframe, feature_columns, importance_threshold=0.01): """ Supprime les features dont l'importance est inférieure au seuil. Args: model: XGBClassifier déjà entraîné dataframe: DataFrame contenant toutes les features feature_columns: liste des colonnes/features utilisées pour la prédiction importance_threshold: seuil minimal pour conserver une feature (en proportion de l'importance totale) Returns: dataframe_pruned: dataframe avec uniquement les features conservées kept_features: liste des features conservées """ booster = model.get_booster() # Récupérer importance des features selon 'gain' importance = booster.get_score(importance_type='gain') # Normaliser pour que la somme soit 1 total_gain = sum(importance.values()) normalized_importance = {k: v / total_gain for k, v in importance.items()} # Features à garder kept_features = [f for f in feature_columns if normalized_importance.get(f, 0) >= importance_threshold] dataframe_pruned = dataframe[kept_features].fillna(0) # print(f"⚡ Features conservées ({len(kept_features)} / {len(feature_columns)}): {kept_features}") return dataframe_pruned, kept_features def listUsableColumns(self, dataframe): # Étape 1 : sélectionner numériques numeric_cols = dataframe.select_dtypes(include=['int64', 'float64']).columns # Étape 2 : enlever constantes usable_cols = [c for c in numeric_cols if dataframe[c].nunique() > 1 and ( ("deriv" in c or "dist" in c) and ("_1h" in c or "_1d" in c) ) # and not "smooth" in c and not c.endswith("_state") # and not c.endswith("_1d") # and not c.endswith("_1h") and not c.startswith("open") and not c.startswith("close") and not c.startswith("low") and not c.startswith("high") and not c.startswith("haopen") and not c.startswith("haclose") # and not c.startswith("bb_lower") and not c.startswith("bb_upper") # and not c.startswith("bb_middle") and not c.endswith("_count") and not c.endswith("_class") and not c.endswith("_price") and not c.startswith('stop_buying') and not c.startswith('target') and not c.startswith('lvl') # and not c.startswith('sma5_deriv1_1h') # and not c.startswith('sma5_1h') # and not c.startswith('sma12_deriv1_1h') # and not c.startswith('sma12_1h') # and not c.startswith('confidence_index') # and not c.startswith('price_change') # and not c.startswith('price_score') # and not c.startswith('heat_score') # and not c.startswith('min30_1d') # and not c.startswith('max30_1d') ] # Étape 3 : remplacer inf et NaN par 0 dataframe[usable_cols] = dataframe[usable_cols].replace([np.inf, -np.inf], 0).fillna(0) print("Colonnes utilisables pour le modèle :") print(usable_cols) self.model_indicators = usable_cols return usable_cols def trainModel(self, dataframe: DataFrame, metadata: dict): pair = self.getShortName(metadata['pair']) pd.set_option('display.max_rows', None) pd.set_option('display.max_columns', None) pd.set_option("display.width", 200) path = self.path # f"user_data/plots/{pair}/" os.makedirs(path, exist_ok=True) # # Étape 1 : sélectionner numériques # numeric_cols = dataframe.select_dtypes(include=['int64', 'float64']).columns # # # Étape 2 : enlever constantes # usable_cols = [c for c in numeric_cols if dataframe[c].nunique() > 1 # and (not c.endswith("_state") and not c.endswith("_1h") and not c.endswith("_1d") # and not c.endswith("_class") and not c.endswith("_price") # and not c.startswith('stop_buying'))] # # # Étape 3 : remplacer inf et NaN par 0 # dataframe[usable_cols] = dataframe[usable_cols].replace([np.inf, -np.inf], 0).fillna(0) # # print("Colonnes utilisables pour le modèle :") # print(usable_cols) # # self.model_indicators = usable_cols # df = dataframe[self.model_indicators].copy() # Corrélations des colonnes corr = df.corr(numeric_only=True) print("Corrélation des colonnes") print(corr) # 3️⃣ Créer la cible : 1 si le prix monte dans les prochaines bougies # df['target'] = (df['sma24'].shift(-24) > df['sma24']).astype(int) dataframe['target'] = (dataframe['sma5_1h'].shift(-48) > dataframe['sma5_1h']).astype(int) df['target'] = dataframe['target'].fillna(0).astype(int) # Corrélations triées par importance avec une colonne cible target_corr = df.corr(numeric_only=True)["target"].sort_values(ascending=False) print("Corrélations triées par importance avec une colonne cible") print(target_corr) # Corrélations triées par importance avec une colonne cible corr = df.corr(numeric_only=True) corr_unstacked = ( corr.unstack() .reset_index() .rename(columns={"level_0": "col1", "level_1": "col2", 0: "corr"}) ) # Supprimer les doublons col1/col2 inversés et soi-même corr_unstacked = corr_unstacked[corr_unstacked["col1"] < corr_unstacked["col2"]] # Trier par valeur absolue de corrélation corr_sorted = corr_unstacked.reindex(corr_unstacked["corr"].abs().sort_values(ascending=False).index) print("Trier par valeur absolue de corrélation") print(corr_sorted.head(20)) # --- Calcul de la corrélation --- corr = df.corr(numeric_only=True) # évite les colonnes non numériques corr = corr * 100 # passage en pourcentage # --- Masque pour n’afficher que le triangle supérieur (optionnel) --- mask = np.triu(np.ones_like(corr, dtype=bool)) # --- Création de la figure --- fig, ax = plt.subplots(figsize=(96, 36)) # --- Heatmap avec un effet “température” --- sns.heatmap( corr, mask=mask, cmap="coolwarm", # palette bleu → rouge center=0, # 0 au centre annot=True, # affiche les valeurs dans chaque case fmt=".0f", # format entier (pas de décimale) cbar_kws={"label": "Corrélation (%)"}, # légende à droite linewidths=0.5, # petites lignes entre les cases ax=ax ) # --- Personnalisation --- ax.set_title("Matrice de corrélation (en %)", fontsize=20, pad=20) plt.xticks(rotation=45, ha="right") plt.yticks(rotation=0) # --- Sauvegarde --- output_path = f"{self.path}/Matrice_de_correlation_temperature.png" plt.savefig(output_path, bbox_inches="tight", dpi=150) plt.close(fig) print(f"✅ Matrice enregistrée : {output_path}") # Exemple d'utilisation : selected_corr = self.select_uncorrelated_features(df, target="target", top_n=30, corr_threshold=0.7) print("===== 🎯 FEATURES SÉLECTIONNÉES =====") print(selected_corr) # Nettoyage df = df.dropna() X = df[self.model_indicators] y = df['target'] # ta colonne cible binaire ou numérique print("===== 🎯 FEATURES SCORES =====") print(self.feature_auc_scores(X, y)) # 4️⃣ Split train/test X = df[self.model_indicators] y = df['target'] # Séparation temporelle (train = 80 %, valid = 20 %) X_train, X_valid, y_train, y_valid = train_test_split(X, y, test_size=0.2, shuffle=False) # Nettoyage des valeurs invalides selector = VarianceThreshold(threshold=0.0001) selector.fit(X_train) selected = X_train.columns[selector.get_support()] print("Colonnes conservées :", list(selected)) # 5️⃣ Entraînement du modèle # self.train_model = RandomForestClassifier(n_estimators=200, random_state=42) # def objective(trial): # self.train_model = XGBClassifier( # n_estimators=trial.suggest_int("n_estimators", 200, 300), # max_depth=trial.suggest_int("max_depth", 3, 6), # learning_rate=trial.suggest_float("learning_rate", 0.01, 0.3), # subsample=trial.suggest_float("subsample", 0.7, 1.0), # colsample_bytree=trial.suggest_float("colsample_bytree", 0.7, 1.0), # scale_pos_weight=1, # tu mettras balance_ratio ici si tu veux # objective="binary:logistic", # eval_metric="logloss", # n_jobs=-1 # ) # # self.train_model.fit(X_train, y_train) # # y_pred = self.train_model.predict(X_valid) # <-- validation = test split # return f1_score(y_valid, y_pred) # # study = optuna.create_study(direction="maximize") # study.optimize(objective, n_trials=50) def objective(trial): # local_model = XGBClassifier( # n_estimators=300, # nombre d'arbres plus raisonnable # learning_rate=0.01, # un peu plus rapide que 0.006, mais stable # max_depth=4, # capture plus de patterns que 3, sans overfitting excessif # subsample=0.7, # utilise 70% des lignes pour chaque arbre → réduit overfitting # colsample_bytree=0.8, # 80% des features par arbre # gamma=0.01, # gain minimal pour un split → régularisation # reg_alpha=0.01, # L1 régularisation des feuilles # reg_lambda=1, # L2 régularisation des feuilles # n_jobs=-1, # utilise tous les cœurs CPU pour accélérer # random_state=42, # reproductibilité # missing=float('nan'), # valeur manquante reconnue # eval_metric='logloss' # métrique pour classification binaire # ) local_model = XGBClassifier( n_estimators=trial.suggest_int("n_estimators", 300, 500), max_depth=trial.suggest_int("max_depth", 1, 6), learning_rate=trial.suggest_float("learning_rate", 0.005, 0.3, log=True), subsample=trial.suggest_float("subsample", 0.6, 1.0), colsample_bytree=trial.suggest_float("colsample_bytree", 0.6, 1.0), scale_pos_weight=1, objective="binary:logistic", eval_metric="logloss", n_jobs=-1 ) local_model.fit( X_train, y_train, eval_set=[(X_valid, y_valid)], # early_stopping_rounds=50, verbose=False ) proba = local_model.predict_proba(X_valid)[:, 1] thresholds = np.linspace(0.1, 0.9, 50) best_f1 = max(f1_score(y_valid, (proba > t)) for t in thresholds) return best_f1 study = optuna.create_study(direction="maximize") study.optimize(objective, n_trials=20) # SHAP # Reconstruction du modèle final avec les meilleurs hyperparamètres # Récupération des meilleurs paramètres trouvés best_params = study.best_params best_model = XGBClassifier(**best_params) best_model.fit(X_train, y_train) self.train_model = best_model # === SHAP plots === # Calcul SHAP explainer = shap.TreeExplainer(self.train_model) shap_values = explainer(X_train) # On choisit une observation pour le graphique waterfall # Explication du modèle de prédiction pour la première ligne de X_valid.” i = 0 # Extraction des valeurs shap_val = shap_values[i].values feature_names = X_train.columns feature_values = X_train.iloc[i] # Tri par importance absolue # order = np.argsort(np.abs(shap_val))[::-1] k = 10 order = np.argsort(np.abs(shap_val))[::-1][:k] # ---- Création figure sans l'afficher ---- plt.ioff() # Désactive l'affichage interactif shap.plots.waterfall( shap.Explanation( values=shap_val[order], base_values=shap_values.base_values[i], data=feature_values.values[order], feature_names=feature_names[order] ), show=False # IMPORTANT : n'affiche pas dans Jupyter / console ) # Sauvegarde du graphique sur disque output_path = f"{self.path}/shap_waterfall.png" plt.savefig(output_path, dpi=200, bbox_inches='tight') plt.close() # ferme la figure proprement print(f"Graphique SHAP enregistré : {output_path}") # FIN SHAP # ---- après avoir exécuté la study ------ print("Best value (F1):", study.best_value) print("Best params:", study.best_params) best_trial = study.best_trial print("\n=== BEST TRIAL ===") print("Number:", best_trial.number) print("Value:", best_trial.value) print("Params:") for k, v in best_trial.params.items(): print(f" - {k}: {v}") # All trials summary print("\n=== ALL TRIALS ===") for t in study.trials: print(f"Trial {t.number}: f1 = {t.value}, params = {t.params}") # DataFrame of trials df = study.trials_dataframe() print(df.head()) # Graphs fig = plot_optimization_history(study) fig.write_html(f"{self.path}/optimization_history.html") fig = plot_param_importances(study) fig.write_html(f"{self.path}/param_importances.html") fig = plot_slice(study) fig.write_html(f"{self.path}/slice.html") fig = plot_parallel_coordinate(study) fig.write_html(f"{self.path}/parallel_coordinates.html") # 2️⃣ Sélection des features AVANT calibration sfm = SelectFromModel(self.train_model, threshold="median", prefit=True) selected_features = X_train.columns[sfm.get_support()] print(selected_features) # 3️⃣ Calibration ensuite (facultative) calibrated = CalibratedClassifierCV(self.train_model, method='sigmoid', cv=5) calibrated.fit(X_train[selected_features], y_train) print(calibrated) # # # calibration # self.train_model = CalibratedClassifierCV(self.train_model, method='sigmoid', cv=5) # # Sélection # sfm = SelectFromModel(self.train_model, threshold="median") # sfm.fit(X_train, y_train) # selected_features = X_train.columns[sfm.get_support()] # print(selected_features) # self.train_model.fit(X_train, y_train) y_pred = self.train_model.predict(X_valid) y_proba = self.train_model.predict_proba(X_valid)[:, 1] # print(classification_report(y_valid, y_pred)) # print(confusion_matrix(y_valid, y_pred)) print("\nRapport de classification :\n", classification_report(y_valid, y_pred)) print("\nMatrice de confusion :\n", confusion_matrix(y_valid, y_pred)) # # Importances # importances = pd.DataFrame({ # "feature": self.train_model.feature_name_, # "importance": self.train_model.feature_importances_ # }).sort_values("importance", ascending=False) # print("\n===== 🔍 IMPORTANCE DES FEATURES =====") # print(importances) # Feature importance importances = self.train_model.feature_importances_ feat_imp = pd.Series(importances, index=X_train.columns).sort_values(ascending=False) # Affichage feat_imp.plot(kind='bar', figsize=(12, 6)) plt.title("Feature importances") # plt.show() plt.savefig(f"{self.path}/Feature importances.png", bbox_inches='tight') result = permutation_importance(self.train_model, X_valid, y_valid, scoring='f1', n_repeats=10, random_state=42) perm_imp = pd.Series(result.importances_mean, index=X_valid.columns).sort_values(ascending=False) perm_imp.plot(kind='bar', figsize=(12, 6)) plt.title("Permutation feature importance") # plt.show() plt.savefig(f"{self.path}/Permutation feature importance.png", bbox_inches='tight') # Shap explainer = shap.TreeExplainer(self.train_model) shap_values = explainer.shap_values(X_valid) # Résumé global shap.summary_plot(shap_values, X_valid) # Force plot pour une observation force_plot = shap.force_plot(explainer.expected_value, shap_values[0, :], X_valid.iloc[0, :]) shap.save_html(f"{self.path}/shap_force_plot.html", force_plot) fig, ax = plt.subplots(figsize=(24, 48)) PartialDependenceDisplay.from_estimator( self.train_model, X_valid, selected_features, kind="average", ax=ax ) fig.savefig(f"{self.path}/PartialDependenceDisplay.png", bbox_inches="tight") plt.close(fig) best_f1 = 0 best_t = 0.5 for t in [0.3, 0.4, 0.5, 0.6, 0.7]: y_pred_thresh = (y_proba > t).astype(int) score = f1_score(y_valid, y_pred_thresh) print(f"Seuil {t:.1f} → F1: {score:.3f}") if score > best_f1: best_f1 = score best_t = t print(f"✅ Meilleur seuil trouvé: {best_t} avec F1={best_f1:.3f}") # 6️⃣ Évaluer la précision (facultatif) preds = self.train_model.predict(X_valid) acc = accuracy_score(y_valid, preds) print(f"Accuracy: {acc:.3f}") # 7️⃣ Sauvegarde du modèle joblib.dump(self.train_model, f"{self.path}/{pair}_rf_model.pkl") print(f"✅ Modèle sauvegardé sous {pair}_rf_model.pkl") # X = dataframe des features (après shift/rolling/indicators) # y = target binaire ou décimale # model = ton modèle entraîné (RandomForestClassifier ou Regressor) # # --- 1️⃣ Mutual Information (MI) --- # mi_scores = mutual_info_classif(X.fillna(0), y) # mi_series = pd.Series(mi_scores, index=X.columns, name='MI') # # # --- 2️⃣ Permutation Importance (PI) --- # pi_result = permutation_importance(self.train_model, X, y, n_repeats=10, random_state=42, n_jobs=-1) # pi_series = pd.Series(pi_result.importances_mean, index=X.columns, name='PI') # # # --- 3️⃣ Combinaison dans un seul dataframe --- # importance_df = pd.concat([mi_series, pi_series], axis=1) # importance_df = importance_df.sort_values(by='PI', ascending=False) # tri par importance modèle # print(importance_df) # # importance_df.plot(kind='bar', figsize=(10, 5)) # plt.title("Mutual Info vs Permutation Importance") # plt.ylabel("Score") # plt.show() self.analyze_model(pair, self.train_model, X_train, X_valid, y_train, y_valid) def inspect_model(self, model): """ Affiche les informations d'un modèle ML déjà entraîné. Compatible avec scikit-learn, xgboost, lightgbm, catboost... """ print("===== 🔍 INFORMATIONS DU MODÈLE =====") # Type de modèle print(f"Type : {type(model).__name__}") print(f"Module : {model.__class__.__module__}") # Hyperparamètres if hasattr(model, "get_params"): params = model.get_params() print(f"\n===== ⚙️ HYPERPARAMÈTRES ({len(params)}) =====") for k, v in params.items(): print(f"{k}: {v}") # Nombre d’estimateurs if hasattr(model, "n_estimators"): print(f"\nNombre d’estimateurs : {model.n_estimators}") # Importance des features if hasattr(model, "feature_importances_"): print("\n===== 📊 IMPORTANCE DES FEATURES =====") # Correction ici : feature_names = getattr(model, "feature_names_in_", None) if isinstance(feature_names, np.ndarray): feature_names = feature_names.tolist() elif feature_names is None: feature_names = [f"feature_{i}" for i in range(len(model.feature_importances_))] fi = pd.DataFrame({ "feature": feature_names, "importance": model.feature_importances_ }).sort_values(by="importance", ascending=False) print(fi) # Coefficients (modèles linéaires) if hasattr(model, "coef_"): print("\n===== ➗ COEFFICIENTS =====") coef = np.array(model.coef_) if coef.ndim == 1: for i, c in enumerate(coef): print(f"Feature {i}: {c:.6f}") else: print(coef) # Intercept if hasattr(model, "intercept_"): print("\nIntercept :", model.intercept_) # Classes connues if hasattr(model, "classes_"): print("\n===== 🎯 CLASSES =====") print(model.classes_) # Scores internes for attr in ["best_score_", "best_iteration_", "best_ntree_limit", "score_"]: if hasattr(model, attr): print(f"\n{attr} = {getattr(model, attr)}") # Méthodes disponibles print("\n===== 🧩 MÉTHODES DISPONIBLES =====") methods = [m for m, _ in inspect.getmembers(model, predicate=inspect.ismethod)] print(", ".join(methods[:15]) + ("..." if len(methods) > 15 else "")) print("\n===== ✅ FIN DE L’INSPECTION =====") def analyze_model(self, pair, model, X_train, X_valid, y_train, y_valid): """ Analyse complète d'un modèle ML supervisé (classification binaire). Affiche performances, importance des features, matrices, seuils, etc. """ os.makedirs(self.path, exist_ok=True) # ---- Prédictions ---- preds = model.predict(X_valid) probs = model.predict_proba(X_valid)[:, 1] if hasattr(model, "predict_proba") else preds # ---- Performances globales ---- print("===== 📊 ÉVALUATION DU MODÈLE =====") print("Colonnes du modèle :", model.feature_names_in_) print("Colonnes X_valid :", list(X_valid.columns)) print(f"Accuracy: {accuracy_score(y_valid, preds):.3f}") print(f"ROC AUC : {roc_auc_score(y_valid, probs):.3f}") print("TN (True Negative) / FP (False Positive)") print("FN (False Negative) / TP (True Positive)") print("\nRapport de classification :\n", classification_report(y_valid, preds)) # | Élément | Valeur | Signification | # | ------------------- | ------ | ----------------------------------------------------------- | # | TN (True Negative) | 983 | Modèle a correctement prédit 0 (pas d’achat) | # | FP (False Positive) | 43 | Modèle a prédit 1 alors que c’était 0 (faux signal d’achat) | # | FN (False Negative) | 108 | Modèle a prédit 0 alors que c’était 1 (manqué un achat) | # | TP (True Positive) | 19 | Modèle a correctement prédit 1 (bon signal d’achat) | # ---- Matrice de confusion ---- cm = confusion_matrix(y_valid, preds) print("Matrice de confusion :\n", cm) plt.figure(figsize=(4, 4)) plt.imshow(cm, cmap="Blues") plt.title("Matrice de confusion") plt.xlabel("Prédit") plt.ylabel("Réel") for i in range(2): for j in range(2): plt.text(j, i, cm[i, j], ha="center", va="center", color="black") # plt.show() plt.savefig(os.path.join(self.path, "Matrice de confusion.png"), bbox_inches="tight") plt.close() # ---- Importance des features ---- if hasattr(model, "feature_importances_"): print("\n===== 🔍 IMPORTANCE DES FEATURES =====") importance = pd.DataFrame({ "feature": X_train.columns, "importance": model.feature_importances_ }).sort_values(by="importance", ascending=False) print(importance) # Crée une figure plus grande fig, ax = plt.subplots(figsize=(24, 8)) # largeur=24 pouces, hauteur=8 pouces # Trace le bar plot sur cet axe importance.plot.bar(x="feature", y="importance", legend=False, ax=ax) # Tourner les labels pour plus de lisibilité ax.set_xticklabels(ax.get_xticklabels(), rotation=45, ha='right') plt.title("Importance des features") # plt.show() plt.savefig(os.path.join(self.path, "Importance des features.png"), bbox_inches="tight") plt.close() # ---- Arbre de décision (extrait) ---- if hasattr(model, "estimators_"): print("\n===== 🌳 EXTRAIT D’UN ARBRE =====") print(export_text(model.estimators_[0], feature_names=list(X_train.columns))[:800]) # ---- Précision selon le seuil ---- thresholds = np.linspace(0.1, 0.9, 9) print("\n===== ⚙️ PERFORMANCE SELON SEUIL =====") for t in thresholds: preds_t = (probs > t).astype(int) acc = accuracy_score(y_valid, preds_t) print(f"Seuil {t:.1f} → précision {acc:.3f}") # ---- ROC Curve ---- fpr, tpr, _ = roc_curve(y_valid, probs) plt.figure(figsize=(5, 4)) plt.plot(fpr, tpr, label="ROC curve") plt.plot([0, 1], [0, 1], linestyle="--", color="gray") plt.xlabel("Taux de faux positifs") plt.ylabel("Taux de vrais positifs") plt.title("Courbe ROC") plt.legend() # plt.show() plt.savefig(os.path.join(self.path, "Courbe ROC.png"), bbox_inches="tight") plt.close() # # ---- Interprétation SHAP (optionnelle) ---- # try: # import shap # # print("\n===== 💡 ANALYSE SHAP =====") # explainer = shap.TreeExplainer(model) # shap_values = explainer.shap_values(X_valid) # # shap.summary_plot(shap_values[1], X_valid) # # Vérifie le type de sortie de shap_values # if isinstance(shap_values, list): # # Cas des modèles de classification (plusieurs classes) # shap_values_to_plot = shap_values[0] if len(shap_values) == 1 else shap_values[1] # else: # shap_values_to_plot = shap_values # # # Ajustement des dimensions au besoin # if shap_values_to_plot.shape[1] != X_valid.shape[1]: # print(f"⚠️ Mismatch dimensions SHAP ({shap_values_to_plot.shape[1]}) vs X_valid ({X_valid.shape[1]})") # min_dim = min(shap_values_to_plot.shape[1], X_valid.shape[1]) # shap_values_to_plot = shap_values_to_plot[:, :min_dim] # X_to_plot = X_valid.iloc[:, :min_dim] # else: # X_to_plot = X_valid # # plt.figure(figsize=(12, 4)) # shap.summary_plot(shap_values_to_plot, X_to_plot, show=False) # plt.savefig(os.path.join(self.path, "shap_summary.png"), bbox_inches="tight") # plt.close() # except ImportError: # print("\n(SHAP non installé — `pip install shap` pour activer l’analyse SHAP.)") y_proba = model.predict_proba(X_valid)[:, 1] # Trace ou enregistre le graphique self.plot_threshold_analysis(y_valid, y_proba, step=0.05, save_path=f"{self.path}/threshold_analysis.png") # y_valid : vraies classes (0 / 1) # y_proba : probabilités de la classe 1 prédites par ton modèle # Exemple : y_proba = model.predict_proba(X_valid)[:, 1] seuils = np.arange(0.0, 1.01, 0.05) precisions, recalls, f1s = [], [], [] for seuil in seuils: y_pred = (y_proba >= seuil).astype(int) precisions.append(precision_score(y_valid, y_pred)) recalls.append(recall_score(y_valid, y_pred)) f1s.append(f1_score(y_valid, y_pred)) plt.figure(figsize=(10, 6)) plt.plot(seuils, precisions, label='Précision', marker='o') plt.plot(seuils, recalls, label='Rappel', marker='o') plt.plot(seuils, f1s, label='F1-score', marker='o') # Ajoute un point pour le meilleur F1 best_idx = np.argmax(f1s) plt.scatter(seuils[best_idx], f1s[best_idx], color='red', s=80, label=f'Max F1 ({seuils[best_idx]:.2f})') plt.title("Performance du modèle selon le seuil de probabilité") plt.xlabel("Seuil de probabilité (classe 1)") plt.ylabel("Score") plt.grid(True, alpha=0.3) plt.legend() plt.savefig(f"{self.path}/seuil_de_probabilite.png", bbox_inches='tight') # plt.show() print(f"✅ Meilleur F1 : {f1s[best_idx]:.3f} au seuil {seuils[best_idx]:.2f}") print("\n===== ✅ FIN DE L’ANALYSE =====") def plot_threshold_analysis(self, y_true, y_proba, step=0.05, save_path=None): """ Affiche la précision, le rappel et le F1-score selon le seuil de décision. y_true : labels réels (0 ou 1) y_proba : probabilités prédites (P(hausse)) step : pas entre les seuils testés save_path : si renseigné, enregistre l'image au lieu d'afficher """ # Le graphique généré affichera trois courbes : # 🔵 Precision — la fiabilité de tes signaux haussiers. # 🟢 Recall — la proportion de hausses que ton modèle détecte. # 🟣 F1-score — le compromis optimal entre les deux. thresholds = np.arange(0, 1.01, step) precisions, recalls, f1s = [], [], [] for thr in thresholds: preds = (y_proba >= thr).astype(int) precisions.append(precision_score(y_true, preds)) recalls.append(recall_score(y_true, preds)) f1s.append(f1_score(y_true, preds)) plt.figure(figsize=(10, 6)) plt.plot(thresholds, precisions, label="Precision", linewidth=2) plt.plot(thresholds, recalls, label="Recall", linewidth=2) plt.plot(thresholds, f1s, label="F1-score", linewidth=2, linestyle="--") plt.axvline(0.5, color='gray', linestyle=':', label="Seuil 0.5") plt.title("📊 Performance selon le seuil de probabilité", fontsize=14) plt.xlabel("Seuil de décision (threshold)") plt.ylabel("Score") plt.legend() plt.grid(True, alpha=0.3) if save_path: plt.savefig(save_path, bbox_inches='tight') print(f"✅ Graphique enregistré : {save_path}") else: plt.show() # # ============================= # # Exemple d’utilisation : # # ============================= # if __name__ == "__main__": # # Exemple : chargement d’un modèle et test # import joblib # # model = joblib.load("/media/Home/home/souti/freqtrade/user_data/strategies/tools/sklearn/model.pkl") # data = np.load("/media/Home/home/souti/freqtrade/user_data/strategies/tools/sklearn/test_data.npz") # X_test, y_test = data["X"], data["y"] # # y_proba = model.predict_proba(X_test)[:, 1] # # # Trace ou enregistre le graphique # plot_threshold_analysis(y_test, y_proba, step=0.05, # save_path="/media/Home/home/souti/freqtrade/user_data/strategies/tools/sklearn/threshold_analysis.png") def populateDataframe(self, dataframe, timeframe='5m'): dataframe = dataframe.copy() heikinashi = qtpylib.heikinashi(dataframe) dataframe['haopen'] = heikinashi['open'] dataframe['haclose'] = heikinashi['close'] dataframe['hapercent'] = (dataframe['haclose'] - dataframe['haopen']) / dataframe['haclose'] dataframe['mid'] = dataframe['haopen'] + (dataframe['haclose'] - dataframe['haopen']) / 2 dataframe["percent"] = dataframe['close'].pct_change() dataframe["percent5"] = dataframe['close'].pct_change(5) dataframe["percent12"] = dataframe['close'].pct_change(12) dataframe["percent24"] = dataframe['close'].pct_change(24) dataframe["hapercent3"] = (dataframe['haclose'] - dataframe['haopen'].shift(3)) / dataframe['haclose'].shift(3) # if self.dp.runmode.value in ('backtest'): # dataframe['futur_percent'] = 100 * (dataframe['close'].shift(-1) - dataframe['close']) / dataframe['close'] dataframe['sma5'] = dataframe['mid'].ewm(span=5, adjust=False).mean() #dataframe["mid"].rolling(window=5).mean() self.calculeDerivees(dataframe, 'sma5', timeframe=timeframe, ema_period=5) dataframe['sma12'] = dataframe['mid'].ewm(span=12, adjust=False).mean() #dataframe["mid"].rolling(window=12).mean() self.calculeDerivees(dataframe, 'sma12', timeframe=timeframe, ema_period=12) dataframe['sma24'] = dataframe['mid'].ewm(span=24, adjust=False).mean() #dataframe["mid"].rolling(window=24).mean() self.calculeDerivees(dataframe, 'sma24', timeframe=timeframe, ema_period=24) dataframe['sma48'] = dataframe['mid'].ewm(span=48, adjust=False).mean() #dataframe["mid"].rolling(window=48).mean() self.calculeDerivees(dataframe, 'sma48', timeframe=timeframe, ema_period=48) dataframe['sma60'] = dataframe['mid'].ewm(span=60, adjust=False).mean() #dataframe["mid"].rolling(window=60).mean() self.calculeDerivees(dataframe, 'sma60', timeframe=timeframe, ema_period=60) dataframe = self.calculateDerivation(dataframe, window=3, suffixe="_3",timeframe=timeframe) dataframe = self.calculateDerivation(dataframe, window=5, suffixe="_5",timeframe=timeframe) dataframe = self.calculateDerivation(dataframe, window=12, suffixe="_12",timeframe=timeframe) dataframe = self.calculateDerivation(dataframe, window=24, suffixe="_24", timeframe=timeframe) # print(metadata['pair']) dataframe['rsi'] = talib.RSI(dataframe['close'], timeperiod=14) dataframe['max_rsi_12'] = talib.MAX(dataframe['rsi'], timeperiod=12) dataframe['max_rsi_24'] = talib.MAX(dataframe['rsi'], timeperiod=24) self.calculeDerivees(dataframe, 'rsi', timeframe=timeframe, ema_period=12) dataframe['min12'] = talib.MIN(dataframe['close'], timeperiod=12) dataframe['max12'] = talib.MAX(dataframe['close'], timeperiod=12) dataframe['max60'] = talib.MAX(dataframe['close'], timeperiod=60) dataframe['min60'] = talib.MIN(dataframe['close'], timeperiod=60) dataframe['min_max_60'] = ((dataframe['max60'] - dataframe['close']) / dataframe['min60']) # dataframe['min36'] = talib.MIN(dataframe['close'], timeperiod=36) # dataframe['max36'] = talib.MAX(dataframe['close'], timeperiod=36) # dataframe['pct36'] = 100 * (dataframe['max36'] - dataframe['min36']) / dataframe['min36'] # dataframe['maxpct36'] = talib.MAX(dataframe['pct36'], timeperiod=36) # Bollinger Bands bollinger = qtpylib.bollinger_bands(qtpylib.typical_price(dataframe), window=20, stds=2) dataframe['bb_lowerband'] = bollinger['lower'] dataframe['bb_middleband'] = bollinger['mid'] dataframe['bb_upperband'] = bollinger['upper'] dataframe["bb_percent"] = ( (dataframe["close"] - dataframe["bb_lowerband"]) / (dataframe["bb_upperband"] - dataframe["bb_lowerband"]) ) dataframe["bb_width"] = (dataframe["bb_upperband"] - dataframe["bb_lowerband"]) / dataframe["sma24"] # dataframe["bb_width"] = ( # (dataframe["bb_upperband"] - dataframe["bb_lowerband"]) / dataframe["bb_middleband"] # ) # Calcul MACD macd, macdsignal, macdhist = talib.MACD( dataframe['close'], fastperiod=12, slowperiod=26, signalperiod=9 ) # | Nom | Formule / définition | Signification | # | ---------------------------- | ------------------------------------------- | ----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- | # | **MACD** (`macd`) | `EMA_fast - EMA_slow` (ex : 12-26 périodes) | Montre l’écart entre la moyenne courte et la moyenne longue.
- Positive → tendance haussière
- Négative → tendance baissière | # | **Signal** (`macdsignal`) | `EMA_9(MACD)` | Sert de ligne de **signal de déclenchement**.
- Croisement du MACD au-dessus → signal d’achat
- Croisement du MACD en dessous → signal de vente | # | **Histogramme** (`macdhist`) | `MACD - Signal` | Montre la **force et l’accélération** de la tendance.
- Positif et croissant → tendance haussière qui s’accélère
- Positif mais décroissant → ralentissement de la hausse
- Négatif et décroissant → baisse qui s’accélère
- Négatif mais croissant → ralentissement de la baisse | # Ajouter dans le dataframe dataframe['macd'] = macd dataframe['macdsignal'] = macdsignal dataframe['macdhist'] = macdhist # Regarde dans le futur # # --- Rendre relatif sur chaque série (-1 → 1) --- # for col in ['macd', 'macdsignal', 'macdhist']: # series = dataframe[col] # valid = series[~np.isnan(series)] # ignorer NaN # min_val = valid.min() # max_val = valid.max() # span = max_val - min_val if max_val != min_val else 1 # dataframe[f'{col}_rel'] = 2 * ((series - min_val) / span) - 1 # # dataframe['tdc_macd'] = self.macd_tendance_int( # dataframe, # macd_col='macd_rel', # signal_col='macdsignal_rel', # hist_col='macdhist_rel' # ) # ------------------------------------------------------------------------------------ # rolling SMA indicators (used for trend detection too) s_short = self.DEFAULT_PARAMS['sma_short'] s_long = self.DEFAULT_PARAMS['sma_long'] dataframe[f'sma_{s_short}'] = dataframe['close'].rolling(window=s_short).mean() dataframe[f'sma_{s_long}'] = dataframe['close'].rolling(window=s_long).mean() # --- pente brute --- dataframe['slope'] = dataframe['sma24'].diff() # --- lissage EMA --- dataframe['slope_smooth'] = dataframe['slope'].ewm(span=10, adjust=False).mean() # # RSI # window = 14 # delta = dataframe['close'].diff() # up = delta.clip(lower=0) # down = -1 * delta.clip(upper=0) # ma_up = up.rolling(window=window).mean() # ma_down = down.rolling(window=window).mean() # rs = ma_up / ma_down.replace(0, 1e-9) # dataframe['rsi'] = 100 - (100 / (1 + rs)) # # # EMA example # dataframe['ema'] = dataframe['close'].ewm(span=self.DEFAULT_PARAMS['ema_period'], adjust=False).mean() # # # ATR (simple implementation) # high_low = dataframe['high'] - dataframe['low'] # high_close = (dataframe['high'] - dataframe['close'].shift()).abs() # low_close = (dataframe['low'] - dataframe['close'].shift()).abs() # tr = DataFrame({'hl': high_low, 'hc': high_close, 'lc': low_close}).max(axis=1) # dataframe['atr'] = tr.rolling(window=self.DEFAULT_PARAMS['atr_period']).mean() ########################### # df = ton DataFrame OHLCV avec colonnes: open, high, low, close, volume # Assure-toi qu'il est trié par date croissante # --- Volatilité normalisée --- dataframe['atr'] = ta.volatility.AverageTrueRange( high=dataframe['high'], low=dataframe['low'], close=dataframe['close'], window=14 ).average_true_range() dataframe['atr_norm'] = dataframe['atr'] / dataframe['close'] # # --- Force de tendance --- # dataframe['adx'] = ta.trend.ADXIndicator( # high=dataframe['high'], low=dataframe['low'], close=dataframe['close'], window=14 # ).adx() # # # --- Volume directionnel (On Balance Volume) --- # dataframe['obv'] = ta.volume.OnBalanceVolumeIndicator( # close=dataframe['close'], volume=dataframe['volume'] # ).on_balance_volume() # self.calculeDerivees(dataframe, 'obv', timeframe=timeframe, ema_period=1) # # dataframe['obv5'] = ta.volume.OnBalanceVolumeIndicator( # close=dataframe['sma5'], volume=dataframe['volume'].rolling(5).sum() # ).on_balance_volume() # self.calculeDerivees(dataframe, 'obv5', timeframe=timeframe, ema_period=5) # --- Volatilité récente (écart-type des rendements) --- # dataframe['vol_24'] = dataframe['percent'].rolling(24).std() # Compter les baisses / hausses consécutives # self.calculateDownAndUp(dataframe, limit=0.0001) # df : ton dataframe OHLCV + indicateurs existants # Assurez-vous que les colonnes suivantes existent : # 'max_rsi_12', 'roc_24', 'bb_percent_1h' # --- Filtrage des NaN initiaux --- # dataframe = dataframe.dropna() # dataframe['rsi_slope'] = dataframe['rsi'].diff(3) / 3 # vitesse moyenne du RSI # dataframe['adx_change'] = dataframe['adx'] - dataframe['adx'].shift(12) # évolution de la tendance # dataframe['volatility_ratio'] = dataframe['atr_norm'] / dataframe['bb_width'] # dataframe["rsi_diff"] = dataframe["rsi"] - dataframe["rsi"].shift(3) # dataframe["slope_ratio"] = dataframe["sma5_deriv1"] / (dataframe["sma60_deriv1"] + 1e-9) # dataframe["divergence"] = (dataframe["rsi_deriv1"] * dataframe["sma5_deriv1"]) < 0 ########################### # # dataframe['volume_sma_deriv'] = dataframe['volume'] * dataframe['sma5_deriv1'] / (dataframe['volume'].rolling(5).mean()) # self.calculeDerivees(dataframe, 'volume', timeframe=timeframe, ema_period=12) self.setTrends(dataframe) return dataframe def feature_auc_scores(self, X, y): aucs = {} for col in X.columns: try: aucs[col] = roc_auc_score(y, X[col].ffill().fillna(0)) except Exception: aucs[col] = np.nan return pd.Series(aucs).sort_values(ascending=False) def macd_tendance_int(self, dataframe: pd.DataFrame, macd_col='macd', signal_col='macdsignal', hist_col='macdhist', eps=0.0) -> pd.Series: """ Renvoie la tendance MACD sous forme d'entiers. 2 : Haussier 1 : Ralentissement hausse 0 : Neutre -1 : Ralentissement baisse -2 : Baissier """ # | Nom | Formule / définition | Signification | # | ---------------------------- | ------------------------------------------- | ----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- | # | **MACD** (`macd`) | `EMA_fast - EMA_slow` (ex : 12-26 périodes) | Montre l’écart entre la moyenne courte et la moyenne longue.
- Positive → tendance haussière
- Négative → tendance baissière | # | **Signal** (`macdsignal`) | `EMA_9(MACD)` | Sert de ligne de **signal de déclenchement**.
- Croisement du MACD au-dessus → signal d’achat
- Croisement du MACD en dessous → signal de vente | # | **Histogramme** (`macdhist`) | `MACD - Signal` | Montre la **force et l’accélération** de la tendance.
- Positif et croissant → tendance haussière qui s’accélère
- Positif mais décroissant → ralentissement de la hausse
- Négatif et décroissant → baisse qui s’accélère
- Négatif mais croissant → ralentissement de la baisse | # | Situation | MACD | Signal | Hist | Interprétation | # | -------------------------- | ---------- | --------- | -------- | ------------------------------------------ | # | MACD > 0, Hist croissant | au-dessus | croissant | Haussier | Momentum fort → tendance haussière | # | MACD > 0, Hist décroissant | au-dessus | en baisse | Momentum | La hausse ralentit, prudence | # | MACD < 0, Hist décroissant | en dessous | en baisse | Baissier | Momentum fort → tendance baissière | # | MACD < 0, Hist croissant | en dessous | en hausse | Rebond ? | La baisse ralentit → possible retournement | # Créer une série de 0 par défaut tendance = pd.Series(0, index=dataframe.index) # Cas MACD > signal mask_up = dataframe[macd_col] > dataframe[signal_col] + eps mask_up_hist_pos = mask_up & (dataframe[hist_col] > 0) mask_up_hist_neg = mask_up & (dataframe[hist_col] <= 0) tendance[mask_up_hist_pos] = 2 # Haussier tendance[mask_up_hist_neg] = 1 # Ralentissement hausse # Cas MACD < signal mask_down = dataframe[macd_col] < dataframe[signal_col] - eps mask_down_hist_neg = mask_down & (dataframe[hist_col] < 0) mask_down_hist_pos = mask_down & (dataframe[hist_col] >= 0) tendance[mask_down_hist_neg] = -2 # Baissier tendance[mask_down_hist_pos] = -1 # Ralentissement baisse # Les NaN deviennent neutre tendance[dataframe[[macd_col, signal_col, hist_col]].isna().any(axis=1)] = 0 return tendance def calculateDownAndUp(self, dataframe, limit=0.0001): dataframe['down'] = dataframe['hapercent'] <= limit dataframe['up'] = dataframe['hapercent'] >= limit dataframe['down_count'] = - dataframe['down'].astype(int) * ( dataframe['down'].groupby((dataframe['down'] != dataframe['down'].shift()).cumsum()).cumcount() + 1) dataframe['up_count'] = dataframe['up'].astype(int) * ( dataframe['up'].groupby((dataframe['up'] != dataframe['up'].shift()).cumsum()).cumcount() + 1) # Créer une colonne vide dataframe['down_pct'] = self.calculateUpDownPct(dataframe, 'down_count') dataframe['up_pct'] = self.calculateUpDownPct(dataframe, 'up_count') def calculateDerivation(self, dataframe, window=12, suffixe='', timeframe='5m'): dataframe[f"mid_smooth{suffixe}"] = dataframe['mid'].rolling(window).mean() dataframe = self.calculeDerivees(dataframe, f"mid_smooth{suffixe}", timeframe=timeframe, ema_period=window) return dataframe def calculeDerivees( self, dataframe: pd.DataFrame, name: str, suffixe: str = '', window: int = 100, coef: float = 0.15, ema_period: int = 10, verbose: bool = True, timeframe: str = '5m' ) -> pd.DataFrame: """ Calcule deriv1/deriv2 (relative simple), applique EMA, calcule tendency avec epsilon adaptatif basé sur rolling percentiles. """ # dataframe = dataframe.copy() d1_col = f"{name}{suffixe}_deriv1" d2_col = f"{name}{suffixe}_deriv2" # d1s_col = f"{name}{suffixe}_deriv1_smooth" # d2s_col = f"{name}{suffixe}_deriv2_smooth" tendency_col = f"{name}{suffixe}_state" factor1 = 100 * (ema_period / 5) factor2 = 10 * (ema_period / 5) series = dataframe[f"{name}{suffixe}"] cond_bas = (series.shift(2) > series.shift(1)) & (series.shift(1) < series) cond_haut = (series.shift(2) < series.shift(1)) & (series.shift(1) > series) dataframe[f"{name}{suffixe}_inv_bas"] = np.where(cond_bas, -1, 0) dataframe[f"{name}{suffixe}_inv_hau"] = np.where(cond_haut, 1, 0) # --- Distance à la moyenne mobile --- dataframe[f"{name}{suffixe}_dist"] = (dataframe['close'] - dataframe[f"{name}{suffixe}"]) / dataframe[f"{name}{suffixe}"] # dérivée relative simple dataframe[d1_col] = factor1 * ((dataframe[name] - dataframe[name].shift(3)) / dataframe[name].shift(3)) # lissage EMA # dataframe[d1_col] = factor1 * dataframe[d1_col].ewm(span=ema_period, adjust=False).mean() # dataframe[d1_col] = dataframe[d1_col].rolling(window=ema_period, center=True).median() dataframe[d2_col] = (dataframe[d1_col] - dataframe[d1_col].shift(1)) # dataframe[d2_col] = factor2 * dataframe[d2_col].ewm(span=ema_period, adjust=False).mean() # epsilon adaptatif via rolling percentile p_low_d1 = dataframe[d1_col].rolling(window=window, min_periods=1).quantile(0.05) p_high_d1 = dataframe[d1_col].rolling(window=window, min_periods=1).quantile(0.95) p_low_d2 = dataframe[d2_col].rolling(window=window, min_periods=1).quantile(0.05) p_high_d2 = dataframe[d2_col].rolling(window=window, min_periods=1).quantile(0.95) eps_d1_series = ((p_low_d1.abs() + p_high_d1.abs()) / 2) * coef eps_d2_series = ((p_low_d2.abs() + p_high_d2.abs()) / 2) * coef # fallback global eps global_eps_d1 = (abs(dataframe[d1_col].quantile(0.05)) + abs(dataframe[d1_col].quantile(0.95))) / 2 * coef global_eps_d2 = (abs(dataframe[d2_col].quantile(0.05)) + abs(dataframe[d2_col].quantile(0.95))) / 2 * coef eps_d1_series = eps_d1_series.fillna(global_eps_d1).replace(0, global_eps_d1) eps_d2_series = eps_d2_series.fillna(global_eps_d2).replace(0, global_eps_d2) # if verbose and self.dp.runmode.value in ('backtest'): # stats = dataframe[[d1_col, d2_col]].agg(['min', 'max']).T # stats['abs_max'] = dataframe[[d1_col, d2_col]].abs().max(axis=0) # print(f"---- Derivatives stats {timeframe}----") # print(stats) # print(f"rolling window = {window}, coef = {coef}, ema_period = {ema_period}") # print("---------------------------") # mapping tendency def tag_by_derivatives(row): idx = int(row.name) d1v = float(row[d1_col]) d2v = float(row[d2_col]) eps1 = float(eps_d1_series.iloc[idx]) eps2 = float(eps_d2_series.iloc[idx]) # # mapping état → codes 3 lettres explicites # # | Ancien état | Nouveau code 3 lettres | Interprétation | # # | ----------- | ---------------------- | --------------------- | # # | 4 | HAU | Hausse Accélérée | # # | 3 | HSR | Hausse Ralentissement | # # | 2 | HST | Hausse Stable | # # | 1 | DHB | Départ Hausse | # # | 0 | PAL | Palier / neutre | # # | -1 | DBD | Départ Baisse | # # | -2 | BSR | Baisse Ralentissement | # # | -3 | BST | Baisse Stable | # # | -4 | BAS | Baisse Accélérée | # Palier strict if abs(d1v) <= eps1 and abs(d2v) <= eps2: return 0 # Départ si d1 ~ 0 mais d2 signale direction if abs(d1v) <= eps1: return 1 if d2v > eps2 else -1 if d2v < -eps2 else 0 # Hausse if d1v > eps1: return 4 if d2v > eps2 else 3 # Baisse if d1v < -eps1: return -4 if d2v < -eps2 else -2 return 0 dataframe[tendency_col] = dataframe.apply(tag_by_derivatives, axis=1) # if timeframe == '1h' and verbose and self.dp.runmode.value in ('backtest'): # print("##################") # print(f"# STAT {timeframe} {name}{suffixe}") # print("##################") # self.calculateProbabilite2Index(dataframe, futur_cols=['futur_percent'], indic_1=f"{name}{suffixe}_deriv1", indic_2=f"{name}{suffixe}_deriv2") return dataframe def getOpenTrades(self): # if len(self.trades) == 0: self.trades = Trade.get_open_trades() return self.trades def populate_buy_trend(self, dataframe: DataFrame, metadata: dict) -> DataFrame: pair = metadata['pair'] # Backtested 2025-04-09 00:00:00 -> 2025-05-25 00:00:00 | Max open trades : 1 # ┏━━━━━━━━━━━━━━━━━━┳━━━━━━━━┳━━━━━━━━━━━━━━┳━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━━━┳━━━━━━━━━━━━━━┳━━━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━━━━━━━━━┓ # ┃ Strategy ┃ Trades ┃ Avg Profit % ┃ Tot Profit USDC ┃ Tot Profit % ┃ Avg Duration ┃ Win Draw Loss Win% ┃ Drawdown ┃ # ┡━━━━━━━━━━━━━━━━━━╇━━━━━━━━╇━━━━━━━━━━━━━━╇━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━━━━╇━━━━━━━━━━━━━━╇━━━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━━━━━━━━━━┩ # │ Zeus_8_3_2_B_4_2 │ 76 │ 0.17 │ 126.862 │ 12.69 │ 12:51:00 │ 51 0 25 67.1 │ 55.742 USDC 4.71% │ # └──────────────────┴────────┴──────────────┴─────────────────┴──────────────┴──────────────┴────────────────────────┴────────────────────┘ # dataframe.loc[ # ( # # (dataframe["mid_smooth_5_deriv1_1d"] > 0) # (dataframe["percent12"] > 0) # & (dataframe['sma48'] > dataframe['sma48'].shift(1)) # # & (dataframe['sma48'] < dataframe['sma48'].shift(1) + 10) # & (dataframe['sma5_1h'] >= dataframe['sma5_1h'].shift(13)) # & (dataframe['stop_buying'] == False) # # & (dataframe['trend_class_1h'] <= -1) # # & (dataframe['mid_smooth_5_state_1d'] >= -2) #or '-':>3}|{last_candle['mid_smooth_24_state_1h'] ) # ), ['enter_long', 'enter_tag']] = (1, 'sma48') # Backtested 2025-04-09 00:00:00 -> 2025-05-25 00:00:00 | Max open trades : 1 # ┏━━━━━━━━━━━━━━━━━━┳━━━━━━━━┳━━━━━━━━━━━━━━┳━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━━━┳━━━━━━━━━━━━━━┳━━━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━━━━━━━━━┓ # ┃ Strategy ┃ Trades ┃ Avg Profit % ┃ Tot Profit USDC ┃ Tot Profit % ┃ Avg Duration ┃ Win Draw Loss Win% ┃ Drawdown ┃ # ┡━━━━━━━━━━━━━━━━━━╇━━━━━━━━╇━━━━━━━━━━━━━━╇━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━━━━╇━━━━━━━━━━━━━━╇━━━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━━━━━━━━━━┩ # │ Zeus_8_3_2_B_4_2 │ 60 │ 0.17 │ 103.408 │ 10.34 │ 12:52:00 │ 44 0 16 73.3 │ 38.701 USDC 3.53% │ # └──────────────────┴────────┴──────────────┴─────────────────┴──────────────┴──────────────┴────────────────────────┴────────────────────┘ # dataframe.loc[ # ( # (dataframe["sma48_deriv1"] <= 0) # & (dataframe['min12_1h'].shift(36) == dataframe['min12_1h']) # & (dataframe["sma12_deriv1"] > dataframe["sma12_deriv1"].shift(1)) # ), ['enter_long', 'enter_tag']] = (1, 'min12_1h') # Backtested 2025-04-09 00:00:00 -> 2025-05-25 00:00:00 | Max open trades : 1 # ┏━━━━━━━━━━━━━━━━━━┳━━━━━━━━┳━━━━━━━━━━━━━━┳━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━━━┳━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━━━━━━━━━┓ # ┃ Strategy ┃ Trades ┃ Avg Profit % ┃ Tot Profit USDC ┃ Tot Profit % ┃ Avg Duration ┃ Win Draw Loss Win% ┃ Drawdown ┃ # ┡━━━━━━━━━━━━━━━━━━╇━━━━━━━━╇━━━━━━━━━━━━━━╇━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━━━━╇━━━━━━━━━━━━━━━━╇━━━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━━━━━━━━━━┩ # │ Zeus_8_3_2_B_4_2 │ 28 │ 0.49 │ 136.071 │ 13.61 │ 1 day, 2:11:00 │ 27 0 1 96.4 │ 37.149 USDC 3.17% │ # └──────────────────┴────────┴──────────────┴─────────────────┴──────────────┴────────────────┴────────────────────────┴────────────────────┘ # dataframe.loc[ # ( # (dataframe['sma5_inv_bas_1h'] == -1) # ), ['enter_long', 'enter_tag']] = (1, 'sma5_inv') # Backtested 2025-04-09 00:00:00 -> 2025-05-25 00:00:00 | Max open trades : 1 # ┏━━━━━━━━━━━━━━━━━━┳━━━━━━━━┳━━━━━━━━━━━━━━┳━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━━━┳━━━━━━━━━━━━━━┳━━━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━━━━━━━━┓ # ┃ Strategy ┃ Trades ┃ Avg Profit % ┃ Tot Profit USDC ┃ Tot Profit % ┃ Avg Duration ┃ Win Draw Loss Win% ┃ Drawdown ┃ # ┡━━━━━━━━━━━━━━━━━━╇━━━━━━━━╇━━━━━━━━━━━━━━╇━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━━━━╇━━━━━━━━━━━━━━╇━━━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━━━━━━━━━┩ # │ Zeus_8_3_2_B_4_2 │ 32 │ 0.60 │ 191.920 │ 19.19 │ 20:56:00 │ 26 0 6 81.2 │ 48.62 USDC 4.02% │ # └──────────────────┴────────┴──────────────┴─────────────────┴──────────────┴──────────────┴────────────────────────┴───────────────────┘ dataframe.loc[ ( (dataframe['sma5_inv_bas_1h'] == -1) & (dataframe['sma24_1h'].shift(1) <= dataframe['sma24_1h']) ), ['enter_long', 'enter_tag']] = (1, 'sma5_inv') dataframe.loc[ ( (dataframe['sma48_inv_bas'] == -1) & (dataframe['sma24_1h'].shift(1) <= dataframe['sma24_1h']) ), ['enter_long', 'enter_tag']] = (1, 'sma48_inv') # dataframe['test'] = np.where(dataframe['enter_long'] == 1, dataframe['close'] * 1.01, np.nan) if self.dp.runmode.value in ('backtest'): dataframe.to_feather(f"user_data/backtest_results/{metadata['pair'].replace('/', '_')}_df.feather") return dataframe def calculateProbabilite2Index(self, df, futur_cols, indic_1, indic_2): # # Définition des tranches pour les dérivées # bins_deriv = [-np.inf, -0.05, -0.01, 0.01, 0.05, np.inf] # labels = ['forte baisse', 'légère baisse', 'neutre', 'légère hausse', 'forte hausse'] # # # Ajout des colonnes bin (catégorisation) # df[f"{indic_1}_bin"] = pd.cut(df['mid_smooth_1h_deriv1'], bins=bins_deriv, labels=labels) # df[f"{indic_2}_bin"] = pd.cut(df['mid_smooth_deriv1_1d'], bins=bins_deriv, labels=labels) # # # Colonnes de prix futur à analyser # futur_cols = ['futur_percent_1h', 'futur_percent_2h', 'futur_percent_3h', 'futur_percent_4h', 'futur_percent_5h'] # # # Calcul des moyennes et des effectifs # grouped = df.groupby([f"{indic_2}_bin", f"{indic_1}_bin"])[futur_cols].agg(['mean', 'count']) # # pd.set_option('display.width', 200) # largeur max affichage # pd.set_option('display.max_columns', None) pd.set_option('display.max_columns', None) pd.set_option('display.width', 300) # largeur max affichage # nettoyage # series = df[f"{indic_2}"].dropna() # unique_vals = df[f"{indic_2}"].nunique() # print(unique_vals) # print(df[f"{indic_2}"]) n = len(self.labels) df[f"{indic_1}_bin"], bins_1h = pd.qcut(df[f"{indic_1}"], q=n, labels=self.labels, retbins=True, duplicates='drop') df[f"{indic_2}_bin"], bins_1d = pd.qcut(df[f"{indic_2}"], q=n, labels=self.labels, retbins=True, duplicates='drop') # Affichage formaté pour code Python print(f"Bornes des quantiles pour {indic_1} : [{', '.join([f'{b:.4f}' for b in bins_1h])}]") print(f"Bornes des quantiles pour {indic_2} : [{', '.join([f'{b:.4f}' for b in bins_1d])}]") # Agrégation grouped = df.groupby([f"{indic_2}_bin", f"{indic_1}_bin"], observed=True)[futur_cols].agg(['mean', 'count']) # Affichage with pd.option_context('display.max_rows', None, 'display.max_columns', None): print(grouped.round(4)) # Ajout des probabilités de hausse for col in futur_cols: df[f"{col}_is_up"] = df[col] > 0 # Calcul de la proba de hausse proba_up = df.groupby([f"{indic_2}_bin", f"{indic_1}_bin"], observed=True)[f"{col}_is_up"].mean().unstack() print(f"\nProbabilité de hausse pour {col} (en %):") with pd.option_context('display.max_rows', None, 'display.max_columns', None): print((proba_up * 100).round(1)) # Affichage formaté des valeurs comme tableau Python with pd.option_context('display.max_rows', None, 'display.max_columns', None): df_formatted = (proba_up * 100).round(1) print("data = {") for index, row in df_formatted.iterrows(): row_values = ", ".join([f"{val:.1f}" for val in row]) print(f"'{index}': [{row_values}], ") print("}") data = {} for index, row in df_formatted.iterrows(): # on convertit proprement avec arrondi comme dans ton print, mais en données réelles data[index] = [ None if (isinstance(val, float) and math.isnan(val)) else val for val in row ] # Niveaux unicode pour les barres verticales (style sparkline) # spark_chars = "▁▂▃▄▅▆▇█" # print(data.values()) # # Collecte globale min/max # all_values = [] # for vals in data.values(): # all_values.extend(v for v in vals if not (isinstance(v, float) and math.isnan(v))) # # global_min = min(all_values) if all_values else 0 # global_max = max(all_values) if all_values else 1 # global_span = (global_max - global_min) if global_max != global_min else 1 # # def sparkline_global(values): # if all(isinstance(v, float) and math.isnan(v) for v in values): # return "(no data)" # out = "" # for v in values: # if isinstance(v, float) and math.isnan(v): # out += " " # else: # idx = int((v - global_min) / global_span * (len(spark_chars) - 1)) # out += spark_chars[idx] # return out # # for key, values in data.items(): # print(f"{key:>3} : {sparkline_global(values)}") # Palette ANSI 256 couleurs pour heatmap def get_ansi_color(val): """ Échelle fixe 0→100 : 0-20 : bleu (21) 20-40 : cyan (51) 40-60 : vert/jaune (46 / 226) 60-80 : orange (208) 80-100 : rouge (196) """ if val is None: return "" if val < 0: val = 0 elif val > 100: val = 100 if val <= 20: code = 21 elif val <= 40: code = 51 elif val <= 60: code = 226 elif val <= 80: code = 208 else: code = 196 return f"\033[38;5;{code}m" RESET = "\033[0m" # Affichage columns = ['B3', 'B2', 'B1', 'N0', 'H1', 'H2', 'H3'] header = " " + " ".join([f"{col:>6}" for col in columns]) print(header) print("-" * len(header)) for key, values in data.items(): line = f"{key:>3} |" for v in values: if v is None: line += f" {' '} " # vide pour NaN / None else: color = get_ansi_color(v) line += f" {color}{v:5.1f}{RESET} " print(line) def populate_sell_trend(self, dataframe: DataFrame, metadata: dict) -> DataFrame: # dataframe.loc[ # ( # (dataframe['sma48'] < dataframe['sma48'].shift(1) - 10) # ), ['exit_long', 'exit_tag']] = (1, 'sma48') return dataframe def adjust_trade_position(self, trade: Trade, current_time: datetime, current_rate: float, current_profit: float, min_stake: float, max_stake: float, **kwargs): # ne rien faire si ordre deja en cours if trade.has_open_orders: # print("skip open orders") return None if (self.wallets.get_available_stake_amount() < 0): # or trade.stake_amount >= max_stake: return 0 dataframe, _ = self.dp.get_analyzed_dataframe(trade.pair, self.timeframe) last_candle = dataframe.iloc[-1].squeeze() before_last_candle = dataframe.iloc[-2].squeeze() before_last_candle_12 = dataframe.iloc[-13].squeeze() before_last_candle_24 = dataframe.iloc[-25].squeeze() last_candle_3 = dataframe.iloc[-4].squeeze() last_candle_previous_1h = dataframe.iloc[-13].squeeze() # prépare les données current_time = current_time.astimezone(timezone.utc) open_date = trade.open_date.astimezone(timezone.utc) dispo = round(self.wallets.get_available_stake_amount()) hours_since_first_buy = (current_time - trade.open_date_utc).seconds / 3600.0 days_since_first_buy = (current_time - trade.open_date_utc).days hours = (current_time - trade.date_last_filled_utc).total_seconds() / 3600.0 count_of_buys = trade.nr_of_successful_entries current_time_utc = current_time.astimezone(timezone.utc) open_date = trade.open_date.astimezone(timezone.utc) days_since_open = (current_time_utc - open_date).days pair = trade.pair profit = trade.calc_profit(current_rate) #round(current_profit * trade.stake_amount, 1) last_lost = self.getLastLost(last_candle, pair) pct_first = 0 total_counts = sum( pair_data['count_of_buys'] for pair_data in self.pairs.values() if not self.getShortName(pair) == 'BTC') if self.pairs[pair]['first_buy']: pct_first = self.getPctFirstBuy(pair, last_candle) pct = self.pct.value if count_of_buys == 1: pct_max = current_profit else: if self.pairs[trade.pair]['last_buy']: pct_max = self.getPctLastBuy(pair, last_candle) else: pct_max = - pct if (self.getShortName(pair) == 'BTC') or count_of_buys <= 2: lim = - pct - (count_of_buys * self.pct_inc.value) # lim = self.getLimitBuy(pair, last_candle, pct) # lim = - (0.012 * (1 + round(count_of_buys / 5)) + 0.001 * (count_of_buys - 1)) # lim = - (0.012 + 0.001 * (count_of_buys - 1) + (0.002 * count_of_buys if count_of_buys > 10 else 0.001 * count_of_buys if count_of_buys > 5 else 0)) else: pct = 0.05 lim = - pct - (count_of_buys * 0.0025) # lim = self.getLimitBuy(pair, last_candle, pct) if (len(dataframe) < 1): # print("skip dataframe") return None if not self.should_enter_trade(pair, last_candle, current_time): return None condition = last_candle['sma48'] > before_last_candle['sma48'] + 20 # if (self.getShortName(pair) != 'BTC' and count_of_buys > 3): # condition = before_last_candle_24['mid_smooth_3_1h'] > before_last_candle_12['mid_smooth_3_1h'] and before_last_candle_12['mid_smooth_3_1h'] < last_candle['mid_smooth_3_1h'] #and last_candle['mid_smooth_3_deriv1_1h'] < -1.5 limit_buy = 40 if (count_of_buys < limit_buy) and condition and (pct_max < lim) and hours < 12: try: if self.pairs[pair]['has_gain'] and profit > 0: self.pairs[pair]['force_sell'] = True return None # if 6 <= count_of_buys: # if not ((before_last_candle_24['sma24_deriv1_1h'] > before_last_candle_12['sma24_deriv1_1h']) # & (before_last_candle_12['sma24_deriv1_1h'] < last_candle['sma24_deriv1_1h'])): # return None # print(f"{trade.pair} current_profit={current_profit} count_of_buys={count_of_buys} pct_first={pct_first:.3f} pct_max={pct_max:.3f} lim={lim:.3f} index={index}") # self.pairs[trade.pair]['last_palier_index'] = index # # Appel de la fonction # poly_func, x_future, y_future, count = self.polynomial_forecast( # dataframe['mid_smooth_12'], # window=self.buy_horizon_predict_1h.value * 12, # degree=4) # # if count < 3: # return None max_amount = self.config.get('stake_amount') * 2.5 stake_amount = min(min(max_amount, self.wallets.get_available_stake_amount()), self.adjust_stake_amount(pair, last_candle) * abs(last_lost / self.mise_factor_buy.value)) if stake_amount > 0: trade_type = last_candle['enter_tag'] if last_candle['enter_long'] == 1 else 'pct48' self.pairs[trade.pair]['count_of_buys'] += 1 self.pairs[pair]['total_amount'] += stake_amount self.log_trade( last_candle=last_candle, date=current_time, action="🟧 Loss -", dispo=dispo, pair=trade.pair, rate=current_rate, trade_type=trade_type, profit=round(profit, 1), buys=trade.nr_of_successful_entries + 1, stake=round(stake_amount, 2) ) self.pairs[trade.pair]['last_buy'] = current_rate self.pairs[trade.pair]['max_touch'] = last_candle['close'] self.pairs[trade.pair]['last_candle'] = last_candle # df = pd.DataFrame.from_dict(self.pairs, orient='index') # colonnes_a_exclure = ['last_candle', 'stop', # 'trade_info', 'last_date', 'expected_profit', 'last_count_of_buys', 'base_stake_amount', 'stop_buy'] # df_filtered = df[df['count_of_buys'] > 0].drop(columns=colonnes_a_exclure) # # df_filtered = df_filtered["first_buy", "last_max", "max_touch", "last_sell","last_buy", 'count_of_buys', 'current_profit'] # # print(df_filtered) return stake_amount return None except Exception as exception: print(exception) return None if (profit > self.pairs[pair]['previous_profit'] and profit > self.pairs[pair]['expected_profit'] and hours > 6 # and last_candle['sma60_deriv1'] > 0 and last_candle['max_rsi_12_1h'] < 75 and last_candle['rsi_1d'] < 58 and last_candle['stop_buying'] == False # and last_candle['mid_smooth_5_deriv1_1d'] > 0 and self.wallets.get_available_stake_amount() > 0 ): try: self.pairs[pair]['previous_profit'] = profit stake_amount = min(self.wallets.get_available_stake_amount(), self.pairs[pair]['first_amount']) if stake_amount > 0: self.pairs[pair]['has_gain'] += 1 trade_type = 'Gain +' self.pairs[trade.pair]['count_of_buys'] += 1 self.pairs[pair]['total_amount'] += stake_amount self.log_trade( last_candle=last_candle, date=current_time, action="🟡 Gain +", dispo=dispo, pair=trade.pair, rate=current_rate, trade_type=str(round(pct_max, 4)), profit=round(profit, 1), buys=trade.nr_of_successful_entries + 1, stake=round(stake_amount, 2) ) self.pairs[trade.pair]['last_buy'] = current_rate self.pairs[trade.pair]['max_touch'] = last_candle['close'] self.pairs[trade.pair]['last_candle'] = last_candle return stake_amount return None except Exception as exception: print(exception) return None return None def getPctFirstBuy(self, pair, last_candle): return round((last_candle['close'] - self.pairs[pair]['first_buy']) / self.pairs[pair]['first_buy'], 3) def getPctLastBuy(self, pair, last_candle): return round((last_candle['close'] - self.pairs[pair]['last_buy']) / self.pairs[pair]['last_buy'], 4) def getPct60D(self, pair, last_candle): return round((last_candle['max60_1d'] - last_candle['min60_1d']) / last_candle['max60_1d'], 4) def getPctClose60D(self, pair, last_candle): if last_candle['close'] > last_candle['max12_1d']: return 1 if last_candle['close'] < last_candle['min12_1d']: return 0 return round( (last_candle['close'] - last_candle['min12_1d']) / (last_candle['max12_1d'] - last_candle['min12_1d']), 4) def getLimitBuy(self, pair, last_candle, first_pct): count_of_buys = self.pairs[pair]['count_of_buys'] pct60 = self.getPct60D(pair, last_candle) # exemple 0.3 pour 30% if (pct60 < 0.05): lim = - first_pct - (count_of_buys * 0.001 * 0.05 / 0.05) else: # 0.1 # 0.4 lim = - first_pct - (count_of_buys * 0.001 * pct60 / 0.05) return lim # def getProbaHausseEmaVolume(self, last_candle): # value_1 = self.getValuesFromTable(self.ema_volume, last_candle['ema_volume']) # value_2 = self.getValuesFromTable(self.mid_smooth_1h_deriv1, last_candle['mid_smooth_1h_deriv1']) # # val = self.approx_val_from_bins( # matrice=self.ema_volume_mid_smooth_1h_deriv1_matrice_df, # numeric_matrice=self.ema_volume_mid_smooth_1h_deriv1_numeric_matrice, # row_label=value_2, # col_label=value_1 # ) # return val def getProbaHausseSma5d(self, last_candle): value_1 = self.getValuesFromTable(self.sma5_deriv1, last_candle['sma5_deriv1_1d']) value_2 = self.getValuesFromTable(self.sma5_deriv2, last_candle['sma5_deriv2_1d']) # print(f"{last_candle['sma5_deriv1_1d']} => {value_1} / {last_candle['sma5_deriv2_1d']} => {value_2}") val = self.approx_val_from_bins( matrice=self.sma5_derive1_2_matrice_df, numeric_matrice=self.sma5_derive1_2_numeric_matrice, row_label=value_2, col_label=value_1 ) return val def adjust_stake_amount(self, pair: str, last_candle: DataFrame): amount = self.config.get('stake_amount') # state = int(last_candle.get('trend_class_1h')) # # if state is not None: # prefix = "bm" if state < 0 else "bp" # attr_name = f"{prefix}{abs(state)}" # if hasattr(self, attr_name): # amount = getattr(self, attr_name).value # else: # amount = self.config.get('stake_amount') # else: # amount = self.config.get('stake_amount') return min(amount, self.wallets.get_available_stake_amount()) def expectedProfit(self, pair: str, last_candle: DataFrame): lim = 0.01 pct = 0.002 if (self.getShortName(pair) == 'BTC'): lim = 0.005 pct = 0.001 pct_to_max = lim + pct * self.pairs[pair]['count_of_buys'] # if self.pairs[pair]['count_of_buys'] > 6: # pct_to_max = 0.006 * self.pairs[pair]['count_of_buys'] # pctClose60 = self.getPctClose60D(pair, last_candle) # max_60 = last_candle['max60_1d'] # if last_candle['close'] < max_60: # pct_to_max = 0.25 * (max_60 - last_candle['close']) / max_60 # pct_to_max = pct_to_max * (2 - pctClose60) expected_profit = lim * self.pairs[pair]['total_amount'] # min(3 * lim, max(lim, pct_to_max)) # 0.004 + 0.002 * self.pairs[pair]['count_of_buys'] #min(0.01, first_max) self.pairs[pair]['expected_profit'] = expected_profit # print( # f"Expected profit price={current_price:.4f} min_max={min_max:.4f} min_14={min_14_days:.4f} max_14={max_14_days:.4f} percent={percent:.4f} expected_profit={expected_profit:.4f}") return expected_profit def calculateUpDownPct(self, dataframe, key): down_pct_values = np.full(len(dataframe), np.nan) # Remplir la colonne avec les bons calculs for i in range(len(dataframe)): shift_value = abs(int(dataframe[key].iloc[i])) # Récupérer le shift actuel if i - shift_value > 1: # Vérifier que le shift ne dépasse pas l'index down_pct_values[i] = 100 * (dataframe['close'].iloc[i] - dataframe['close'].iloc[i - shift_value]) / \ dataframe['close'].iloc[i - shift_value] return down_pct_values # ✅ Première dérivée(variation ou pente) # Positive: la courbe est croissante → tendance haussière. # Négative: la courbe est décroissante → tendance baissière. # Proche de 0: la courbe est plate → marché stable ou en transition. # # Applications: # Détecter les points d’inflexion(changement de tendance) quand elle s’annule.\ # Analyser la vitesse d’un mouvement(plus elle est forte, plus le mouvement est impulsif). # # ✅ Seconde dérivée(accélération ou concavité) # Positive: la pente augmente → accélération de la hausse ou ralentissement de la baisse. # Négative: la pente diminue → accélération de la baisse ou ralentissement de la hausse. # Changement de signe: indique souvent un changement de courbure, utile pour prévoir des retournements. # # Exemples: # 🟢 Dérivée 1 > 0 et dérivée 2 > 0: tendance haussière qui s’accélère. # 🟡 Dérivée 1 > 0 et dérivée 2 < 0: tendance haussière qui ralentit → essoufflement potentiel. # 🔴 Dérivée 1 < 0 et dérivée 2 < 0: tendance baissière qui s’accélère. # 🟠 Dérivée 1 < 0 et dérivée 2 > 0: tendance baissière qui ralentit → possible bottom. # # Filtrer les signaux: ne prendre un signal haussier que si dérivée1 > 0 et dérivée2 > 0. # Détecter les zones de retournement: quand dérivée1 ≈ 0 et que dérivée2 change de signe. def calculateRegression(self, dataframe: DataFrame, column='close', window=50, degree=3, future_offset: int = 10 # projection à n bougies après ) -> DataFrame: df = dataframe.copy() regression_fit = [] regression_future_fit = [] regression_fit = [] regression_future_fit = [] for i in range(len(df)): if i < window: regression_fit.append(np.nan) regression_future_fit.append(np.nan) continue # Fin de la fenêtre d’apprentissage end_index = i start_index = i - window y = df[column].iloc[start_index:end_index].values # Si les données sont insuffisantes (juste par précaution) if len(y) < window: regression_fit.append(np.nan) regression_future_fit.append(np.nan) continue # x centré pour meilleure stabilité numérique x = np.linspace(-1, 1, window) coeffs = np.polyfit(x, y, degree) poly = np.poly1d(coeffs) # Calcul point présent (dernier de la fenêtre) x_now = x[-1] regression_fit.append(poly(x_now)) # Calcul point futur, en ajustant si on dépasse la fin remaining = len(df) - i - 1 effective_offset = min(future_offset, remaining) x_future = x_now + (effective_offset / window) * 2 # respect du même pas regression_future_fit.append(poly(x_future)) df[f"{column}_regression"] = regression_fit # 2. Dérivée première = différence entre deux bougies successives df[f"{column}_regression_deriv1"] = round(100 * df[f"{column}_regression"].diff() / df[f"{column}_regression"], 4) # 3. Dérivée seconde = différence de la dérivée première df[f"{column}_regression_deriv2"] = round( 10 * df[f"{column}_regression_deriv1"].rolling(int(window / 4)).mean().diff(), 4) df[f"{column}_future_{future_offset}"] = regression_future_fit # # 2. Dérivée première = différence entre deux bougies successives # df[f"{column}_future_{future_offset}_deriv1"] = round(100 * df[f"{column}_future_{future_offset}"].diff() / df[f"{column}_future_{future_offset}"], 4) # # # 3. Dérivée seconde = différence de la dérivée première # df[f"{column}_future_{future_offset}_deriv2"] = round(10 * df[f"{column}_future_{future_offset}_deriv1"].rolling(int(window / 4)).mean().diff(), 4) return df def getValuesFromTable(self, values, value): for i in range(len(values) - 1): if values[i] <= value < values[i + 1]: return self.labels[i] return self.labels[-1] # cas limite pour la borne max # def interpolated_val_from_bins(self, row_pos, col_pos): # """ # Renvoie une approximation interpolée (bilinéaire) d'une valeur dans la matrice # à partir de positions flottantes dans l'index (ligne) et les colonnes. # # Parameters: # matrix_df (pd.DataFrame): Matrice des probabilités (index/colonnes = labels). # row_pos (float): Position réelle de la ligne (0 = B5, 10 = H5). # col_pos (float): Position réelle de la colonne (0 = B5, 10 = H5). # # Returns: # float: Valeur interpolée, ou NaN si en dehors des bornes. # """ # # # Labels ordonnés # n = len(self.labels) # # # Vérification des limites # if not (0 <= row_pos <= n - 1) or not (0 <= col_pos <= n - 1): # return np.nan # # # Conversion des labels -> matrice # matrix = self.smooth_smadiff_matrice_df.reindex(index=self.labels, columns=self.labels).values # # # Coordonnées entières (inférieures) # i = int(np.floor(row_pos)) # j = int(np.floor(col_pos)) # # # Coefficients pour interpolation # dx = row_pos - i # dy = col_pos - j # # # Précautions sur les bords # if i >= n - 1: i = n - 2; dx = 1.0 # if j >= n - 1: j = n - 2; dy = 1.0 # # # Récupération des 4 valeurs voisines # v00 = matrix[i][j] # v10 = matrix[i + 1][j] # v01 = matrix[i][j + 1] # v11 = matrix[i + 1][j + 1] # # # Interpolation bilinéaire # interpolated = ( # (1 - dx) * (1 - dy) * v00 + # dx * (1 - dy) * v10 + # (1 - dx) * dy * v01 + # dx * dy * v11 # ) # return interpolated def approx_val_from_bins(self, matrice, numeric_matrice, row_label, col_label): """ Renvoie une approximation de la valeur à partir des labels binaires (e.g. B5, H1) en utilisant une interpolation simple basée sur les indices. Parameters: matrix_df (pd.DataFrame): Matrice avec les labels binaires en index et colonnes. row_label (str): Label de la ligne (ex: 'B3'). col_label (str): Label de la colonne (ex: 'H2'). Returns: float: Valeur approchée si possible, sinon NaN. """ # Vérification des labels if row_label not in matrice.index or col_label not in matrice.columns: return np.nan # Index correspondant row_idx = self.label_to_index.get(row_label) col_idx = self.label_to_index.get(col_label) # Approximation directe (aucune interpolation complexe ici, juste une lecture) return numeric_matrice[row_idx, col_idx] # @property # def protections(self): # return [ # { # "method": "CooldownPeriod", # "stop_duration_candles": 12 # } # # { # # "method": "MaxDrawdown", # # "lookback_period_candles": self.lookback.value, # # "trade_limit": self.trade_limit.value, # # "stop_duration_candles": self.protection_stop.value, # # "max_allowed_drawdown": self.protection_max_allowed_dd.value, # # "only_per_pair": False # # }, # # { # # "method": "StoplossGuard", # # "lookback_period_candles": 24, # # "trade_limit": 4, # # "stop_duration_candles": self.protection_stoploss_stop.value, # # "only_per_pair": False # # }, # # { # # "method": "StoplossGuard", # # "lookback_period_candles": 24, # # "trade_limit": 4, # # "stop_duration_candles": 2, # # "only_per_pair": False # # }, # # { # # "method": "LowProfitPairs", # # "lookback_period_candles": 6, # # "trade_limit": 2, # # "stop_duration_candles": 60, # # "required_profit": 0.02 # # }, # # { # # "method": "LowProfitPairs", # # "lookback_period_candles": 24, # # "trade_limit": 4, # # "stop_duration_candles": 2, # # "required_profit": 0.01 # # } # ] def conditional_smoothing(self, series, threshold=0.002): smoothed = [series.iloc[0]] for val in series.iloc[1:]: last = smoothed[-1] if abs(val - last) / last >= threshold: smoothed.append(val) else: smoothed.append(last) return pd.Series(smoothed, index=series.index) def causal_savgol(self, series, window=25, polyorder=3): result = [] half_window = window # Fenêtre complète dans le passé for i in range(len(series)): if i < half_window: result.append(np.nan) continue window_series = series[i - half_window:i] if window_series.isna().any(): result.append(np.nan) continue coeffs = np.polyfit(range(window), window_series, polyorder) poly = np.poly1d(coeffs) result.append(poly(window - 1)) return pd.Series(result, index=series.index) def get_stake_from_drawdown(self, pct: float, base_stake: float = 100.0, step: float = 0.04, growth: float = 1.15, max_stake: float = 1000.0) -> float: """ Calcule la mise à allouer en fonction du drawdown. :param pct: Drawdown en pourcentage (ex: -0.12 pour -12%) :param base_stake: Mise de base (niveau 0) :param step: Espacement entre paliers (ex: tous les -4%) :param growth: Facteur de croissance par palier (ex: 1.15 pour +15%) :param max_stake: Mise maximale à ne pas dépasser :return: Montant à miser """ if pct >= 0: return base_stake level = int(abs(pct) / step) stake = base_stake * (growth ** level) return min(stake, max_stake) def compute_adaptive_paliers(self, max_drawdown: float = 0.65, first_steps: list[float] = [0.01, 0.01, 0.015, 0.02], growth: float = 1.2) -> list[float]: """ Génère une liste de drawdowns négatifs avec des paliers plus rapprochés au début. :param max_drawdown: Drawdown max (ex: 0.65 pour -65%) :param first_steps: Liste des premiers paliers fixes en % (ex: [0.01, 0.01, 0.015]) :param growth: Facteur multiplicatif pour espacer les paliers suivants :return: Liste de drawdowns négatifs (croissants) """ paliers = [] cumulated = 0.0 # Étapes initiales rapprochées for step in first_steps: cumulated += step paliers.append(round(-cumulated, 4)) # Étapes suivantes plus espacées step = first_steps[-1] while cumulated < max_drawdown: step *= growth cumulated += step if cumulated >= max_drawdown: break paliers.append(round(-cumulated, 4)) return paliers def polynomial_forecast(self, series: pd.Series, window: int = 20, degree: int = 2, steps=[12, 24, 36]): """ Calcule une régression polynomiale sur les `window` dernières valeurs de la série, puis prédit les `n_future` prochaines valeurs. :param series: Série pandas (ex: dataframe['close']) :param window: Nombre de valeurs récentes utilisées pour ajuster le polynôme :param degree: Degré du polynôme (ex: 2 pour quadratique) :param n_future: Nombre de valeurs futures à prédire :return: tuple (poly_function, x_vals, y_pred), où y_pred contient les prédictions futures """ if len(series) < window: raise ValueError("La série est trop courte pour la fenêtre spécifiée.") recent_y = series.iloc[-window:].values x = np.arange(window) coeffs = np.polyfit(x, recent_y, degree) poly = np.poly1d(coeffs) x_future = np.arange(window, window + len(steps)) y_future = poly(x_future) # Affichage de la fonction # print("Fonction polynomiale trouvée :") # print(poly) current = series.iloc[-1] count = 0 for future_step in steps: # range(1, n_future + 1) future_x = window - 1 + future_step prediction = poly(future_x) # series.loc[series.index[future_x], f'poly_pred_t+{future_step}'] = prediction # ➕ Afficher les prédictions # print(f"{current} → t+{future_step}: x={future_x}, y={prediction:.2f}") if prediction > 0: # current: count += 1 return poly, x_future, y_future, count # def calculateStats2(self, df, index, target): # # Nombre de tranches (modifiable) # n_bins_indice = 11 # n_bins_valeur = 11 # # # Tranches dynamiques # # df['indice_tranche'] = pd.qcut(df[f"{index}"], q=n_bins_indice, duplicates='drop') # # df['valeur_tranche'] = pd.qcut(df[f"{target}"], q=n_bins_valeur, duplicates='drop') # # df[f"{index}_bin"], bins_1h = pd.qcut(df[f"{index}"], q=n_bins_indice, labels=self.labels, retbins=True, # duplicates='drop') # df[f"{target}_bin"], bins_1d = pd.qcut(df[f"{target}"], q=n_bins_valeur, labels=self.labels, retbins=True, # duplicates='drop') # # Affichage formaté pour code Python # print(f"Bornes des quantiles pour {index} : [{', '.join([f'{b:.4f}' for b in bins_1h])}]") # print(f"Bornes des quantiles pour {target} : [{', '.join([f'{b:.4f}' for b in bins_1d])}]") # # # Tableau croisé (compte) # tableau = pd.crosstab(df[f"{index}_bin"], df[f"{target}_bin"]) # # # Facultatif : en pourcentages # tableau_pct = tableau.div(tableau.sum(axis=1), axis=0) * 100 # # # Affichage # print("Répartition brute :") # print(tableau) # print("\nRépartition en % par ligne :") # print(tableau_pct.round(2)) def calculateStats(self, df, index, target): # Nombre de tranches (modifiable) n_bins_indice = 11 n_bins_valeur = 11 # Créer les tranches dynamiques df['indice_tranche'] = pd.qcut(df[index], q=n_bins_indice, duplicates='drop') df['valeur_tranche'] = pd.qcut(df[target], q=n_bins_valeur, duplicates='drop') # Créer un tableau croisé avec la moyenne des valeurs pivot_mean = df.pivot_table( index='indice_tranche', columns='valeur_tranche', values=target, # <-- c'est la colonne qu'on agrège aggfunc='mean' # <-- on calcule la moyenne ) # Résultat # print("Moyenne des valeurs par double-tranche :") # print(pivot_mean.round(2)) def should_enter_trade(self, pair: str, last_candle, current_time) -> bool: limit = 3 # return last_candle['slope_norm_1d'] < last_candle['slope_norm_1h'] if self.pairs[pair]['stop'] and last_candle['sma24_inv_bas_1h'] == -1: dispo = round(self.wallets.get_available_stake_amount()) self.pairs[pair]['stop'] = False self.log_trade( last_candle=last_candle, date=current_time, action="🟢RESTART", dispo=dispo, pair=pair, rate=last_candle['close'], trade_type='', profit=0, buys=self.pairs[pair]['count_of_buys'], stake=0 ) return True # 🟢 Dérivée 1 > 0 et dérivée 2 > 0: tendance haussière qui s’accélère. # 🟡 Dérivée 1 > 0 et dérivée 2 < 0: tendance haussière qui ralentit → essoufflement potentiel. # 🔴 Dérivée 1 < 0 et dérivée 2 < 0: tendance baissière qui s’accélère. # 🟠 Dérivée 1 < 0 et dérivée 2 > 0: tendance baissière qui ralentit → possible bottom. # if not pair.startswith('BTC'): dispo = round(self.wallets.get_available_stake_amount()) # if self.pairs[pair]['stop'] \ # and last_candle[f"{self.indic_1d_p.value}_deriv1_1h"] >= self.indic_deriv1_1d_p_start.value \ # and last_candle[f"{self.indic_1d_p.value}_deriv2_1h"] >= self.indic_deriv2_1d_p_start.value: # self.pairs[pair]['stop'] = False # self.log_trade( # last_candle=last_candle, # date=current_time, # action="🟢RESTART", # dispo=dispo, # pair=pair, # rate=last_candle['close'], # trade_type='', # profit=0, # buys=self.pairs[pair]['count_of_buys'], # stake=0 # ) # else: # if self.pairs[pair]['stop']: # \ # # and last_candle[f"{self.indic_1d_p.value}_deriv1_1h"] <= self.indic_deriv1_1d_p_stop.value \ # # and last_candle[f"{self.indic_1d_p.value}_deriv2_1h"] <= self.indic_deriv2_1d_p_stop.value: # # self.pairs[pair]['stop'] = True # # if self.pairs[pair]['current_profit'] > 0: # # self.pairs[pair]['force_sell'] = True # self.log_trade( # last_candle=last_candle, # date=current_time, # action="🔴STOP " + str(last_candle['sma24_inv_bas_1h']), # dispo=dispo, # pair=pair, # rate=last_candle['close'], # trade_type='', # profit=self.pairs[pair]['current_profit'], # buys=self.pairs[pair]['count_of_buys'], # stake=0 # ) # return False return True # if last_candle['sma5_deriv1_1h'] < -0.02: # return False # # if last_candle['mid_smooth_1h_deriv2'] < -2 or last_candle['sma5_deriv2_1h'] < -2: # return False # # if last_candle['sma5_deriv1_1h'] < 0.0 and last_candle['sma5_deriv2_1h'] < -0.0: # return False # # if last_candle['mid_smooth_1h_deriv1'] < 0.0 and last_candle['mid_smooth_1h_deriv2'] < -0.0 and last_candle[ # 'sma5_deriv2_1h'] < 0: # return False # if pair.startswith('BTC'): # return True # BTC toujours autorisé return True # Filtrer les paires non-BTC non_btc_pairs = [p for p in self.pairs if not p.startswith('BTC')] # Compter les positions actives sur les paires non-BTC max_nb_trades = 0 total_non_btc = 0 max_pair = '' limit_amount = 250 max_amount = 0 for p in non_btc_pairs: max_nb_trades = max(max_nb_trades, self.pairs[p]['count_of_buys']) max_amount = max(max_amount, self.pairs[p]['total_amount']) for p in non_btc_pairs: if (max_nb_trades == self.pairs[p]['count_of_buys'] and max_nb_trades > limit): # if (max_amount == self.pairs[p]['total_amount'] and max_amount > limit_amount): max_pair = p total_non_btc += self.pairs[p]['count_of_buys'] pct_max = self.getPctFirstBuy(pair, last_candle) # self.getPctLastBuy(pair, last_candle) val = self.getProbaHausseSma5d(last_candle) if val < 15: return False # if count_decrease == len(non_btc_pairs): # self.should_enter_trade_count += 1 # char="." # print(f"should_enter_trade canceled all pairs decreased {'':{char}>{self.should_enter_trade_count}}") # return False # if (last_candle['mid_smooth_1h_deriv1'] < -0.0 and last_candle['sma24_deriv1_1h'] < -0.0): # return False # if (last_candle['sma5_deriv1_1d'] < -0.1 and last_candle['sma5_deriv2_1d'] < -0) \ # or last_candle['sma5_deriv2_1d'] < -0.2: # return False if last_candle['mid_smooth_1h_deriv1'] < -0.02: # and last_candle['mid_smooth_1h_deriv2'] > 0): return False # if self.pairs[pair]['count_of_buys'] >= 3: # if (last_candle['sma24_deriv1_1d'] < self.sma24_deriv1_1d_protection.value # and last_candle['sma5_deriv1_1d'] < self.sma5_deriv1_1d_protection.value \ # and last_candle['sma5_deriv2_1d'] < -0.05): # # or (last_candle['sma5_deriv1_1d'] < -0.1 and last_candle['sma24_deriv1_1h'] < -0.1): # self.pairs[pair]['stop'] = True # return False self.should_enter_trade_count = 0 # if max_pair != pair and self.pairs[pair]['total_amount'] > 300: # return False if (max_pair != '') & (self.pairs[pair]['count_of_buys'] >= limit): trade = self.pairs[max_pair]['current_trade'] current_time = current_time.astimezone(timezone.utc) open_date = trade.open_date.astimezone(timezone.utc) current_time_utc = current_time.astimezone(timezone.utc) days_since_open = (current_time_utc - open_date).days pct_max_max = self.getPctFirstBuy(max_pair, last_candle) # print(f"days_since_open {days_since_open} max_pair={max_pair} pair={pair}") return max_pair == pair or pct_max < - 0.25 or ( pct_max_max < - 0.15 and max_pair != pair and days_since_open > 30) else: return True # ---------------------------------------------------------------------------------------------- # fallback defaults (used when no JSON exists) PARAMS_DIR = 'params' DEFAULT_PARAMS = { "rsi_buy": 30, "rsi_sell": 70, "ema_period": 21, "sma_short": 20, "sma_long": 100, "atr_period": 14, "atr_multiplier": 1.5, "stake_amount": None, # use exchange default "stoploss": -0.10, "minimal_roi": {"0": 0.10} } def __init__(self, config: dict) -> None: super().__init__(config) # self.parameters = self.load_params_tree("user_data/strategies/params/") def setTrends(self, dataframe: DataFrame): SMOOTH_WIN=10 df = dataframe.copy() # # --- charger les données --- # df['timestamp'] = pd.to_datetime(df['timestamp'], errors='coerce') # --- calcul SMA14 --- # df['sma'] = talib.SMA(df, timeperiod=20) # ta.trend.sma_indicator(df['close'], 14) # --- pente brute --- df['slope'] = df['sma12'].diff() # --- lissage EMA --- df['slope_smooth'] = df['slope'].ewm(span=SMOOTH_WIN, adjust=False).mean() # df["slope_smooth"] = savgol_filter(df["slope_smooth"], window_length=21, polyorder=3) # --- normalisation relative --- df['slope_norm'] = 10000 * df['slope_smooth'] / df['close'] # df['slope_norm'].fillna(0, inplace=True) df['slope_norm'] = df['slope_norm'].fillna(0) # --- classification dynamique via quantiles --- q = df['slope_norm'].quantile([0.125, 0.250, 0.375, 0.5, 0.625, 0.875]).values q1, q2, q3, q4 ,q5, q6 = q def classify_expanding(series): trend_class = [] for i in range(len(series)): past_values = series[:i + 1] # uniquement le passé q = past_values.quantile([0.125, 0.375, 0.625, 0.875]).values q1, q2, q3, q4 = q v = series.iloc[i] if v <= q1: trend_class.append(-3) elif v <= q2: trend_class.append(-2) elif v <= q3: trend_class.append(-1) elif v <= q4: trend_class.append(0) elif v <= q5: trend_class.append(1) elif v <= q6: trend_class.append(2) else: trend_class.append(3) return trend_class dataframe['slope_norm'] = df['slope_norm'] # dataframe['trend_class'] = df['slope_norm'].apply(classify) dataframe['trend_class'] = None # Rolling sur la fenêtre passée dataframe['trend_class'] = classify_expanding(dataframe['slope_norm']) def select_uncorrelated_features(self, df, target, top_n=20, corr_threshold=0.7): """ Sélectionne les features les plus corrélées avec target, tout en supprimant celles trop corrélées entre elles. """ # 1️⃣ Calcul des corrélations absolues avec la cible corr = df.corr(numeric_only=True) corr_target = corr[target].abs().sort_values(ascending=False) # 2️⃣ Prend les N features les plus corrélées avec la cible (hors target) features = corr_target.drop(target).head(top_n).index.tolist() # 3️⃣ Évite les features trop corrélées entre elles selected = [] for feat in features: too_correlated = False for sel in selected: if abs(corr.loc[feat, sel]) > corr_threshold: too_correlated = True break if not too_correlated: selected.append(feat) # 4️⃣ Retourne un DataFrame propre avec les valeurs de corrélation selected_corr = pd.DataFrame({ "feature": selected, "corr_with_target": [corr.loc[f, target] for f in selected] }).sort_values(by="corr_with_target", key=np.abs, ascending=False) return selected_corr