#!/usr/bin/env python3 """ Detect bull/bear/range regimes from historical CSV data (SMA50/SMA200 crossovers) Usage: python3 detect_regime.py CSV must contain at least columns: timestamp, close """ # python3 user_data/strategies/tools/detect_regime.py BTC /home/jerome/Perso/freqtradeDocker/user_data/data/binance/BTC_USDT-1d.feather import sys import pandas as pd import ta import talib.abstract as talib if len(sys.argv) < 3: print("Usage: detect_regime.py ") sys.exit(1) pair = sys.argv[1] file = sys.argv[2] # lecture du fichier feather df = pd.read_feather(file) # ne garder que timestamp et close pour detect_regime.py df[['date', 'close']].rename(columns={'date':'timestamp'}).to_csv( f"user_data/data/{pair}-usdt-1d.csv", index=False ) df = pd.read_csv(f"user_data/data/{pair}-usdt-1d.csv") if 'close' not in df.columns or 'timestamp' not in df.columns: print("CSV must contain 'timestamp' and 'close' columns") sys.exit(1) # --- paramètres --- SMOOTH_WIN = 10 # EMA pour lisser la pente NUM_TOP = 1 # nombre de segments à garder par classe # --- charger les données --- df['timestamp'] = pd.to_datetime(df['timestamp'], errors='coerce') # filtrer uniquement 2024 et 2025 df = df[df['timestamp'].dt.year.isin([2024, 2025])] # --- calcul SMA14 --- df['sma'] = talib.SMA(df, timeperiod=20) #ta.trend.sma_indicator(df['close'], 14) # --- pente brute --- df['slope'] = df['sma'].diff() # --- lissage EMA --- df['slope_smooth'] = df['slope'].ewm(span=SMOOTH_WIN, adjust=False).mean() #df["slope_smooth"] = savgol_filter(df["slope_smooth"], window_length=21, polyorder=3) # --- normalisation relative --- df['slope_norm'] = df['slope_smooth'] / df['close'] # df['slope_norm'].fillna(0, inplace=True) df['slope_norm'] = df['slope_norm'].fillna(0) # --- classification dynamique via quantiles --- q = df['slope_norm'].quantile([0.125, 0.375, 0.625, 0.875]).values q1, q2, q3, q4 = q def classify(v): if v <= q1: return 'BR2' elif v <= q2: return 'BR1' elif v <= q3: return 'RG' elif v <= q4: return 'BU1' else: return 'BU2' # q = df['slope_norm'].quantile([0.7, 0.21, 0.35, 0.65, 0.79, 0.93]).values # q1, q2, q3, q4, q5, q6 = q # # def classify(v): # if v <= q1: # return 'BR3' # elif v <= q2: # return 'BR2' # elif v <= q3: # return 'BR1' # elif v <= q4: # return 'RG' # elif v <= q5: # return 'BU1' # elif v <= q6: # return 'BU2' # else: # return 'BU3' df['trend_class'] = df['slope_norm'].apply(classify) # --- boucle pour détecter les segments --- segments = [] trend = None start_idx = None for i in range(len(df)): new_trend = df['trend_class'].iloc[i] if trend is None: trend = new_trend start_idx = i elif new_trend != trend: start_ts = df['timestamp'].iloc[start_idx] end_ts = df['timestamp'].iloc[i] segments.append((trend, start_ts, end_ts)) trend = new_trend start_idx = i # fermer le dernier segment if trend is not None and start_idx is not None: start_ts = df['timestamp'].iloc[start_idx] end_ts = df['timestamp'].iloc[len(df)-1] segments.append((trend, start_ts, end_ts)) # --- extraire les 5 plus longs segments par classe --- top_segments_by_class = {} for cls in ['BR2','BR1','RG','BU1','BU2']: cls_segments = [(t,s,e) for t,s,e in segments if t==cls] # calcul de la durée cls_segments = [(t,s,e,(e-s).total_seconds()) for t,s,e in cls_segments] cls_segments.sort(key=lambda x: x[3], reverse=True) top_segments_by_class[cls] = [(t,s,e) for t,s,e,d in cls_segments[:NUM_TOP]] # --- affichage --- for cls, segs in top_segments_by_class.items(): print(f"--- {cls} ---") for t,s,e in segs: print(f"{t} {s} {e}")