Add new strategies
This commit is contained in:
339
Detecteur.py
Normal file
339
Detecteur.py
Normal file
@@ -0,0 +1,339 @@
|
||||
from datetime import timedelta, datetime
|
||||
from freqtrade.persistence import Trade
|
||||
from freqtrade.strategy import (BooleanParameter, CategoricalParameter, DecimalParameter, stoploss_from_open,
|
||||
IntParameter, IStrategy, merge_informative_pair, informative, stoploss_from_absolute)
|
||||
import pandas as pd
|
||||
import numpy as np
|
||||
from pandas import DataFrame
|
||||
from typing import Optional, Union, Tuple
|
||||
from scipy.signal import find_peaks
|
||||
|
||||
import logging
|
||||
import configparser
|
||||
from technical import pivots_points
|
||||
# --------------------------------
|
||||
#pip install scikit-learn
|
||||
|
||||
# Add your lib to import here
|
||||
import ta
|
||||
import talib.abstract as talib
|
||||
import freqtrade.vendor.qtpylib.indicators as qtpylib
|
||||
import requests
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
# Configuration du logger
|
||||
logging.basicConfig(
|
||||
level=logging.INFO,
|
||||
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
|
||||
)
|
||||
|
||||
from technical.indicators import RMI
|
||||
from ta.momentum import RSIIndicator, StochasticOscillator
|
||||
from ta.trend import SMAIndicator, EMAIndicator, MACD, ADXIndicator
|
||||
from ta.volatility import BollingerBands
|
||||
from ta.volume import ChaikinMoneyFlowIndicator
|
||||
from sklearn.model_selection import train_test_split
|
||||
from sklearn.linear_model import LogisticRegression
|
||||
from sklearn.metrics import accuracy_score
|
||||
|
||||
|
||||
class Detecteur(IStrategy):
|
||||
minimal_roi = {
|
||||
"0": 10, # Take profit when reaching +5%
|
||||
}
|
||||
stoploss = -0.05 # Stop loss at -5%
|
||||
timeframe = '1h'
|
||||
use_custom_stoploss = False
|
||||
trailing_stop = False
|
||||
model = None
|
||||
|
||||
plot_config = {
|
||||
"main_plot": {
|
||||
"close": {"color": "black", "type": "line", "label": "Close Price"},
|
||||
"ema50": {"color": "blue", "type": "line", "label": "EMA 50"},
|
||||
"ema200": {"color": "red", "type": "line", "label": "EMA 200"},
|
||||
"bb_lower": {"color": "green", "type": "line", "label": "Bollinger Lower Band"},
|
||||
},
|
||||
"subplots": {
|
||||
"RSI": {
|
||||
"rsi": {"color": "purple", "type": "line", "label": "RSI (14)"},
|
||||
},
|
||||
"MACD": {
|
||||
"macd": {"color": "orange", "type": "line", "label": "MACD"},
|
||||
"macd_signal": {"color": "pink", "type": "line", "label": "MACD Signal"},
|
||||
},
|
||||
"ADX": {
|
||||
"adx": {"color": "brown", "type": "line", "label": "ADX"},
|
||||
},
|
||||
"Stochastic": {
|
||||
"stoch_k": {"color": "cyan", "type": "line", "label": "Stoch %K"},
|
||||
"stoch_d": {"color": "magenta", "type": "line", "label": "Stoch %D"},
|
||||
},
|
||||
"Chaikin Money Flow": {
|
||||
"cmf": {"color": "teal", "type": "line", "label": "Chaikin Money Flow"},
|
||||
},
|
||||
"ATR": {
|
||||
"atr": {"color": "darkblue", "type": "line", "label": "Average True Range"},
|
||||
},
|
||||
"Volume": {
|
||||
"volume": {"color": "gray", "type": "bar", "label": "Volume"},
|
||||
},
|
||||
"Heikin Ashi": {
|
||||
"ha_close": {"color": "lime", "type": "line", "label": "HA Close"},
|
||||
"ha_open": {"color": "gold", "type": "line", "label": "HA Open"},
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
# Optimal timeframe for indicators
|
||||
# process_only_new_candles = True
|
||||
|
||||
features = ['rsi', 'rsi_percent', 'rsi_percent3', 'rsi_percent5', 'bb_upperband', 'bb_lowerband', 'close']
|
||||
|
||||
# Charger les modèles
|
||||
def __init__(self, *args, **kwargs):
|
||||
super().__init__(*args, **kwargs)
|
||||
self.model = None
|
||||
|
||||
def train_model(self, dataframe: DataFrame):
|
||||
# Sélection des colonnes de caractéristiques (features)
|
||||
features = self.features
|
||||
target = 'future_direction'
|
||||
|
||||
# Supprimer les lignes avec des valeurs NaN
|
||||
dataframe = dataframe.dropna()
|
||||
|
||||
# Diviser les données en train (80%) et test (20%)
|
||||
X = dataframe[features]
|
||||
y = dataframe[target]
|
||||
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)
|
||||
|
||||
# Entraîner le modèle de régression logistique
|
||||
model = LogisticRegression(max_iter=500)
|
||||
model.fit(X_train, y_train)
|
||||
|
||||
# Évaluer le modèle
|
||||
predictions = model.predict(X_test)
|
||||
accuracy = accuracy_score(y_test, predictions)
|
||||
print(f"Accuracy du modèle : {accuracy * 100:.2f}%")
|
||||
|
||||
return model
|
||||
|
||||
def add_future_direction(self, dataframe: DataFrame) -> DataFrame:
|
||||
dataframe['future_change'] = dataframe['close'].shift(-5) - dataframe['close']
|
||||
dataframe['future_direction'] = dataframe['future_change'].apply(lambda x: 1 if x > 0 else 0)
|
||||
return dataframe
|
||||
|
||||
def predict_with_model(self, dataframe: DataFrame, model):
|
||||
features = self.features
|
||||
|
||||
# Supprimer les lignes avec des NaN
|
||||
|
||||
# Prédire les probabilités
|
||||
#dataframe['prediction_prob'] = model.predict_proba(dataframe[features])[:, 1]
|
||||
#dataframe['predicted_direction'] = dataframe['prediction_prob'].apply(lambda x: 1 if x > 0.5 else 0)
|
||||
|
||||
# Assurez-vous que dataframe est une copie complète
|
||||
dataframe = dataframe.copy()
|
||||
dataframe = dataframe.dropna()
|
||||
|
||||
# Prédire les probabilités
|
||||
dataframe.loc[:, 'prediction_prob'] = model.predict_proba(dataframe[features])[:, 1]
|
||||
|
||||
# Appliquer la direction prédite
|
||||
dataframe.loc[:, 'predicted_direction'] = (dataframe['prediction_prob'] > 0.5).astype(int)
|
||||
|
||||
return dataframe
|
||||
|
||||
# Define indicators to avoid recalculating them
|
||||
def populate_indicators(self, dataframe: DataFrame, metadata: dict) -> DataFrame:
|
||||
dataframe['enter_long'] = ""
|
||||
dataframe['enter_tag'] = ""
|
||||
dataframe['percent'] = (dataframe['close'] - dataframe['open']) / dataframe['open']
|
||||
|
||||
"""
|
||||
Populate indicators used in the strategy.
|
||||
"""
|
||||
# Moving Averages
|
||||
dataframe['ema50'] = EMAIndicator(dataframe['close'], window=50).ema_indicator()
|
||||
dataframe['ema200'] = EMAIndicator(dataframe['close'], window=200).ema_indicator()
|
||||
dataframe['ma_downtrend'] = (dataframe['close'] < dataframe['ema50']) & (dataframe['ema50'] < dataframe['ema200'])
|
||||
|
||||
# RSI
|
||||
dataframe['rsi'] = RSIIndicator(dataframe['close'], window=14).rsi()
|
||||
dataframe['rsi_downtrend'] = dataframe['rsi'] < 50
|
||||
|
||||
# MACD
|
||||
macd = MACD(dataframe['close'], window_slow=26, window_fast=12, window_sign=9)
|
||||
dataframe['macd'] = macd.macd()
|
||||
dataframe['macd_signal'] = macd.macd_signal()
|
||||
dataframe['macd_downtrend'] = (dataframe['macd'] < dataframe['macd_signal']) & (dataframe['macd'] < 0)
|
||||
|
||||
# ADX
|
||||
adx = ADXIndicator(dataframe['high'], dataframe['low'], dataframe['close'], window=14)
|
||||
dataframe['adx'] = adx.adx()
|
||||
dataframe['adx_downtrend'] = (adx.adx_neg() > adx.adx_pos()) & (dataframe['adx'] > 25)
|
||||
|
||||
# Bollinger Bands
|
||||
bb = BollingerBands(dataframe['close'], window=20, window_dev=2)
|
||||
dataframe['bb_lower'] = bb.bollinger_lband()
|
||||
dataframe['bb_downtrend'] = dataframe['close'] < dataframe['bb_lower']
|
||||
|
||||
# Stochastic Oscillator
|
||||
stoch = StochasticOscillator(dataframe['high'], dataframe['low'], dataframe['close'], window=14, smooth_window=3)
|
||||
dataframe['stoch_k'] = stoch.stoch()
|
||||
dataframe['stoch_d'] = stoch.stoch_signal()
|
||||
dataframe['stoch_downtrend'] = (dataframe['stoch_k'] < 20) & (dataframe['stoch_d'] < 20)
|
||||
|
||||
# CMF (Chaikin Money Flow)
|
||||
cmf = ChaikinMoneyFlowIndicator(dataframe['high'], dataframe['low'], dataframe['close'], dataframe['volume'], window=20)
|
||||
dataframe['cmf'] = cmf.chaikin_money_flow()
|
||||
dataframe['cmf_downtrend'] = dataframe['cmf'] < -0.2
|
||||
|
||||
# Heikin Ashi (simplified)
|
||||
heikinashi = qtpylib.heikinashi(dataframe)
|
||||
dataframe['ha_open'] = heikinashi['open']
|
||||
dataframe['ha_close'] = heikinashi['close']
|
||||
dataframe['ha_mid'] = (dataframe['ha_close'] + dataframe['ha_open']) / 2
|
||||
dataframe['ha_percent'] = (dataframe['ha_close'] - dataframe['ha_open']) / dataframe['ha_open']
|
||||
dataframe['ha_percent12'] = (dataframe['ha_close'] - dataframe['ha_open'].shift(12)) / dataframe['ha_open']
|
||||
dataframe['ha_percent24'] = (dataframe['ha_close'] - dataframe['ha_open'].shift(24)) / dataframe['ha_open']
|
||||
|
||||
dataframe['volume2'] = dataframe['volume']
|
||||
dataframe.loc[dataframe['ha_percent'] < 0, 'volume2'] *= -1
|
||||
|
||||
# Volume confirmation
|
||||
dataframe['volume_spike'] = abs(dataframe['volume2']) > abs(dataframe['volume2'].rolling(window=20).mean() * 1.5)
|
||||
|
||||
# dataframe['ha_close'] = (dataframe['open'] + dataframe['high'] + dataframe['low'] + dataframe['close']) / 4
|
||||
# dataframe['ha_open'] = dataframe['ha_close'].shift(1).combine_first(dataframe['open'])
|
||||
dataframe['ha_red'] = dataframe['ha_close'] < dataframe['ha_open']
|
||||
|
||||
# ATR (Average True Range)
|
||||
dataframe['atr'] = (dataframe['high'] - dataframe['low']).rolling(window=14).mean()
|
||||
dataframe['atr_spike'] = dataframe['atr'] > dataframe['atr'].rolling(window=20).mean()
|
||||
|
||||
dataframe['min30'] = talib.MIN(dataframe['close'], timeperiod=30)
|
||||
dataframe['max30'] = talib.MAX(dataframe['close'], timeperiod=30)
|
||||
|
||||
# Calcul des indicateurs
|
||||
dataframe['rsi'] = talib.RSI(dataframe, timeperiod=14)
|
||||
#dataframe['macd'], dataframe['macd_signal'], _ = talib.MACD(dataframe)
|
||||
|
||||
# Bollinger Bands
|
||||
bollinger = talib.BBANDS(dataframe, timeperiod=20)
|
||||
dataframe['bb_upperband'] = bollinger['upperband']
|
||||
dataframe['bb_middleband'] = bollinger['middleband']
|
||||
dataframe['bb_lowerband'] = bollinger['lowerband']
|
||||
|
||||
# Calcul des changements futurs
|
||||
dataframe['future_change'] = dataframe['close'].shift(-5) - dataframe['close']
|
||||
dataframe['future_direction'] = dataframe['future_change'].apply(lambda x: 1 if x > 0 else 0)
|
||||
|
||||
# Segmenter RSI
|
||||
dataframe['rsi_range'] = pd.cut(dataframe['rsi'], bins=[0, 30, 70, 100], labels=['oversold', 'neutral', 'overbought'])
|
||||
dataframe['rsi_percent'] = 100 * (dataframe['rsi'] - dataframe['rsi'].shift(1)) / dataframe['rsi']
|
||||
dataframe['rsi_percent3'] = 100 * (dataframe['rsi'] - dataframe['rsi'].shift(3)) / dataframe['rsi']
|
||||
dataframe['rsi_percent5'] = 100 * (dataframe['rsi'] - dataframe['rsi'].shift(5)) / dataframe['rsi']
|
||||
dataframe['rsi_pct_range'] = pd.cut(dataframe['rsi_percent3'], bins=[-80, -15, -10, -5, 0, 5, 10, 15, 80], labels=['-20', '-15', '-10', '-5', '0', '5', '10', '15'])
|
||||
|
||||
#dataframe = self.calculate_negative(dataframe)
|
||||
# Probabilités par RSI
|
||||
#for group in self.features:
|
||||
# rsi_probabilities = dataframe.groupby(group)['future_direction'].mean()
|
||||
# print(f"Probabilités de hausse selon {group} :\n", rsi_probabilities)
|
||||
|
||||
rsi_probabilities = dataframe.groupby('rsi_pct_range')['future_direction'].mean()
|
||||
print(f"Probabilités de hausse selon rsi_pct_range :\n", rsi_probabilities)
|
||||
|
||||
# Probabilités par MACD (positif/négatif)
|
||||
#dataframe['macd_signal'] = (dataframe['macd'] > dataframe['macd_signal']).astype(int)
|
||||
#macd_probabilities = dataframe.groupby('macd_signal')['future_direction'].mean()
|
||||
#print("Probabilités de hausse selon MACD :\n", macd_probabilities)
|
||||
|
||||
return dataframe
|
||||
|
||||
def populate_buy_trend(self, dataframe: DataFrame, metadata: dict) -> DataFrame:
|
||||
"""
|
||||
Define buy signal logic.
|
||||
"""
|
||||
dataframe.loc[
|
||||
(
|
||||
# Combine indicators to detect a strong downtrend
|
||||
(dataframe['ha_red'].shift(1) == 1)
|
||||
& (dataframe['ha_red'] == 0)
|
||||
# dataframe['ma_downtrend'] &
|
||||
# dataframe['rsi_downtrend'] &
|
||||
# dataframe['macd_downtrend'] &
|
||||
# dataframe['adx_downtrend'] &
|
||||
# dataframe['bb_downtrend'] &
|
||||
# dataframe['volume_spike'] &
|
||||
# dataframe['stoch_downtrend'] &
|
||||
# dataframe['cmf_downtrend'] &
|
||||
# dataframe['ha_red'] &
|
||||
# dataframe['atr_spike']
|
||||
),
|
||||
'enter_long'] = 1
|
||||
nan_columns = dataframe.columns[dataframe.isna().any()].tolist()
|
||||
print("Colonnes contenant des NaN :", nan_columns)
|
||||
|
||||
# Compter les colonnes avec uniquement des NaN
|
||||
num_only_nan_columns = dataframe.isna().all().sum()
|
||||
print(f"Nombre de colonnes contenant uniquement des NaN : {num_only_nan_columns}")
|
||||
|
||||
# Compter les NaN par colonne
|
||||
nan_count_per_column = dataframe.isna().sum()
|
||||
print("Nombre de NaN par colonne :")
|
||||
print(nan_count_per_column)
|
||||
columns_with_nan = nan_count_per_column[nan_count_per_column > 1000]
|
||||
print("Colonnes avec au moins 1000 NaN :")
|
||||
print(columns_with_nan)
|
||||
|
||||
|
||||
# Former le modèle si ce n'est pas déjà fait
|
||||
if self.model is None:
|
||||
dataframe = self.add_future_direction(dataframe)
|
||||
self.model = self.train_model(dataframe)
|
||||
|
||||
# Faire des prédictions
|
||||
dataframe = self.predict_with_model(dataframe, self.model)
|
||||
|
||||
# Entrée sur un signal de hausse
|
||||
#dataframe.loc[
|
||||
# (dataframe['predicted_direction'] == 1),
|
||||
# 'enter_long'] = 1
|
||||
|
||||
return dataframe
|
||||
|
||||
def populate_sell_trend(self, dataframe: DataFrame, metadata: dict) -> DataFrame:
|
||||
"""
|
||||
Define sell signal logic.
|
||||
"""
|
||||
# dataframe.loc[
|
||||
# (
|
||||
# # Exit condition: close price crosses above EMA50
|
||||
# dataframe['close'] > dataframe['ema50']
|
||||
# ),
|
||||
# 'sell'] = 1
|
||||
|
||||
return dataframe
|
||||
|
||||
def calculate_negative(self, df: DataFrame) -> DataFrame:
|
||||
negative_counts = []
|
||||
|
||||
# Boucle optimisée pour calculer les positions fermées en perte avant la position actuelle
|
||||
for i in range(len(df)):
|
||||
# Subset jusqu'à la ligne actuelle
|
||||
previous_trades = df['percent'].iloc[:i]
|
||||
|
||||
# Trouver toutes les positions négatives avant le premier profit positif
|
||||
if not previous_trades.empty:
|
||||
mask = (previous_trades > 0).idxmax() # Trouver l'index du premier profit positif
|
||||
count = (previous_trades.iloc[:mask] < 0).sum() if mask > 0 else len(previous_trades)
|
||||
else:
|
||||
count = 0 # Si aucune position précédente, pas de perte
|
||||
|
||||
negative_counts.append(count)
|
||||
|
||||
df['negative_positions_before'] = negative_counts
|
||||
print(df)
|
||||
Reference in New Issue
Block a user