340 lines
14 KiB
Python
340 lines
14 KiB
Python
from datetime import timedelta, datetime
|
|
from freqtrade.persistence import Trade
|
|
from freqtrade.strategy import (BooleanParameter, CategoricalParameter, DecimalParameter, stoploss_from_open,
|
|
IntParameter, IStrategy, merge_informative_pair, informative, stoploss_from_absolute)
|
|
import pandas as pd
|
|
import numpy as np
|
|
from pandas import DataFrame
|
|
from typing import Optional, Union, Tuple
|
|
from scipy.signal import find_peaks
|
|
|
|
import logging
|
|
import configparser
|
|
from technical import pivots_points
|
|
# --------------------------------
|
|
#pip install scikit-learn
|
|
|
|
# Add your lib to import here
|
|
import ta
|
|
import talib.abstract as talib
|
|
import freqtrade.vendor.qtpylib.indicators as qtpylib
|
|
import requests
|
|
|
|
logger = logging.getLogger(__name__)
|
|
# Configuration du logger
|
|
logging.basicConfig(
|
|
level=logging.INFO,
|
|
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
|
|
)
|
|
|
|
from technical.indicators import RMI
|
|
from ta.momentum import RSIIndicator, StochasticOscillator
|
|
from ta.trend import SMAIndicator, EMAIndicator, MACD, ADXIndicator
|
|
from ta.volatility import BollingerBands
|
|
from ta.volume import ChaikinMoneyFlowIndicator
|
|
from sklearn.model_selection import train_test_split
|
|
from sklearn.linear_model import LogisticRegression
|
|
from sklearn.metrics import accuracy_score
|
|
|
|
|
|
class Detecteur(IStrategy):
|
|
minimal_roi = {
|
|
"0": 10, # Take profit when reaching +5%
|
|
}
|
|
stoploss = -0.05 # Stop loss at -5%
|
|
timeframe = '1h'
|
|
use_custom_stoploss = False
|
|
trailing_stop = False
|
|
model = None
|
|
|
|
plot_config = {
|
|
"main_plot": {
|
|
"close": {"color": "black", "type": "line", "label": "Close Price"},
|
|
"ema50": {"color": "blue", "type": "line", "label": "EMA 50"},
|
|
"ema200": {"color": "red", "type": "line", "label": "EMA 200"},
|
|
"bb_lower": {"color": "green", "type": "line", "label": "Bollinger Lower Band"},
|
|
},
|
|
"subplots": {
|
|
"RSI": {
|
|
"rsi": {"color": "purple", "type": "line", "label": "RSI (14)"},
|
|
},
|
|
"MACD": {
|
|
"macd": {"color": "orange", "type": "line", "label": "MACD"},
|
|
"macd_signal": {"color": "pink", "type": "line", "label": "MACD Signal"},
|
|
},
|
|
"ADX": {
|
|
"adx": {"color": "brown", "type": "line", "label": "ADX"},
|
|
},
|
|
"Stochastic": {
|
|
"stoch_k": {"color": "cyan", "type": "line", "label": "Stoch %K"},
|
|
"stoch_d": {"color": "magenta", "type": "line", "label": "Stoch %D"},
|
|
},
|
|
"Chaikin Money Flow": {
|
|
"cmf": {"color": "teal", "type": "line", "label": "Chaikin Money Flow"},
|
|
},
|
|
"ATR": {
|
|
"atr": {"color": "darkblue", "type": "line", "label": "Average True Range"},
|
|
},
|
|
"Volume": {
|
|
"volume": {"color": "gray", "type": "bar", "label": "Volume"},
|
|
},
|
|
"Heikin Ashi": {
|
|
"ha_close": {"color": "lime", "type": "line", "label": "HA Close"},
|
|
"ha_open": {"color": "gold", "type": "line", "label": "HA Open"},
|
|
},
|
|
}
|
|
}
|
|
|
|
# Optimal timeframe for indicators
|
|
# process_only_new_candles = True
|
|
|
|
features = ['rsi', 'rsi_percent', 'rsi_percent3', 'rsi_percent5', 'bb_upperband', 'bb_lowerband', 'close']
|
|
|
|
# Charger les modèles
|
|
def __init__(self, *args, **kwargs):
|
|
super().__init__(*args, **kwargs)
|
|
self.model = None
|
|
|
|
def train_model(self, dataframe: DataFrame):
|
|
# Sélection des colonnes de caractéristiques (features)
|
|
features = self.features
|
|
target = 'future_direction'
|
|
|
|
# Supprimer les lignes avec des valeurs NaN
|
|
dataframe = dataframe.dropna()
|
|
|
|
# Diviser les données en train (80%) et test (20%)
|
|
X = dataframe[features]
|
|
y = dataframe[target]
|
|
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)
|
|
|
|
# Entraîner le modèle de régression logistique
|
|
model = LogisticRegression(max_iter=500)
|
|
model.fit(X_train, y_train)
|
|
|
|
# Évaluer le modèle
|
|
predictions = model.predict(X_test)
|
|
accuracy = accuracy_score(y_test, predictions)
|
|
print(f"Accuracy du modèle : {accuracy * 100:.2f}%")
|
|
|
|
return model
|
|
|
|
def add_future_direction(self, dataframe: DataFrame) -> DataFrame:
|
|
dataframe['future_change'] = dataframe['close'].shift(-5) - dataframe['close']
|
|
dataframe['future_direction'] = dataframe['future_change'].apply(lambda x: 1 if x > 0 else 0)
|
|
return dataframe
|
|
|
|
def predict_with_model(self, dataframe: DataFrame, model):
|
|
features = self.features
|
|
|
|
# Supprimer les lignes avec des NaN
|
|
|
|
# Prédire les probabilités
|
|
#dataframe['prediction_prob'] = model.predict_proba(dataframe[features])[:, 1]
|
|
#dataframe['predicted_direction'] = dataframe['prediction_prob'].apply(lambda x: 1 if x > 0.5 else 0)
|
|
|
|
# Assurez-vous que dataframe est une copie complète
|
|
dataframe = dataframe.copy()
|
|
dataframe = dataframe.dropna()
|
|
|
|
# Prédire les probabilités
|
|
dataframe.loc[:, 'prediction_prob'] = model.predict_proba(dataframe[features])[:, 1]
|
|
|
|
# Appliquer la direction prédite
|
|
dataframe.loc[:, 'predicted_direction'] = (dataframe['prediction_prob'] > 0.5).astype(int)
|
|
|
|
return dataframe
|
|
|
|
# Define indicators to avoid recalculating them
|
|
def populate_indicators(self, dataframe: DataFrame, metadata: dict) -> DataFrame:
|
|
dataframe['enter_long'] = ""
|
|
dataframe['enter_tag'] = ""
|
|
dataframe['percent'] = (dataframe['close'] - dataframe['open']) / dataframe['open']
|
|
|
|
"""
|
|
Populate indicators used in the strategy.
|
|
"""
|
|
# Moving Averages
|
|
dataframe['ema50'] = EMAIndicator(dataframe['close'], window=50).ema_indicator()
|
|
dataframe['ema200'] = EMAIndicator(dataframe['close'], window=200).ema_indicator()
|
|
dataframe['ma_downtrend'] = (dataframe['close'] < dataframe['ema50']) & (dataframe['ema50'] < dataframe['ema200'])
|
|
|
|
# RSI
|
|
dataframe['rsi'] = RSIIndicator(dataframe['close'], window=14).rsi()
|
|
dataframe['rsi_downtrend'] = dataframe['rsi'] < 50
|
|
|
|
# MACD
|
|
macd = MACD(dataframe['close'], window_slow=26, window_fast=12, window_sign=9)
|
|
dataframe['macd'] = macd.macd()
|
|
dataframe['macd_signal'] = macd.macd_signal()
|
|
dataframe['macd_downtrend'] = (dataframe['macd'] < dataframe['macd_signal']) & (dataframe['macd'] < 0)
|
|
|
|
# ADX
|
|
adx = ADXIndicator(dataframe['high'], dataframe['low'], dataframe['close'], window=14)
|
|
dataframe['adx'] = adx.adx()
|
|
dataframe['adx_downtrend'] = (adx.adx_neg() > adx.adx_pos()) & (dataframe['adx'] > 25)
|
|
|
|
# Bollinger Bands
|
|
bb = BollingerBands(dataframe['close'], window=20, window_dev=2)
|
|
dataframe['bb_lower'] = bb.bollinger_lband()
|
|
dataframe['bb_downtrend'] = dataframe['close'] < dataframe['bb_lower']
|
|
|
|
# Stochastic Oscillator
|
|
stoch = StochasticOscillator(dataframe['high'], dataframe['low'], dataframe['close'], window=14, smooth_window=3)
|
|
dataframe['stoch_k'] = stoch.stoch()
|
|
dataframe['stoch_d'] = stoch.stoch_signal()
|
|
dataframe['stoch_downtrend'] = (dataframe['stoch_k'] < 20) & (dataframe['stoch_d'] < 20)
|
|
|
|
# CMF (Chaikin Money Flow)
|
|
cmf = ChaikinMoneyFlowIndicator(dataframe['high'], dataframe['low'], dataframe['close'], dataframe['volume'], window=20)
|
|
dataframe['cmf'] = cmf.chaikin_money_flow()
|
|
dataframe['cmf_downtrend'] = dataframe['cmf'] < -0.2
|
|
|
|
# Heikin Ashi (simplified)
|
|
heikinashi = qtpylib.heikinashi(dataframe)
|
|
dataframe['ha_open'] = heikinashi['open']
|
|
dataframe['ha_close'] = heikinashi['close']
|
|
dataframe['ha_mid'] = (dataframe['ha_close'] + dataframe['ha_open']) / 2
|
|
dataframe['ha_percent'] = (dataframe['ha_close'] - dataframe['ha_open']) / dataframe['ha_open']
|
|
dataframe['ha_percent12'] = (dataframe['ha_close'] - dataframe['ha_open'].shift(12)) / dataframe['ha_open']
|
|
dataframe['ha_percent24'] = (dataframe['ha_close'] - dataframe['ha_open'].shift(24)) / dataframe['ha_open']
|
|
|
|
dataframe['volume2'] = dataframe['volume']
|
|
dataframe.loc[dataframe['ha_percent'] < 0, 'volume2'] *= -1
|
|
|
|
# Volume confirmation
|
|
dataframe['volume_spike'] = abs(dataframe['volume2']) > abs(dataframe['volume2'].rolling(window=20).mean() * 1.5)
|
|
|
|
# dataframe['ha_close'] = (dataframe['open'] + dataframe['high'] + dataframe['low'] + dataframe['close']) / 4
|
|
# dataframe['ha_open'] = dataframe['ha_close'].shift(1).combine_first(dataframe['open'])
|
|
dataframe['ha_red'] = dataframe['ha_close'] < dataframe['ha_open']
|
|
|
|
# ATR (Average True Range)
|
|
dataframe['atr'] = (dataframe['high'] - dataframe['low']).rolling(window=14).mean()
|
|
dataframe['atr_spike'] = dataframe['atr'] > dataframe['atr'].rolling(window=20).mean()
|
|
|
|
dataframe['min30'] = talib.MIN(dataframe['close'], timeperiod=30)
|
|
dataframe['max30'] = talib.MAX(dataframe['close'], timeperiod=30)
|
|
|
|
# Calcul des indicateurs
|
|
dataframe['rsi'] = talib.RSI(dataframe, timeperiod=14)
|
|
#dataframe['macd'], dataframe['macd_signal'], _ = talib.MACD(dataframe)
|
|
|
|
# Bollinger Bands
|
|
bollinger = talib.BBANDS(dataframe, timeperiod=20)
|
|
dataframe['bb_upperband'] = bollinger['upperband']
|
|
dataframe['bb_middleband'] = bollinger['middleband']
|
|
dataframe['bb_lowerband'] = bollinger['lowerband']
|
|
|
|
# Calcul des changements futurs
|
|
dataframe['future_change'] = dataframe['close'].shift(-5) - dataframe['close']
|
|
dataframe['future_direction'] = dataframe['future_change'].apply(lambda x: 1 if x > 0 else 0)
|
|
|
|
# Segmenter RSI
|
|
dataframe['rsi_range'] = pd.cut(dataframe['rsi'], bins=[0, 30, 70, 100], labels=['oversold', 'neutral', 'overbought'])
|
|
dataframe['rsi_percent'] = 100 * (dataframe['rsi'] - dataframe['rsi'].shift(1)) / dataframe['rsi']
|
|
dataframe['rsi_percent3'] = 100 * (dataframe['rsi'] - dataframe['rsi'].shift(3)) / dataframe['rsi']
|
|
dataframe['rsi_percent5'] = 100 * (dataframe['rsi'] - dataframe['rsi'].shift(5)) / dataframe['rsi']
|
|
dataframe['rsi_pct_range'] = pd.cut(dataframe['rsi_percent3'], bins=[-80, -15, -10, -5, 0, 5, 10, 15, 80], labels=['-20', '-15', '-10', '-5', '0', '5', '10', '15'])
|
|
|
|
#dataframe = self.calculate_negative(dataframe)
|
|
# Probabilités par RSI
|
|
#for group in self.features:
|
|
# rsi_probabilities = dataframe.groupby(group)['future_direction'].mean()
|
|
# print(f"Probabilités de hausse selon {group} :\n", rsi_probabilities)
|
|
|
|
rsi_probabilities = dataframe.groupby('rsi_pct_range')['future_direction'].mean()
|
|
print(f"Probabilités de hausse selon rsi_pct_range :\n", rsi_probabilities)
|
|
|
|
# Probabilités par MACD (positif/négatif)
|
|
#dataframe['macd_signal'] = (dataframe['macd'] > dataframe['macd_signal']).astype(int)
|
|
#macd_probabilities = dataframe.groupby('macd_signal')['future_direction'].mean()
|
|
#print("Probabilités de hausse selon MACD :\n", macd_probabilities)
|
|
|
|
return dataframe
|
|
|
|
def populate_buy_trend(self, dataframe: DataFrame, metadata: dict) -> DataFrame:
|
|
"""
|
|
Define buy signal logic.
|
|
"""
|
|
dataframe.loc[
|
|
(
|
|
# Combine indicators to detect a strong downtrend
|
|
(dataframe['ha_red'].shift(1) == 1)
|
|
& (dataframe['ha_red'] == 0)
|
|
# dataframe['ma_downtrend'] &
|
|
# dataframe['rsi_downtrend'] &
|
|
# dataframe['macd_downtrend'] &
|
|
# dataframe['adx_downtrend'] &
|
|
# dataframe['bb_downtrend'] &
|
|
# dataframe['volume_spike'] &
|
|
# dataframe['stoch_downtrend'] &
|
|
# dataframe['cmf_downtrend'] &
|
|
# dataframe['ha_red'] &
|
|
# dataframe['atr_spike']
|
|
),
|
|
'enter_long'] = 1
|
|
nan_columns = dataframe.columns[dataframe.isna().any()].tolist()
|
|
print("Colonnes contenant des NaN :", nan_columns)
|
|
|
|
# Compter les colonnes avec uniquement des NaN
|
|
num_only_nan_columns = dataframe.isna().all().sum()
|
|
print(f"Nombre de colonnes contenant uniquement des NaN : {num_only_nan_columns}")
|
|
|
|
# Compter les NaN par colonne
|
|
nan_count_per_column = dataframe.isna().sum()
|
|
print("Nombre de NaN par colonne :")
|
|
print(nan_count_per_column)
|
|
columns_with_nan = nan_count_per_column[nan_count_per_column > 1000]
|
|
print("Colonnes avec au moins 1000 NaN :")
|
|
print(columns_with_nan)
|
|
|
|
|
|
# Former le modèle si ce n'est pas déjà fait
|
|
if self.model is None:
|
|
dataframe = self.add_future_direction(dataframe)
|
|
self.model = self.train_model(dataframe)
|
|
|
|
# Faire des prédictions
|
|
dataframe = self.predict_with_model(dataframe, self.model)
|
|
|
|
# Entrée sur un signal de hausse
|
|
#dataframe.loc[
|
|
# (dataframe['predicted_direction'] == 1),
|
|
# 'enter_long'] = 1
|
|
|
|
return dataframe
|
|
|
|
def populate_sell_trend(self, dataframe: DataFrame, metadata: dict) -> DataFrame:
|
|
"""
|
|
Define sell signal logic.
|
|
"""
|
|
# dataframe.loc[
|
|
# (
|
|
# # Exit condition: close price crosses above EMA50
|
|
# dataframe['close'] > dataframe['ema50']
|
|
# ),
|
|
# 'sell'] = 1
|
|
|
|
return dataframe
|
|
|
|
def calculate_negative(self, df: DataFrame) -> DataFrame:
|
|
negative_counts = []
|
|
|
|
# Boucle optimisée pour calculer les positions fermées en perte avant la position actuelle
|
|
for i in range(len(df)):
|
|
# Subset jusqu'à la ligne actuelle
|
|
previous_trades = df['percent'].iloc[:i]
|
|
|
|
# Trouver toutes les positions négatives avant le premier profit positif
|
|
if not previous_trades.empty:
|
|
mask = (previous_trades > 0).idxmax() # Trouver l'index du premier profit positif
|
|
count = (previous_trades.iloc[:mask] < 0).sum() if mask > 0 else len(previous_trades)
|
|
else:
|
|
count = 0 # Si aucune position précédente, pas de perte
|
|
|
|
negative_counts.append(count)
|
|
|
|
df['negative_positions_before'] = negative_counts
|
|
print(df)
|