Files
FreqStats/src/app.py
Jérôme Delacotte 7d17e30ccf Changement répertoire
2025-10-21 20:39:50 +02:00

274 lines
11 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
from flask import Flask, jsonify, abort, render_template, send_from_directory,request
import pandas as pd
import json
import zipfile
import os
import pickle
import joblib
import glob
from io import TextIOWrapper
# from ydata_profiling import ProfileReport
# model
# from sklearn.model_selection import train_test_split
# from sklearn.preprocessing import StandardScaler
# from tensorflow.keras.models import Sequential
# from tensorflow.keras.layers import Dense
# from tensorflow.keras.utils import plot_model
#
# from keras.models import Sequential
# from keras.layers import Dense
# from ann_visualizer.visualize import ann_viz
# Flask application object; the route decorators in this module register on it.
app = Flask(__name__)
# Root directory under which "backtest_results/" and "reports/" are looked up.
# Defaults to the historical hard-coded mount point; set the
# FREQTRADE_USERDATA_DIR environment variable to point the app elsewhere.
FREQTRADE_USERDATA_DIR = os.environ.get('FREQTRADE_USERDATA_DIR', '/mnt/external')
@app.route('/')
def home():
    """Render the index page listing the available backtest result files.

    Scans ``<FREQTRADE_USERDATA_DIR>/backtest_results`` once and hands two
    lists of base names to the ``index.html`` template, both ordered by
    ``os.path.getctime`` (ascending): ``files`` holds the ``.zip`` entries
    and ``files2`` the ``.feather`` entries.

    Returns:
        The rendered ``index.html`` template.
    """
    results_dir = os.path.join(FREQTRADE_USERDATA_DIR, "backtest_results")

    # Stat every regular file exactly once. A file that disappears between
    # the directory listing and the stat call is skipped instead of making
    # the whole page fail.
    stamped = []
    for path in glob.glob(os.path.join(results_dir, "*")):
        if not os.path.isfile(path):
            continue
        try:
            stamped.append((os.path.getctime(path), path))
        except OSError:
            continue

    # Sort on the timestamp only so ties keep the glob order (stable sort).
    stamped.sort(key=lambda item: item[0])
    ordered = [path for _, path in stamped]

    files = [os.path.basename(p) for p in ordered if p.lower().endswith('.zip')]
    files2 = [os.path.basename(p) for p in ordered if p.lower().endswith('.feather')]

    return render_template('index.html', files=files, files2=files2)
@app.route('/process', methods=['POST'])
def process():
    """Placeholder POST endpoint that returns a fixed confirmation string.

    The request payload is not read; user-submitted data would be handled
    here once processing is implemented.
    """
    confirmation = "Données traitées !"
    return confirmation
def _zip_pickle_to_json(name, data):
    """Convert one unpickled archive member to the JSON string stored for it.

    Args:
        name: Archive member name, used only for the progress print-outs.
        data: Object returned by ``joblib.load`` for that member.

    Returns:
        A JSON string (``orient='records'``) for DataFrames, lists and
        two-level dicts whose first innermost value is a DataFrame; a JSON
        error string for unsupported types; ``None`` when a dict does not
        have the expected nested shape (the member is then skipped).
    """
    if isinstance(data, pd.DataFrame):
        print(f"dataframe {name}")
        return data.to_json(orient='records')
    if isinstance(data, list):
        print(f"list {name}")
        return pd.DataFrame(data).to_json(orient='records')
    if isinstance(data, dict):
        # Assumes a single model/key at the first level ...
        inner = next(iter(data.values()), None)
        if isinstance(inner, dict):
            print(f"dict {name}")
            # ... and a single pair (e.g. 'BTC/USDT') at the second level.
            frame = next(iter(inner.values()), None)
            if isinstance(frame, pd.DataFrame):
                return frame.to_json(orient='records')
        return None
    return json.dumps({"error": f"Type {type(data)} non géré."})


def _read_zip_contents(full_path):
    """Load every supported member of a zip archive into a dict.

    ``.json`` members are parsed into Python objects; ``.feather`` and
    ``.pkl`` members are stored as JSON *strings* (``orient='records'``).
    A ``.pkl`` member that fails to load is stored as a JSON error string
    instead of aborting the whole archive. Other members are ignored.

    Args:
        full_path: Filesystem path of the archive.

    Returns:
        dict mapping member name to its decoded content.
    """
    contents = {}
    with zipfile.ZipFile(full_path) as archive:
        for name in archive.namelist():
            if name.endswith('.json'):
                with archive.open(name) as member:
                    print(f"load_json {name}")
                    contents[name] = json.load(member)
            elif name.endswith('.feather'):
                with archive.open(name) as member:
                    contents[name] = pd.read_feather(member).to_json(orient="records")
            elif name.endswith('.pkl'):
                with archive.open(name) as member:
                    try:
                        # SECURITY: unpickling executes arbitrary code; only
                        # archives from a trusted source must be placed here.
                        encoded = _zip_pickle_to_json(name, joblib.load(member))
                    except Exception as e:
                        encoded = json.dumps({"error": str(e)})
                if encoded is not None:
                    contents[name] = encoded
    return contents


@app.route('/read_json/<path:filename>')
def read_json(filename):
    """Return the content of a backtest result file as JSON.

    Supported extensions:

    * ``.json`` - the file text is returned unchanged.
    * ``.pkl``  - the object is loaded with joblib; DataFrames, dicts and
      lists are returned as ``orient='split'`` JSON.
    * ``.zip``  - every ``.json`` / ``.feather`` / ``.pkl`` member is
      decoded and returned in one JSON object keyed by member name.

    Args:
        filename: Path relative to ``<FREQTRADE_USERDATA_DIR>/backtest_results``.

    Returns:
        A Flask response tuple or body string. Unsupported extensions give
        a 400 JSON error, load failures a 500 JSON error, and a path that
        escapes the results directory aborts with 404.
    """
    # The filename comes straight from the URL: refuse anything ("..", an
    # absolute path) that resolves outside the results directory.
    base_dir = os.path.abspath(os.path.join(FREQTRADE_USERDATA_DIR, "backtest_results"))
    full_path = os.path.abspath(os.path.join(base_dir, filename))
    if os.path.commonpath([base_dir, full_path]) != base_dir:
        abort(404)

    json_headers = {'Content-Type': 'application/json'}

    if filename.endswith('.json'):
        try:
            with open(full_path, encoding='utf-8') as f:
                return f.read(), 200, json_headers
        except (OSError, UnicodeDecodeError) as e:
            return json.dumps({"error": str(e)}), 500

    if filename.endswith('.pkl'):
        try:
            # SECURITY: joblib.load unpickles the file, which can execute
            # arbitrary code; the results directory must be trusted.
            data = joblib.load(full_path)
            if isinstance(data, dict):
                data = pd.DataFrame.from_dict(data)
            elif isinstance(data, list):
                data = pd.DataFrame(data)
            if isinstance(data, pd.DataFrame):
                return data.to_json(orient='split'), 200, json_headers
            return json.dumps({"error": f"Type {type(data)} non géré."}), 200
        except Exception as e:
            return json.dumps({"error": str(e)}), 500

    if filename.endswith('.zip'):
        try:
            return json.dumps(_read_zip_contents(full_path))
        except Exception as e:
            return json.dumps({"error": str(e)}), 500

    return json.dumps({"error": "Fichier non pris en charge"}), 400
@app.route('/read_feather/<path:filename>')
def read_feather(filename):
    """Return a feather file from the results directory as JSON records.

    Args:
        filename: Path relative to ``<FREQTRADE_USERDATA_DIR>/backtest_results``.

    Returns:
        The DataFrame serialised with ``orient="records"``, or a JSON error
        with status 500 if the file cannot be read. A path that escapes the
        results directory aborts with 404.
    """
    # The filename comes straight from the URL: refuse anything ("..", an
    # absolute path) that resolves outside the results directory. This is
    # done before the try block so the 404 is not swallowed by the handler.
    base_dir = os.path.abspath(os.path.join(FREQTRADE_USERDATA_DIR, "backtest_results"))
    path = os.path.abspath(os.path.join(base_dir, filename))
    if os.path.commonpath([base_dir, path]) != base_dir:
        abort(404)

    try:
        dataframe = pd.read_feather(path)
        return dataframe.to_json(orient="records")
    except Exception as e:
        print(e)
        return jsonify({"error": str(e)}), 500
@app.route('/generate_report')
def generate_report():
    """Generate a time-series profiling report for selected columns.

    Query parameters:
        filename: Feather file relative to
            ``<FREQTRADE_USERDATA_DIR>/backtest_results``.
        indicators: Comma-separated column names to include in the report.

    The report is written to
    ``<FREQTRADE_USERDATA_DIR>/reports/report_timeseries.html``.

    Returns:
        The *full* DataFrame serialised with ``orient="records"`` on
        success; a JSON error with status 400 when no indicator is given or
        500 when loading or report generation fails. A path that escapes
        the results directory aborts with 404.
    """
    filename = request.args.get('filename', '')
    # The filename is client-supplied: refuse anything that resolves outside
    # the results directory (checked before the try so the 404 propagates).
    base_dir = os.path.abspath(os.path.join(FREQTRADE_USERDATA_DIR, "backtest_results"))
    path = os.path.abspath(os.path.join(base_dir, filename))
    if os.path.commonpath([base_dir, path]) != base_dir:
        abort(404)
    print(path)

    # ''.split(',') yields [''], which is not a column name: drop empties.
    indicators = [name for name in request.args.get('indicators', '').split(',') if name]
    print(indicators)
    if not indicators:
        return jsonify({"error": "Aucun indicateur fourni"}), 400

    try:
        # Imported lazily: the module-level import is disabled, so the app
        # still starts without the package and a missing install is reported
        # through the JSON error below.
        from ydata_profiling import ProfileReport

        dataframe = pd.read_feather(path)
        print(dataframe.columns)
        df = dataframe[indicators]
        # NOTE(review): .loc[1:100] is label-based and skips label 0;
        # sortby="date" presumably requires "date" among the indicators -
        # confirm both are intended.
        profile = ProfileReport(df.loc[1:100], tsmode=True, sortby="date", title="Time-Series EDA")
        reports_dir = os.path.join(FREQTRADE_USERDATA_DIR, "reports")
        os.makedirs(reports_dir, exist_ok=True)
        profile.to_file(os.path.join(reports_dir, "report_timeseries.html"))
        return dataframe.to_json(orient="records")
    except Exception as e:
        print(e)
        return jsonify({"error": str(e)}), 500
@app.route('/get_chart_data')
def get_chart_data():
    """Return a feather file from the results directory as JSON records.

    Query parameters:
        filename: Feather file relative to
            ``<FREQTRADE_USERDATA_DIR>/backtest_results``.
        indicators: Currently ignored; no indicator is computed server-side
            and every column of the file is returned.

    Returns:
        The DataFrame serialised with ``orient="records"``, or a JSON error
        with status 500 if the file cannot be read. A path that escapes the
        results directory aborts with 404.
    """
    filename = request.args.get('filename', '')
    # The filename is client-supplied: refuse anything that resolves outside
    # the results directory (checked before the try so the 404 propagates).
    base_dir = os.path.abspath(os.path.join(FREQTRADE_USERDATA_DIR, "backtest_results"))
    path = os.path.abspath(os.path.join(base_dir, filename))
    if os.path.commonpath([base_dir, path]) != base_dir:
        abort(404)
    print(path)

    try:
        df = pd.read_feather(path)
        return df.to_json(orient="records")
    except Exception as e:
        # Same JSON error shape as the other feather-reading routes.
        print(e)
        return jsonify({"error": str(e)}), 500
# @app.route('/generate_model')
# def generate_model():
# filename = request.args.get('filename', '')
# path = os.path.join(FREQTRADE_USERDATA_DIR + "/backtest_results/", filename)
# print(path)
# # indicators = request.args.get('indicators', '').split(',')
# df = pd.read_feather(path)
#
# # Choisir les colonnes techniques comme variables d'entrée (X)
# feature_cols = ['close', 'rsi', 'sma5', 'sma10', 'sma20', 'sma5_1h', 'volume', 'sma5_1h']
#
# # Variable cible 2 heures
# df['target'] = (df['close'].shift(-24) - df['close']) / df['close']
#
# # Supprimer les lignes avec des NaN
# df.dropna(subset=feature_cols + ['target'], inplace=True)
#
# X = df[feature_cols].values
# y = df['target'].values
#
# # Normalisation
# scaler = StandardScaler()
# X = scaler.fit_transform(X)
#
# # Split
# X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2)
#
# # Modèle
# model = Sequential([
# Dense(64, input_dim=X.shape[1], activation='relu'),
# Dense(32, activation='relu'),
# Dense(1) # Prédiction continue
# ])
#
# model.compile(optimizer='adam', loss='mse', metrics=['mae'])
#
# # Entraînement
# model.fit(X_train, y_train, epochs=100, batch_size=64, validation_data=(X_test, y_test))
#
# loss, mae = model.evaluate(X_test, y_test)
# print(f"Erreur moyenne absolue : {mae:.4f}")
#
# model.summary()
#
# plot_model(model, show_shapes=True, show_layer_names=True, to_file=FREQTRADE_USERDATA_DIR + "/reports/model.png")
#
# model.save(FREQTRADE_USERDATA_DIR + "/reports/model.h5")
#
# # ann_viz(model, title="Mon réseau", filename=FREQTRADE_USERDATA_DIR + "/reports/network.gv", view=True)
#
# # Créer un exemple de modèle si non encore généré
# model_path = FREQTRADE_USERDATA_DIR + "/reports/model.png"
# if not os.path.exists(model_path):
# model = Sequential([
# Dense(64, input_shape=(6,), activation='relu'),
# Dense(32, activation='relu'),
# Dense(1)
# ])
# plot_model(model, to_file=model_path, show_shapes=True, show_layer_names=True)
# return render_template('model.html', model_image=model_path)
#
# Route pour servir les fichiers statiques (optionnelle si bien configuré)
# @app.route('/static/<path:filename>')
# def static_files(filename):
# return send_from_directory('static', filename)
if __name__ == '__main__':
    # SECURITY: debug mode enables the Werkzeug interactive debugger, which
    # allows arbitrary code execution, and the server listens on every
    # interface. The default stays "on" to preserve existing behaviour; set
    # FLASK_DEBUG=0 (or false/no/off) whenever the port is reachable by
    # untrusted clients.
    debug_enabled = os.environ.get('FLASK_DEBUG', '1').strip().lower() not in ('0', 'false', 'no', 'off')
    app.run(debug=debug_enabled, host='0.0.0.0', port=5000)