from datetime import timedelta, datetime from freqtrade.persistence import Trade from freqtrade.strategy import (BooleanParameter, CategoricalParameter, DecimalParameter, stoploss_from_open, IntParameter, IStrategy, merge_informative_pair, informative, stoploss_from_absolute) import pandas as pd import numpy as np from pandas import DataFrame from typing import Optional, Union, Tuple from scipy.signal import find_peaks import logging import configparser from technical import pivots_points # -------------------------------- #pip install scikit-learn # Add your lib to import here import ta import talib.abstract as talib import freqtrade.vendor.qtpylib.indicators as qtpylib import requests logger = logging.getLogger(__name__) # Configuration du logger logging.basicConfig( level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s', ) from technical.indicators import RMI from ta.momentum import RSIIndicator, StochasticOscillator from ta.trend import SMAIndicator, EMAIndicator, MACD, ADXIndicator from ta.volatility import BollingerBands from ta.volume import ChaikinMoneyFlowIndicator from sklearn.model_selection import train_test_split from sklearn.linear_model import LogisticRegression from sklearn.metrics import accuracy_score class Detecteur(IStrategy): minimal_roi = { "0": 10, # Take profit when reaching +5% } stoploss = -0.05 # Stop loss at -5% timeframe = '1h' use_custom_stoploss = False trailing_stop = False model = None plot_config = { "main_plot": { "close": {"color": "black", "type": "line", "label": "Close Price"}, "ema50": {"color": "blue", "type": "line", "label": "EMA 50"}, "ema200": {"color": "red", "type": "line", "label": "EMA 200"}, "bb_lower": {"color": "green", "type": "line", "label": "Bollinger Lower Band"}, }, "subplots": { "RSI": { "rsi": {"color": "purple", "type": "line", "label": "RSI (14)"}, }, "MACD": { "macd": {"color": "orange", "type": "line", "label": "MACD"}, "macd_signal": {"color": "pink", "type": "line", "label": "MACD Signal"}, }, "ADX": { "adx": {"color": "brown", "type": "line", "label": "ADX"}, }, "Stochastic": { "stoch_k": {"color": "cyan", "type": "line", "label": "Stoch %K"}, "stoch_d": {"color": "magenta", "type": "line", "label": "Stoch %D"}, }, "Chaikin Money Flow": { "cmf": {"color": "teal", "type": "line", "label": "Chaikin Money Flow"}, }, "ATR": { "atr": {"color": "darkblue", "type": "line", "label": "Average True Range"}, }, "Volume": { "volume": {"color": "gray", "type": "bar", "label": "Volume"}, }, "Heikin Ashi": { "ha_close": {"color": "lime", "type": "line", "label": "HA Close"}, "ha_open": {"color": "gold", "type": "line", "label": "HA Open"}, }, } } # Optimal timeframe for indicators # process_only_new_candles = True features = ['rsi', 'rsi_percent', 'rsi_percent3', 'rsi_percent5', 'bb_upperband', 'bb_lowerband', 'close'] # Charger les modèles def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) self.model = None def train_model(self, dataframe: DataFrame): # Sélection des colonnes de caractéristiques (features) features = self.features target = 'future_direction' # Supprimer les lignes avec des valeurs NaN dataframe = dataframe.dropna() # Diviser les données en train (80%) et test (20%) X = dataframe[features] y = dataframe[target] X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42) # Entraîner le modèle de régression logistique model = LogisticRegression(max_iter=500) model.fit(X_train, y_train) # Évaluer le modèle predictions = model.predict(X_test) accuracy = accuracy_score(y_test, predictions) print(f"Accuracy du modèle : {accuracy * 100:.2f}%") return model def add_future_direction(self, dataframe: DataFrame) -> DataFrame: dataframe['future_change'] = dataframe['close'].shift(-5) - dataframe['close'] dataframe['future_direction'] = dataframe['future_change'].apply(lambda x: 1 if x > 0 else 0) return dataframe def predict_with_model(self, dataframe: DataFrame, model): features = self.features # Supprimer les lignes avec des NaN # Prédire les probabilités #dataframe['prediction_prob'] = model.predict_proba(dataframe[features])[:, 1] #dataframe['predicted_direction'] = dataframe['prediction_prob'].apply(lambda x: 1 if x > 0.5 else 0) # Assurez-vous que dataframe est une copie complète dataframe = dataframe.copy() dataframe = dataframe.dropna() # Prédire les probabilités dataframe.loc[:, 'prediction_prob'] = model.predict_proba(dataframe[features])[:, 1] # Appliquer la direction prédite dataframe.loc[:, 'predicted_direction'] = (dataframe['prediction_prob'] > 0.5).astype(int) return dataframe # Define indicators to avoid recalculating them def populate_indicators(self, dataframe: DataFrame, metadata: dict) -> DataFrame: dataframe['enter_long'] = "" dataframe['enter_tag'] = "" dataframe['percent'] = (dataframe['close'] - dataframe['open']) / dataframe['open'] """ Populate indicators used in the strategy. """ # Moving Averages dataframe['ema50'] = EMAIndicator(dataframe['close'], window=50).ema_indicator() dataframe['ema200'] = EMAIndicator(dataframe['close'], window=200).ema_indicator() dataframe['ma_downtrend'] = (dataframe['close'] < dataframe['ema50']) & (dataframe['ema50'] < dataframe['ema200']) # RSI dataframe['rsi'] = RSIIndicator(dataframe['close'], window=14).rsi() dataframe['rsi_downtrend'] = dataframe['rsi'] < 50 # MACD macd = MACD(dataframe['close'], window_slow=26, window_fast=12, window_sign=9) dataframe['macd'] = macd.macd() dataframe['macd_signal'] = macd.macd_signal() dataframe['macd_downtrend'] = (dataframe['macd'] < dataframe['macd_signal']) & (dataframe['macd'] < 0) # ADX adx = ADXIndicator(dataframe['high'], dataframe['low'], dataframe['close'], window=14) dataframe['adx'] = adx.adx() dataframe['adx_downtrend'] = (adx.adx_neg() > adx.adx_pos()) & (dataframe['adx'] > 25) # Bollinger Bands bb = BollingerBands(dataframe['close'], window=20, window_dev=2) dataframe['bb_lower'] = bb.bollinger_lband() dataframe['bb_downtrend'] = dataframe['close'] < dataframe['bb_lower'] # Stochastic Oscillator stoch = StochasticOscillator(dataframe['high'], dataframe['low'], dataframe['close'], window=14, smooth_window=3) dataframe['stoch_k'] = stoch.stoch() dataframe['stoch_d'] = stoch.stoch_signal() dataframe['stoch_downtrend'] = (dataframe['stoch_k'] < 20) & (dataframe['stoch_d'] < 20) # CMF (Chaikin Money Flow) cmf = ChaikinMoneyFlowIndicator(dataframe['high'], dataframe['low'], dataframe['close'], dataframe['volume'], window=20) dataframe['cmf'] = cmf.chaikin_money_flow() dataframe['cmf_downtrend'] = dataframe['cmf'] < -0.2 # Heikin Ashi (simplified) heikinashi = qtpylib.heikinashi(dataframe) dataframe['ha_open'] = heikinashi['open'] dataframe['ha_close'] = heikinashi['close'] dataframe['ha_mid'] = (dataframe['ha_close'] + dataframe['ha_open']) / 2 dataframe['ha_percent'] = (dataframe['ha_close'] - dataframe['ha_open']) / dataframe['ha_open'] dataframe['ha_percent12'] = (dataframe['ha_close'] - dataframe['ha_open'].shift(12)) / dataframe['ha_open'] dataframe['ha_percent24'] = (dataframe['ha_close'] - dataframe['ha_open'].shift(24)) / dataframe['ha_open'] dataframe['volume2'] = dataframe['volume'] dataframe.loc[dataframe['ha_percent'] < 0, 'volume2'] *= -1 # Volume confirmation dataframe['volume_spike'] = abs(dataframe['volume2']) > abs(dataframe['volume2'].rolling(window=20).mean() * 1.5) # dataframe['ha_close'] = (dataframe['open'] + dataframe['high'] + dataframe['low'] + dataframe['close']) / 4 # dataframe['ha_open'] = dataframe['ha_close'].shift(1).combine_first(dataframe['open']) dataframe['ha_red'] = dataframe['ha_close'] < dataframe['ha_open'] # ATR (Average True Range) dataframe['atr'] = (dataframe['high'] - dataframe['low']).rolling(window=14).mean() dataframe['atr_spike'] = dataframe['atr'] > dataframe['atr'].rolling(window=20).mean() dataframe['min30'] = talib.MIN(dataframe['close'], timeperiod=30) dataframe['max30'] = talib.MAX(dataframe['close'], timeperiod=30) # Calcul des indicateurs dataframe['rsi'] = talib.RSI(dataframe, timeperiod=14) #dataframe['macd'], dataframe['macd_signal'], _ = talib.MACD(dataframe) # Bollinger Bands bollinger = talib.BBANDS(dataframe, timeperiod=20) dataframe['bb_upperband'] = bollinger['upperband'] dataframe['bb_middleband'] = bollinger['middleband'] dataframe['bb_lowerband'] = bollinger['lowerband'] # Calcul des changements futurs dataframe['future_change'] = dataframe['close'].shift(-5) - dataframe['close'] dataframe['future_direction'] = dataframe['future_change'].apply(lambda x: 1 if x > 0 else 0) # Segmenter RSI dataframe['rsi_range'] = pd.cut(dataframe['rsi'], bins=[0, 30, 70, 100], labels=['oversold', 'neutral', 'overbought']) dataframe['rsi_percent'] = 100 * (dataframe['rsi'] - dataframe['rsi'].shift(1)) / dataframe['rsi'] dataframe['rsi_percent3'] = 100 * (dataframe['rsi'] - dataframe['rsi'].shift(3)) / dataframe['rsi'] dataframe['rsi_percent5'] = 100 * (dataframe['rsi'] - dataframe['rsi'].shift(5)) / dataframe['rsi'] dataframe['rsi_pct_range'] = pd.cut(dataframe['rsi_percent3'], bins=[-80, -15, -10, -5, 0, 5, 10, 15, 80], labels=['-20', '-15', '-10', '-5', '0', '5', '10', '15']) #dataframe = self.calculate_negative(dataframe) # Probabilités par RSI #for group in self.features: # rsi_probabilities = dataframe.groupby(group)['future_direction'].mean() # print(f"Probabilités de hausse selon {group} :\n", rsi_probabilities) rsi_probabilities = dataframe.groupby('rsi_pct_range')['future_direction'].mean() print(f"Probabilités de hausse selon rsi_pct_range :\n", rsi_probabilities) # Probabilités par MACD (positif/négatif) #dataframe['macd_signal'] = (dataframe['macd'] > dataframe['macd_signal']).astype(int) #macd_probabilities = dataframe.groupby('macd_signal')['future_direction'].mean() #print("Probabilités de hausse selon MACD :\n", macd_probabilities) return dataframe def populate_buy_trend(self, dataframe: DataFrame, metadata: dict) -> DataFrame: """ Define buy signal logic. """ dataframe.loc[ ( # Combine indicators to detect a strong downtrend (dataframe['ha_red'].shift(1) == 1) & (dataframe['ha_red'] == 0) # dataframe['ma_downtrend'] & # dataframe['rsi_downtrend'] & # dataframe['macd_downtrend'] & # dataframe['adx_downtrend'] & # dataframe['bb_downtrend'] & # dataframe['volume_spike'] & # dataframe['stoch_downtrend'] & # dataframe['cmf_downtrend'] & # dataframe['ha_red'] & # dataframe['atr_spike'] ), 'enter_long'] = 1 nan_columns = dataframe.columns[dataframe.isna().any()].tolist() print("Colonnes contenant des NaN :", nan_columns) # Compter les colonnes avec uniquement des NaN num_only_nan_columns = dataframe.isna().all().sum() print(f"Nombre de colonnes contenant uniquement des NaN : {num_only_nan_columns}") # Compter les NaN par colonne nan_count_per_column = dataframe.isna().sum() print("Nombre de NaN par colonne :") print(nan_count_per_column) columns_with_nan = nan_count_per_column[nan_count_per_column > 1000] print("Colonnes avec au moins 1000 NaN :") print(columns_with_nan) # Former le modèle si ce n'est pas déjà fait if self.model is None: dataframe = self.add_future_direction(dataframe) self.model = self.train_model(dataframe) # Faire des prédictions dataframe = self.predict_with_model(dataframe, self.model) # Entrée sur un signal de hausse #dataframe.loc[ # (dataframe['predicted_direction'] == 1), # 'enter_long'] = 1 return dataframe def populate_sell_trend(self, dataframe: DataFrame, metadata: dict) -> DataFrame: """ Define sell signal logic. """ # dataframe.loc[ # ( # # Exit condition: close price crosses above EMA50 # dataframe['close'] > dataframe['ema50'] # ), # 'sell'] = 1 return dataframe def calculate_negative(self, df: DataFrame) -> DataFrame: negative_counts = [] # Boucle optimisée pour calculer les positions fermées en perte avant la position actuelle for i in range(len(df)): # Subset jusqu'à la ligne actuelle previous_trades = df['percent'].iloc[:i] # Trouver toutes les positions négatives avant le premier profit positif if not previous_trades.empty: mask = (previous_trades > 0).idxmax() # Trouver l'index du premier profit positif count = (previous_trades.iloc[:mask] < 0).sum() if mask > 0 else len(previous_trades) else: count = 0 # Si aucune position précédente, pas de perte negative_counts.append(count) df['negative_positions_before'] = negative_counts print(df)