138 lines
3.8 KiB
Python
Executable File
138 lines
3.8 KiB
Python
Executable File
#!/usr/bin/env python3
|
|
"""
|
|
Detect bull/bear/range regimes from historical price data (quantile bands of the smoothed slope of a 20-period SMA)
|
|
Usage: python3 detect_regime.py <PAIR> <FEATHER_FILE>
|
|
The feather file must contain at least the columns: date, close
|
|
"""
|
|
|
|
# python3 user_data/strategies/tools/detect_regime.py BTC /home/jerome/Perso/freqtradeDocker/user_data/data/binance/BTC_USDT-1d.feather
|
|
|
|
import sys
|
|
import pandas as pd
|
|
import ta
|
|
import talib.abstract as talib
|
|
|
|
# Two positional arguments are required: the pair symbol and the path of the
# feather file. With fewer, print the usage line and exit with status 1.
cli_args = sys.argv[1:]
if len(cli_args) < 2:
    print("Usage: detect_regime.py <PAIR> <FEATHER_FILE>")
    sys.exit(1)

pair, file = cli_args[:2]
|
|
|
|
# Read the feather file given on the command line.
df = pd.read_feather(file)

# Validate before selecting columns: a missing column would otherwise surface
# as a bare KeyError from the selection below.
missing_columns = [name for name in ('date', 'close') if name not in df.columns]
if missing_columns:
    print("Feather file must contain 'date' and 'close' columns")
    sys.exit(1)

# Keep only the date and close columns and export them as timestamp/close.
# The CSV path is relative to the current working directory.
csv_path = f"user_data/data/{pair}-usdt-1d.csv"
df[['date', 'close']].rename(columns={'date': 'timestamp'}).to_csv(
    csv_path, index=False
)

# Continue from the exported CSV so the rest of the script works on exactly
# the timestamp/close columns that were written.
df = pd.read_csv(csv_path)
|
|
|
|
# --- parameters ---
SMOOTH_WIN = 10  # EMA span used to smooth the slope
NUM_TOP = 1      # number of segments to keep per class

# --- load the data ---
# errors='coerce' turns unparseable timestamps into NaT instead of raising;
# NaT rows do not match the year filter below and are therefore dropped.
df['timestamp'] = pd.to_datetime(df['timestamp'], errors='coerce')

# Keep only 2024 and 2025. The explicit copy makes the filtered frame
# independent of the original, so the column assignments that follow do not
# raise SettingWithCopyWarning.
df = df[df['timestamp'].dt.year.isin([2024, 2025])].copy()
|
|
|
|
# --- 20-period SMA ---
# The whole frame is handed to TA-Lib's abstract API.
# (Alternative left by the author: ta.trend.sma_indicator(df['close'], 14).)
# NOTE(review): the frame was already restricted to 2024-2025 above, so the
# SMA has no earlier history to warm up on; the first rows presumably come out
# as NaN and end up as 0 in slope_norm -- confirm this is intended.
df['sma'] = talib.SMA(df, timeperiod=20)

# --- raw slope: change of the SMA from one row to the next ---
df['slope'] = df['sma'].diff()

# --- EMA smoothing of the slope ---
df['slope_smooth'] = df['slope'].ewm(span=SMOOTH_WIN, adjust=False).mean()

# Optional extra smoothing, currently disabled:
# df["slope_smooth"] = savgol_filter(df["slope_smooth"], window_length=21, polyorder=3)

# --- relative normalisation ---
# Dividing by the close makes the slope relative to the price level; rows
# where the ratio is undefined (NaN) are set to 0.
relative_slope = df['slope_smooth'] / df['close']
df['slope_norm'] = relative_slope.fillna(0)
|
|
|
|
|
|
# --- dynamic classification via quantiles ---
# Four cut points (12.5 %, 37.5 %, 62.5 % and 87.5 % quantiles of the
# normalised slope) delimit the five bands used by classify().
slope_quantiles = df['slope_norm'].quantile([0.125, 0.375, 0.625, 0.875])
q = slope_quantiles.values
q1, q2, q3, q4 = q
|
|
|
|
def classify(v, thresholds=None, labels=('BR2', 'BR1', 'RG', 'BU1', 'BU2')):
    """Map a normalised slope value to a trend label.

    Args:
        v: The value to classify.
        thresholds: Ascending upper bounds, one per label except the last.
            Defaults to the module-level quantiles ``(q1, q2, q3, q4)``.
        labels: Band labels ordered from the lowest band to the highest;
            must contain exactly one more entry than ``thresholds``.

    Returns:
        The label of the first band whose upper bound is >= ``v``, or the
        last label when ``v`` exceeds every bound. NaN also falls through to
        the last label, because every comparison with NaN is false.

    Raises:
        ValueError: If ``labels`` and ``thresholds`` have inconsistent sizes.
    """
    if thresholds is None:
        # Backward-compatible default: the quantiles computed at module level.
        thresholds = (q1, q2, q3, q4)
    if len(labels) != len(thresholds) + 1:
        raise ValueError("labels must have exactly one more entry than thresholds")

    for upper_bound, label in zip(thresholds, labels):
        if v <= upper_bound:
            return label
    return labels[-1]
|
|
|
|
# q = df['slope_norm'].quantile([0.07, 0.21, 0.35, 0.65, 0.79, 0.93]).values
|
|
# q1, q2, q3, q4, q5, q6 = q
|
|
#
|
|
# def classify(v):
|
|
# if v <= q1:
|
|
# return 'BR3'
|
|
# elif v <= q2:
|
|
# return 'BR2'
|
|
# elif v <= q3:
|
|
# return 'BR1'
|
|
# elif v <= q4:
|
|
# return 'RG'
|
|
# elif v <= q5:
|
|
# return 'BU1'
|
|
# elif v <= q6:
|
|
# return 'BU2'
|
|
# else:
|
|
# return 'BU3'
|
|
|
|
# Label every row with its trend class.
df['trend_class'] = df['slope_norm'].apply(classify)


def _find_segments(labels, timestamps):
    """Split parallel label/timestamp sequences into runs of equal labels.

    Args:
        labels: Iterable of trend labels, one per row.
        timestamps: Iterable of timestamps aligned with ``labels``.

    Returns:
        A list of ``(label, start_ts, end_ts)`` tuples in row order. A run
        starts at the timestamp of its first row. It ends at the timestamp of
        the first row of the next run, or at the last timestamp for the final
        run. Empty input yields an empty list.
    """
    # Materialise once so each element is read by plain list indexing rather
    # than a per-row .iloc lookup.
    labels = list(labels)
    timestamps = list(timestamps)

    runs = []
    if not labels:
        return runs

    run_start = 0
    for pos, label in enumerate(labels):
        if label != labels[run_start]:
            runs.append((labels[run_start], timestamps[run_start], timestamps[pos]))
            run_start = pos

    # Close the last run on the final row.
    runs.append((labels[run_start], timestamps[run_start], timestamps[-1]))
    return runs


# --- detect the segments ---
segments = _find_segments(df['trend_class'], df['timestamp'])
|
|
|
|
# --- keep the NUM_TOP longest segments of each class ---
def _segment_seconds(segment):
    """Return the duration in seconds of a (label, start, end) segment."""
    _, start, end = segment
    return (end - start).total_seconds()


# For each class, order its segments from longest to shortest and keep the
# first NUM_TOP of them.
top_segments_by_class = {
    label: sorted(
        (segment for segment in segments if segment[0] == label),
        key=_segment_seconds,
        reverse=True,
    )[:NUM_TOP]
    for label in ['BR2', 'BR1', 'RG', 'BU1', 'BU2']
}
|
|
|
|
# --- display ---
# One header line per class, then one "label start end" line per kept segment.
for label, best_segments in top_segments_by_class.items():
    print(f"--- {label} ---")
    for trend_label, start, end in best_segments:
        print(f"{trend_label} {start} {end}")
|