Files
Freqtrade/Detecteur.py
Jérôme Delacotte 0397b06a97 Add new strategies
2025-03-06 11:44:55 +01:00

340 lines
14 KiB
Python

from datetime import timedelta, datetime
from freqtrade.persistence import Trade
from freqtrade.strategy import (BooleanParameter, CategoricalParameter, DecimalParameter, stoploss_from_open,
IntParameter, IStrategy, merge_informative_pair, informative, stoploss_from_absolute)
import pandas as pd
import numpy as np
from pandas import DataFrame
from typing import Optional, Union, Tuple
from scipy.signal import find_peaks
import logging
import configparser
from technical import pivots_points
# --------------------------------
#pip install scikit-learn
# Add your lib to import here
import ta
import talib.abstract as talib
import freqtrade.vendor.qtpylib.indicators as qtpylib
import requests
logger = logging.getLogger(__name__)
# Configuration du logger
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
)
from technical.indicators import RMI
from ta.momentum import RSIIndicator, StochasticOscillator
from ta.trend import SMAIndicator, EMAIndicator, MACD, ADXIndicator
from ta.volatility import BollingerBands
from ta.volume import ChaikinMoneyFlowIndicator
from sklearn.model_selection import train_test_split
from sklearn.linear_model import LogisticRegression
from sklearn.metrics import accuracy_score
class Detecteur(IStrategy):
minimal_roi = {
"0": 10, # Take profit when reaching +5%
}
stoploss = -0.05 # Stop loss at -5%
timeframe = '1h'
use_custom_stoploss = False
trailing_stop = False
model = None
plot_config = {
"main_plot": {
"close": {"color": "black", "type": "line", "label": "Close Price"},
"ema50": {"color": "blue", "type": "line", "label": "EMA 50"},
"ema200": {"color": "red", "type": "line", "label": "EMA 200"},
"bb_lower": {"color": "green", "type": "line", "label": "Bollinger Lower Band"},
},
"subplots": {
"RSI": {
"rsi": {"color": "purple", "type": "line", "label": "RSI (14)"},
},
"MACD": {
"macd": {"color": "orange", "type": "line", "label": "MACD"},
"macd_signal": {"color": "pink", "type": "line", "label": "MACD Signal"},
},
"ADX": {
"adx": {"color": "brown", "type": "line", "label": "ADX"},
},
"Stochastic": {
"stoch_k": {"color": "cyan", "type": "line", "label": "Stoch %K"},
"stoch_d": {"color": "magenta", "type": "line", "label": "Stoch %D"},
},
"Chaikin Money Flow": {
"cmf": {"color": "teal", "type": "line", "label": "Chaikin Money Flow"},
},
"ATR": {
"atr": {"color": "darkblue", "type": "line", "label": "Average True Range"},
},
"Volume": {
"volume": {"color": "gray", "type": "bar", "label": "Volume"},
},
"Heikin Ashi": {
"ha_close": {"color": "lime", "type": "line", "label": "HA Close"},
"ha_open": {"color": "gold", "type": "line", "label": "HA Open"},
},
}
}
# Optimal timeframe for indicators
# process_only_new_candles = True
features = ['rsi', 'rsi_percent', 'rsi_percent3', 'rsi_percent5', 'bb_upperband', 'bb_lowerband', 'close']
# Charger les modèles
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.model = None
def train_model(self, dataframe: DataFrame):
# Sélection des colonnes de caractéristiques (features)
features = self.features
target = 'future_direction'
# Supprimer les lignes avec des valeurs NaN
dataframe = dataframe.dropna()
# Diviser les données en train (80%) et test (20%)
X = dataframe[features]
y = dataframe[target]
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)
# Entraîner le modèle de régression logistique
model = LogisticRegression(max_iter=500)
model.fit(X_train, y_train)
# Évaluer le modèle
predictions = model.predict(X_test)
accuracy = accuracy_score(y_test, predictions)
print(f"Accuracy du modèle : {accuracy * 100:.2f}%")
return model
def add_future_direction(self, dataframe: DataFrame) -> DataFrame:
dataframe['future_change'] = dataframe['close'].shift(-5) - dataframe['close']
dataframe['future_direction'] = dataframe['future_change'].apply(lambda x: 1 if x > 0 else 0)
return dataframe
def predict_with_model(self, dataframe: DataFrame, model):
features = self.features
# Supprimer les lignes avec des NaN
# Prédire les probabilités
#dataframe['prediction_prob'] = model.predict_proba(dataframe[features])[:, 1]
#dataframe['predicted_direction'] = dataframe['prediction_prob'].apply(lambda x: 1 if x > 0.5 else 0)
# Assurez-vous que dataframe est une copie complète
dataframe = dataframe.copy()
dataframe = dataframe.dropna()
# Prédire les probabilités
dataframe.loc[:, 'prediction_prob'] = model.predict_proba(dataframe[features])[:, 1]
# Appliquer la direction prédite
dataframe.loc[:, 'predicted_direction'] = (dataframe['prediction_prob'] > 0.5).astype(int)
return dataframe
# Define indicators to avoid recalculating them
def populate_indicators(self, dataframe: DataFrame, metadata: dict) -> DataFrame:
dataframe['enter_long'] = ""
dataframe['enter_tag'] = ""
dataframe['percent'] = (dataframe['close'] - dataframe['open']) / dataframe['open']
"""
Populate indicators used in the strategy.
"""
# Moving Averages
dataframe['ema50'] = EMAIndicator(dataframe['close'], window=50).ema_indicator()
dataframe['ema200'] = EMAIndicator(dataframe['close'], window=200).ema_indicator()
dataframe['ma_downtrend'] = (dataframe['close'] < dataframe['ema50']) & (dataframe['ema50'] < dataframe['ema200'])
# RSI
dataframe['rsi'] = RSIIndicator(dataframe['close'], window=14).rsi()
dataframe['rsi_downtrend'] = dataframe['rsi'] < 50
# MACD
macd = MACD(dataframe['close'], window_slow=26, window_fast=12, window_sign=9)
dataframe['macd'] = macd.macd()
dataframe['macd_signal'] = macd.macd_signal()
dataframe['macd_downtrend'] = (dataframe['macd'] < dataframe['macd_signal']) & (dataframe['macd'] < 0)
# ADX
adx = ADXIndicator(dataframe['high'], dataframe['low'], dataframe['close'], window=14)
dataframe['adx'] = adx.adx()
dataframe['adx_downtrend'] = (adx.adx_neg() > adx.adx_pos()) & (dataframe['adx'] > 25)
# Bollinger Bands
bb = BollingerBands(dataframe['close'], window=20, window_dev=2)
dataframe['bb_lower'] = bb.bollinger_lband()
dataframe['bb_downtrend'] = dataframe['close'] < dataframe['bb_lower']
# Stochastic Oscillator
stoch = StochasticOscillator(dataframe['high'], dataframe['low'], dataframe['close'], window=14, smooth_window=3)
dataframe['stoch_k'] = stoch.stoch()
dataframe['stoch_d'] = stoch.stoch_signal()
dataframe['stoch_downtrend'] = (dataframe['stoch_k'] < 20) & (dataframe['stoch_d'] < 20)
# CMF (Chaikin Money Flow)
cmf = ChaikinMoneyFlowIndicator(dataframe['high'], dataframe['low'], dataframe['close'], dataframe['volume'], window=20)
dataframe['cmf'] = cmf.chaikin_money_flow()
dataframe['cmf_downtrend'] = dataframe['cmf'] < -0.2
# Heikin Ashi (simplified)
heikinashi = qtpylib.heikinashi(dataframe)
dataframe['ha_open'] = heikinashi['open']
dataframe['ha_close'] = heikinashi['close']
dataframe['ha_mid'] = (dataframe['ha_close'] + dataframe['ha_open']) / 2
dataframe['ha_percent'] = (dataframe['ha_close'] - dataframe['ha_open']) / dataframe['ha_open']
dataframe['ha_percent12'] = (dataframe['ha_close'] - dataframe['ha_open'].shift(12)) / dataframe['ha_open']
dataframe['ha_percent24'] = (dataframe['ha_close'] - dataframe['ha_open'].shift(24)) / dataframe['ha_open']
dataframe['volume2'] = dataframe['volume']
dataframe.loc[dataframe['ha_percent'] < 0, 'volume2'] *= -1
# Volume confirmation
dataframe['volume_spike'] = abs(dataframe['volume2']) > abs(dataframe['volume2'].rolling(window=20).mean() * 1.5)
# dataframe['ha_close'] = (dataframe['open'] + dataframe['high'] + dataframe['low'] + dataframe['close']) / 4
# dataframe['ha_open'] = dataframe['ha_close'].shift(1).combine_first(dataframe['open'])
dataframe['ha_red'] = dataframe['ha_close'] < dataframe['ha_open']
# ATR (Average True Range)
dataframe['atr'] = (dataframe['high'] - dataframe['low']).rolling(window=14).mean()
dataframe['atr_spike'] = dataframe['atr'] > dataframe['atr'].rolling(window=20).mean()
dataframe['min30'] = talib.MIN(dataframe['close'], timeperiod=30)
dataframe['max30'] = talib.MAX(dataframe['close'], timeperiod=30)
# Calcul des indicateurs
dataframe['rsi'] = talib.RSI(dataframe, timeperiod=14)
#dataframe['macd'], dataframe['macd_signal'], _ = talib.MACD(dataframe)
# Bollinger Bands
bollinger = talib.BBANDS(dataframe, timeperiod=20)
dataframe['bb_upperband'] = bollinger['upperband']
dataframe['bb_middleband'] = bollinger['middleband']
dataframe['bb_lowerband'] = bollinger['lowerband']
# Calcul des changements futurs
dataframe['future_change'] = dataframe['close'].shift(-5) - dataframe['close']
dataframe['future_direction'] = dataframe['future_change'].apply(lambda x: 1 if x > 0 else 0)
# Segmenter RSI
dataframe['rsi_range'] = pd.cut(dataframe['rsi'], bins=[0, 30, 70, 100], labels=['oversold', 'neutral', 'overbought'])
dataframe['rsi_percent'] = 100 * (dataframe['rsi'] - dataframe['rsi'].shift(1)) / dataframe['rsi']
dataframe['rsi_percent3'] = 100 * (dataframe['rsi'] - dataframe['rsi'].shift(3)) / dataframe['rsi']
dataframe['rsi_percent5'] = 100 * (dataframe['rsi'] - dataframe['rsi'].shift(5)) / dataframe['rsi']
dataframe['rsi_pct_range'] = pd.cut(dataframe['rsi_percent3'], bins=[-80, -15, -10, -5, 0, 5, 10, 15, 80], labels=['-20', '-15', '-10', '-5', '0', '5', '10', '15'])
#dataframe = self.calculate_negative(dataframe)
# Probabilités par RSI
#for group in self.features:
# rsi_probabilities = dataframe.groupby(group)['future_direction'].mean()
# print(f"Probabilités de hausse selon {group} :\n", rsi_probabilities)
rsi_probabilities = dataframe.groupby('rsi_pct_range')['future_direction'].mean()
print(f"Probabilités de hausse selon rsi_pct_range :\n", rsi_probabilities)
# Probabilités par MACD (positif/négatif)
#dataframe['macd_signal'] = (dataframe['macd'] > dataframe['macd_signal']).astype(int)
#macd_probabilities = dataframe.groupby('macd_signal')['future_direction'].mean()
#print("Probabilités de hausse selon MACD :\n", macd_probabilities)
return dataframe
def populate_buy_trend(self, dataframe: DataFrame, metadata: dict) -> DataFrame:
"""
Define buy signal logic.
"""
dataframe.loc[
(
# Combine indicators to detect a strong downtrend
(dataframe['ha_red'].shift(1) == 1)
& (dataframe['ha_red'] == 0)
# dataframe['ma_downtrend'] &
# dataframe['rsi_downtrend'] &
# dataframe['macd_downtrend'] &
# dataframe['adx_downtrend'] &
# dataframe['bb_downtrend'] &
# dataframe['volume_spike'] &
# dataframe['stoch_downtrend'] &
# dataframe['cmf_downtrend'] &
# dataframe['ha_red'] &
# dataframe['atr_spike']
),
'enter_long'] = 1
nan_columns = dataframe.columns[dataframe.isna().any()].tolist()
print("Colonnes contenant des NaN :", nan_columns)
# Compter les colonnes avec uniquement des NaN
num_only_nan_columns = dataframe.isna().all().sum()
print(f"Nombre de colonnes contenant uniquement des NaN : {num_only_nan_columns}")
# Compter les NaN par colonne
nan_count_per_column = dataframe.isna().sum()
print("Nombre de NaN par colonne :")
print(nan_count_per_column)
columns_with_nan = nan_count_per_column[nan_count_per_column > 1000]
print("Colonnes avec au moins 1000 NaN :")
print(columns_with_nan)
# Former le modèle si ce n'est pas déjà fait
if self.model is None:
dataframe = self.add_future_direction(dataframe)
self.model = self.train_model(dataframe)
# Faire des prédictions
dataframe = self.predict_with_model(dataframe, self.model)
# Entrée sur un signal de hausse
#dataframe.loc[
# (dataframe['predicted_direction'] == 1),
# 'enter_long'] = 1
return dataframe
def populate_sell_trend(self, dataframe: DataFrame, metadata: dict) -> DataFrame:
"""
Define sell signal logic.
"""
# dataframe.loc[
# (
# # Exit condition: close price crosses above EMA50
# dataframe['close'] > dataframe['ema50']
# ),
# 'sell'] = 1
return dataframe
def calculate_negative(self, df: DataFrame) -> DataFrame:
negative_counts = []
# Boucle optimisée pour calculer les positions fermées en perte avant la position actuelle
for i in range(len(df)):
# Subset jusqu'à la ligne actuelle
previous_trades = df['percent'].iloc[:i]
# Trouver toutes les positions négatives avant le premier profit positif
if not previous_trades.empty:
mask = (previous_trades > 0).idxmax() # Trouver l'index du premier profit positif
count = (previous_trades.iloc[:mask] < 0).sum() if mask > 0 else len(previous_trades)
else:
count = 0 # Si aucune position précédente, pas de perte
negative_counts.append(count)
df['negative_positions_before'] = negative_counts
print(df)