Files
Freqtrade/Zeus_11.py
Jérôme Delacotte 0bb7271121 divers
2025-05-06 13:14:06 +02:00

976 lines
44 KiB
Python

# Zeus Strategy: First Generation of GodStra Strategy with maximum
# AVG/MID profit in USDT
# Author: @Mablue (Masoud Azizi)
# github: https://github.com/mablue/
# IMPORTANT: INSTALL TA BEFORE RUNNING (pip install ta)
# freqtrade hyperopt --hyperopt-loss SharpeHyperOptLoss --spaces buy sell roi --strategy Zeus
# --- Do not remove these libs ---
from datetime import timedelta, datetime
from freqtrade.persistence import Trade
from freqtrade.strategy import (BooleanParameter, CategoricalParameter, DecimalParameter, stoploss_from_open,
IntParameter, IStrategy, merge_informative_pair, informative, stoploss_from_absolute)
import pandas as pd
import numpy as np
from pandas import DataFrame
from typing import Optional, Union, Tuple
from scipy.special import binom
import logging
import configparser
from technical import pivots_points
# --------------------------------
# Add your lib to import here test git
import ta
import talib.abstract as talib
import freqtrade.vendor.qtpylib.indicators as qtpylib
import requests
from datetime import timezone, timedelta
import matplotlib.pyplot as plt
from mpl_toolkits.mplot3d import Axes3D
from sklearn.linear_model import LinearRegression
from sklearn.preprocessing import PolynomialFeatures
from sklearn.pipeline import make_pipeline
logger = logging.getLogger(__name__)
from tabulate import tabulate
def pprint_df(dframe):
    """Print ``dframe`` to stdout as a psql-style table.

    Column names are used as headers and the index column is not shown.
    """
    rendered = tabulate(dframe, headers='keys', tablefmt='psql', showindex=False)
    print(rendered)
def normalize(df):
    """Min-max scale ``df`` and return the scaled object.

    Each value becomes ``(value - min) / (max - min)``, where ``min`` and ``max``
    are whatever ``df.min()`` / ``df.max()`` return. No guard is applied when
    ``max == min``; the result of that division is returned as-is.
    """
    lowest = df.min()
    span = df.max() - lowest
    return (df - lowest) / span
class Zeus_11(IStrategy):
# Strategy declaration: class-level settings and shared state for the methods that follow.
# NOTE(review): `trades`, `max_profit_pairs` and `pairs` are mutable class attributes, so
# they are shared by every instance of this class.
# Percentage steps per buy count; in the visible part of the file they are only referenced
# by a commented-out 'buy_level' formula.
levels = [1, 2, 2, 4, 6, 8, 10, 12, 14, 16, 18, 20]
# ROI table:
# NOTE(review): presumably a 1000% target so the ROI exit never fires - confirm.
minimal_roi = {
"0": 10
}
# Stoploss:
stoploss = -1 # 0.256
# Custom stoploss
use_custom_stoploss = False
# Buy hypers
timeframe = '5m'
# Row counter used by log_trade (header printed when `columns_logged % 30 == 0`);
# starts as the bool False, which behaves as 0 in that arithmetic.
columns_logged = False
# DCA config
position_adjustment_enable = True
# Columns drawn on the chart. min200 / max50 / max200 / bb_* / rsi are created in
# populate_indicators.
# NOTE(review): no 'max_min' column is created in the visible code - confirm the name.
plot_config = {
"main_plot": {
"min200": {
"color": "#86c932"
},
"max50": {
"color": "white"
},
"max200": {
"color": "yellow"
},
"bb_lowerband": {
"color": "#da59a6"},
"bb_upperband": {
"color": "#da59a6",
}
},
"subplots": {
"Rsi": {
"rsi": {
"color": "pink"
}
},
"Percent": {
"max_min": {
"color": "#74effc"
}
}
}
}
# 20 20 40 60 100 160 260 420
# 50 50 100 300 500
# fibo = [1, 1, 2, 3, 5, 8, 13, 21]
# my fibo
# 50 50 50 100 100 150 200 250 350 450 600 1050
# Stake multipliers indexed by the number of filled entries; adjust_trade_position
# computes `stake_amount + 50 * fibo[count_of_buys]`. 13 entries (indices 0-12).
fibo = [1, 1, 1, 2, 2, 3, 4, 5, 7, 9, 12, 16, 21]
# Drop percentages indexed by buy count; populate_indicators uses them for the 'limit'
# column (`last_price * (1 - baisse[count] / 100)`). 13 entries (indices 0-12).
baisse = [1, 2, 3, 5, 7, 10, 14, 19, 26, 35, 47, 63, 84]
# My sequence 1 1 1 2 2 3 4 5 7 9 12 16 21
# Stake 50 50 50 100 100 150 200 250 350 450 600 800 1050
# Cumulative stakes 50 100 150 250 350 500 700 950 1300 1750 2350 3150 4200
# drop 1 2 3 5 7 10 14 19 26 35 47 63 84
# Open trades as last fetched by getOpenTrades; several callbacks reset it to an empty list.
trades = list()
max_profit_pairs = {}
# Per-pair bookkeeping read and written by the trade callbacks and log_trade.
# Only the ten pairs listed below have an entry; indexing any other pair raises KeyError.
pairs = {
pair: {
"first_buy": 0,
"last_max": 0,
"trade_info": {},
"max_touch": 0.0,
"last_sell": 0.0,
"last_buy": 0.0,
'count_of_buys': 0,
'current_profit': 0,
'expected_profit': 0,
"last_candle": {},
"last_trade": None,
"last_count_of_buys": 0,
'base_stake_amount': 0,
'stop_buy': False
}
for pair in ["BTC/USDC", "ETH/USDC", "DOGE/USDC", "XRP/USDC", "SOL/USDC",
"BTC/USDT", "ETH/USDT", "DOGE/USDT", "XRP/USDT", "SOL/USDT"]
}
# def min_max_scaling(self, series: pd.Series) -> pd.Series:
# """Normaliser les données en les ramenant entre 0 et 100."""
# return 100 * (series - series.min()) / (series.max() - series.min())
#
# def z_score_scaling(self, series: pd.Series) -> pd.Series:
# """Normaliser les données en utilisant Z-Score Scaling."""
# return (series - series.mean()) / series.std()
def confirm_trade_entry(self, pair: str, order_type: str, amount: float, rate: float, time_in_force: str,
                        current_time: datetime, entry_tag: Optional[str], **kwargs) -> bool:
    """Record the first buy of a trade in ``self.pairs`` and log it.

    The entry is never vetoed: the method always returns True. Before that it
    clears the cached trade list, resets the per-pair bookkeeping
    (first/last buy price, max touch, last candle, buy count, profit),
    prints a table separator and writes a "START BUY" row via ``log_trade``.

    :param pair: pair being entered; must be a key of ``self.pairs``.
    :param rate: entry price, stored as both ``first_buy`` and ``last_buy``.
    :param current_time: timestamp shown in the log row.
    :param entry_tag: entry tag shown in the "Trade Type" column.
    :return: always True.
    """
    analyzed, _ = self.dp.get_analyzed_dataframe(pair, self.timeframe)
    candle = analyzed.iloc[-1].squeeze()

    # Drop the cached open-trade list.
    self.trades = []
    available = round(self.wallets.get_available_stake_amount())

    state = self.pairs[pair]
    state['first_buy'] = rate
    state['last_buy'] = rate
    state['max_touch'] = candle['close']
    state['last_candle'] = candle
    state['count_of_buys'] = 1
    state['current_profit'] = 0

    # Separator row, one run of dashes per column of the log table.
    column_widths = (18, 12, 5, 20, 14, 8, 10, 7, 13, 14, 14, 7, 12)
    print("|" + "+".join('-' * width for width in column_widths) + "|")

    planned_stake = self.adjust_stake_amount(pair, candle)
    self.log_trade(
        action="START BUY",
        pair=pair,
        date=current_time,
        trade_type=entry_tag,
        rate=rate,
        dispo=available,
        profit=0,
        buys=1,
        stake=round(planned_stake, 2),
        last_candle=candle,
    )
    return True
def confirm_trade_exit(self, pair: str, trade: Trade, order_type: str, amount: float, rate: float,
                       time_in_force: str,
                       exit_reason: str, current_time, **kwargs, ) -> bool:
    """Approve an exit only on a red candle, or when it is a forced exit.

    An exit is approved when the last analyzed candle has ``percent < 0`` or
    when ``exit_reason == 'force_exit'``. On every approved exit, including
    forced ones, the per-pair state is updated (last sell price, last trade,
    buy count snapshot, last candle), a "Sell" row is logged, and
    ``max_touch`` / ``last_buy`` are reset to 0. Rejected exits leave the
    state untouched.

    :param pair: pair being exited; must be a key of ``self.pairs``.
    :param trade: trade being closed; used for the logged profit.
    :param amount: amount being sold, passed to ``trade.calc_profit``.
    :param rate: exit price, stored as ``last_sell``.
    :param exit_reason: reason string; ``'force_exit'`` bypasses the candle test.
    :param current_time: timestamp shown in the log row.
    :return: True when the exit may proceed, False otherwise.
    """
    dataframe, _ = self.dp.get_analyzed_dataframe(pair, self.timeframe)
    last_candle = dataframe.iloc[-1].squeeze()

    # Decide first, then do the bookkeeping for every approved exit. Testing
    # 'force_exit' only in the return expression would approve a forced exit
    # while skipping the state reset and the log row.
    allow_to_sell = bool(last_candle['percent'] < 0) or exit_reason == 'force_exit'
    if not allow_to_sell:
        return False

    state = self.pairs[pair]
    state['last_count_of_buys'] = state['count_of_buys']
    state['last_sell'] = rate
    state['last_trade'] = trade
    state['last_candle'] = last_candle
    self.trades = []
    dispo = round(self.wallets.get_available_stake_amount())
    self.log_trade(
        last_candle=last_candle,
        date=current_time,
        action="Sell",
        pair=pair,
        trade_type=exit_reason,
        rate=last_candle['close'],
        dispo=dispo,
        profit=round(trade.calc_profit(rate, amount), 2)
    )
    # Reset after logging: log_trade divides by max_touch.
    state['max_touch'] = 0
    state['last_buy'] = 0
    return True
def custom_stake_amount(self, pair: str, current_time: datetime, current_rate: float,
                        proposed_stake: float, min_stake: float, max_stake: float,
                        **kwargs) -> float:
    """Return the stake for a new entry on ``pair``.

    The value is whatever ``adjust_stake_amount`` computes from the last
    analyzed candle; ``proposed_stake``, ``min_stake`` and ``max_stake`` are
    not consulted.
    """
    analyzed, _ = self.dp.get_analyzed_dataframe(pair=pair, timeframe=self.timeframe)
    newest_candle = analyzed.iloc[-1].squeeze()
    return self.adjust_stake_amount(pair, newest_candle)
def custom_exit(self, pair: str, trade: Trade, current_time, current_rate, current_profit, **kwargs):
# Custom exit signal. Refreshes the per-pair state (last_max, count_of_buys,
# current_profit), then tests three exit conditions and returns a tag built from a prefix
# ('mx_', 'profit_' or 'rsi_') plus the number of successful entries. When no return
# statement is reached, the function ends without a value after updating max_touch.
# NOTE(review): indentation is not visible in this copy of the file, so it cannot be told
# whether the second and third exit tests are nested under the first `if` (rsi_1d > 50 and
# percent12 < 0) or stand on their own - confirm against the original before refactoring.
dataframe, _ = self.dp.get_analyzed_dataframe(pair, self.timeframe)
last_candle = dataframe.iloc[-1].squeeze()
# Previous candle: only its RSI is used, by the 'rsi_' exit.
before_last_candle = dataframe.iloc[-2].squeeze()
#count_of_buys = trade.nr_of_successful_entries
max_touch_before = self.pairs[pair]['max_touch']
# Running maximum of the Heikin-Ashi close for this pair.
self.pairs[pair]['last_max'] = max(last_candle['haclose'], self.pairs[pair]['last_max'])
# NOTE(review): last_lost is never read afterwards, and max_touch is 0 right after an exit
# (see confirm_trade_exit), which makes this a division by zero.
last_lost = (last_candle['close'] - max_touch_before) / max_touch_before
count_of_buys = trade.nr_of_successful_entries
self.pairs[pair]['count_of_buys'] = count_of_buys
self.pairs[pair]['current_profit'] = current_profit
# expectedProfit is defined outside the visible part of the file.
expected_profit = self.expectedProfit(pair, last_candle)
if (last_candle['rsi_1d'] > 50) & (last_candle['percent12'] < 0.0):
# 'mx_': price fell over the last 3 candles while profit exceeds a third of min_max200.
if (last_candle['percent3'] < 0.0) & (current_profit > last_candle['min_max200'] / 3):
self.trades = list()
return 'mx_' + str(count_of_buys)
# 'profit_': percent12 is at or below -1% and the expected profit has been reached.
if (last_candle['percent12'] <= -0.01) & (current_profit >= expected_profit):
self.trades = list()
return 'profit_' + str(count_of_buys)
# 'rsi_': expected profit reached on a red candle, with RSI >= 75 on this or the previous candle.
if (current_profit >= expected_profit) & (last_candle['percent'] < 0.0) \
and ((last_candle['rsi'] >= 75) or before_last_candle['rsi'] >= 75):
self.trades = list()
return 'rsi_' + str(count_of_buys)
# Raise the high-water mark of the Heikin-Ashi close.
self.pairs[pair]['max_touch'] = max(last_candle['haclose'], self.pairs[pair]['max_touch'])
def informative_pairs(self):
    """List the extra (pair, timeframe) combinations this strategy needs.

    :return: one ``(pair, '1d')`` tuple for every whitelisted pair, followed
        by one ``(pair, '1h')`` tuple for every whitelisted pair.
    """
    whitelist = self.dp.current_whitelist()
    return [(symbol, timeframe) for timeframe in ('1d', '1h') for symbol in whitelist]
def log_trade(self, action, pair, date, trade_type=None, rate=None, dispo=None, profit=None, buys=None, stake=None,
              last_candle=None):
    """Print one row of the trade journal table to stdout.

    Nothing is printed when the configured run mode is ``'hyperopt'``. The
    column header is re-printed every 30 rows. Falsy cell values (``None``,
    ``0``, ``''``) are rendered as ``'-'``.

    :param action: short label for the row (e.g. "START BUY", "Sell").
    :param pair: pair name; only its first three characters are shown. Must
        be a key of ``self.pairs``.
    :param date: timestamp; its first 16 characters are shown.
    :param trade_type: tag for the "Trade Type" column. When a candle is
        given, the daily and hourly RSI values and their one-step differences
        are appended to it.
    :param rate: price shown in the "Rate" column.
    :param dispo: available stake shown in the "Dispo" column.
    :param profit: profit shown in the "Profit" column.
    :param buys: number of entries shown in the "Buys" column.
    :param stake: stake shown in the "Stake" column.
    :param last_candle: candle row providing ``close``, ``haclose`` and the
        ``rsi_*`` values. When omitted, the derived cells show ``'-'``.
    """
    if self.config.get('runmode') == 'hyperopt':
        return

    if self.columns_logged % 30 == 0:
        print(
            f"| {'Date':<16} | {'Action':<10} | {'Pair':<5} | {'Trade Type':<18} | {'Rate':>12} | {'Dispo':>6} | {'Profit':>8} | {'Pct':>5} | {'max_touch':>11} | {'last_lost':>12} | {'last_max':>12} | {'Buys':>5} | {'Stake':>10} |"
        )
        print(
            f"|{'-' * 18}+{'-' * 12}+{'-' * 5}+{'-' * 20}+{'-' * 14}+{'-' * 8}+{'-' * 10}+{'-' * 7}+{'-' * 13}+{'-' * 14}+{'-' * 14}+{'-' * 7}+{'-' * 12}|"
        )
    self.columns_logged += 1

    def relative_change(value, reference):
        """Return ``(value - reference) / reference`` rounded to 3 places, or None if undefined."""
        # The reference is 0 until the first buy is recorded and after an exit.
        if value is None or not reference:
            return None
        return round((value - reference) / reference, 3)

    def int_or_blank(value):
        """Render ``value`` as an integer string; NaN becomes a single blank."""
        return ' ' if np.isnan(value) else str(int(value))

    date = str(date)[:16] if date else "-"
    state = self.pairs[pair]
    has_candle = last_candle is not None

    # Move of the Heikin-Ashi close relative to the recorded high-water mark.
    last_lost = relative_change(last_candle['haclose'] if has_candle else None, state['max_touch'])
    # Move of the close relative to the first buy price.
    pct_max = relative_change(last_candle['close'] if has_candle else None, state['first_buy'])
    max_touch = ''  # column currently left blank

    if trade_type is not None and has_candle:
        if np.isnan(last_candle['rsi_1d']):
            daily = ' '
        else:
            daily = int_or_blank(last_candle['rsi_1d']) + " " + int_or_blank(last_candle['rsi_diff_1d'])
        # The hourly values are NaN-guarded too: int(NaN) raises ValueError.
        trade_type = trade_type \
                     + " " + daily \
                     + " " + int_or_blank(last_candle['rsi_1h']) \
                     + " " + int_or_blank(last_candle['rsi_diff_1h'])

    print(
        f"| {date:<16} | {action:<10} | {pair[0:3]:<3} | {trade_type or '-':<18} | {rate or '-':>12} | {dispo or '-':>6} | {profit or '-':>8} | {pct_max or '-':>5} | {max_touch or '-':>11} | {last_lost or '-':>12} | {round(self.pairs[pair]['last_max'], 2) or '-':>12} | {buys or '-':>5} | {stake or '-':>10} |"
    )
def populate_indicators(self, dataframe: DataFrame, metadata: dict) -> DataFrame:
# Adds the indicator columns used by this strategy to the base-timeframe frame, merges
# 1h and 1d informative indicators into it (columns suffixed _1h / _1d), and - in live and
# dry-run modes only - overwrites some columns from the filled buy orders of the open
# trade on this pair. Returns the enriched frame.
# NOTE(review): indentation is not visible in this copy of the file; in particular the
# extent of the `for buy in filled_buys` loop and of its `if count == 0` branch cannot be
# determined here - confirm against the original before refactoring.
# Add all ta features
pair = metadata['pair']
# Heikin-Ashi open/close and the relative body size of the HA candle.
heikinashi = qtpylib.heikinashi(dataframe)
dataframe['haopen'] = heikinashi['open']
dataframe['haclose'] = heikinashi['close']
dataframe['hapercent'] = (dataframe['haclose'] - dataframe['haopen']) / dataframe['haclose']
dataframe['close_02'] = dataframe['haclose'] * 1.02
dataframe['pct_change'] = dataframe['close'].pct_change(5)
# Rolling minima / maxima of the close and the relative width of each window.
# NOTE(review): 'min' is computed exactly like 'min200'.
dataframe['min'] = talib.MIN(dataframe['close'], timeperiod=200)
dataframe['min12'] = talib.MIN(dataframe['close'], timeperiod=12)
dataframe['min50'] = talib.MIN(dataframe['close'], timeperiod=50)
dataframe['min200'] = talib.MIN(dataframe['close'], timeperiod=200)
dataframe['max50'] = talib.MAX(dataframe['close'], timeperiod=50)
dataframe['min_max50'] = (dataframe['max50'] - dataframe['min50']) / dataframe['min50']
dataframe['max200'] = talib.MAX(dataframe['close'], timeperiod=200)
dataframe['min_max200'] = (dataframe['max200'] - dataframe['min200']) / dataframe['min200']
dataframe['max200_diff'] = (dataframe['max200'] - dataframe['close']) / dataframe['close']
dataframe['max50_diff'] = (dataframe['max50'] - dataframe['close']) / dataframe['close']
dataframe['sma5'] = talib.SMA(dataframe, timeperiod=5)
# NOTE(review): the numerator is sma5 - sma5, so this column is 0 wherever sma5 is defined;
# the 1h / 1d versions below subtract sma5.shift(1) - presumably that was intended here.
dataframe['sma5_pct'] = (dataframe['sma5'] - dataframe['sma5']) / dataframe['sma5']
dataframe['sma10'] = talib.SMA(dataframe, timeperiod=10)
dataframe['sma20'] = talib.SMA(dataframe, timeperiod=20)
# Relative move from the open of this candle / of the candle N rows back to the current close.
dataframe["percent"] = (dataframe["close"] - dataframe["open"]) / dataframe["open"]
dataframe["percent3"] = (dataframe["close"] - dataframe["open"].shift(3)) / dataframe["open"].shift(3)
dataframe["percent5"] = (dataframe["close"] - dataframe["open"].shift(5)) / dataframe["open"].shift(5)
dataframe["percent12"] = (dataframe["close"] - dataframe["open"].shift(12)) / dataframe["open"].shift(12)
dataframe["percent24"] = (dataframe["close"] - dataframe["open"].shift(24)) / dataframe["open"].shift(24)
dataframe["percent48"] = (dataframe["close"] - dataframe["open"].shift(48)) / dataframe["open"].shift(48)
# print(metadata['pair'])
dataframe['rsi'] = talib.RSI(dataframe['close'], timeperiod=14)
# Bollinger Bands
bollinger = qtpylib.bollinger_bands(qtpylib.typical_price(dataframe), window=20, stds=2)
dataframe['bb_lowerband'] = bollinger['lower']
dataframe['bb_middleband'] = bollinger['mid']
dataframe['bb_upperband'] = bollinger['upper']
# Position of the close inside the band (0 = lower band, 1 = upper band).
dataframe["bb_percent"] = (
(dataframe["close"] - dataframe["bb_lowerband"]) /
(dataframe["bb_upperband"] - dataframe["bb_lowerband"])
)
# Normalization
# NOTE(review): the mean is taken over every row of the frame and broadcast as one constant.
dataframe['average_line'] = dataframe['close'].mean()
dataframe['average_line_50'] = talib.MIDPOINT(dataframe['close'], timeperiod=50)
dataframe['average_line_288'] = talib.MIDPOINT(dataframe['close'], timeperiod=288)
# Mean of the 20 lowest closes among the last 576 rows, broadcast to every row
# (the names say "4", the code takes 20).
sorted_close_prices = dataframe['close'].tail(576).sort_values()
lowest_4 = sorted_close_prices.head(20)
dataframe['lowest_4_average'] = lowest_4.mean()
# Propagate this mean value across the entire dataframe
# dataframe['lowest_4_average'] = dataframe['lowest_4_average'].iloc[0]
# Mean of the 20 highest closes among the last 288 rows, broadcast to every row
sorted_close_prices = dataframe['close'].tail(288).sort_values(ascending=False)
highest_4 = sorted_close_prices.head(20)
# # Mean of the selected highest values
dataframe['highest_4_average'] = highest_4.mean()
# # Propagate this mean value across the entire dataframe
# dataframe['highest_4_average'] = dataframe['highest_4_average'].iloc[0]
# dataframe['pct_average'] = (dataframe['highest_4_average'] - dataframe['close']) / dataframe['lowest_4_average']
# dataframe['highest_4_average_1'] = dataframe['highest_4_average'] * 0.99
# dataframe['highest_4_average_2'] = dataframe['highest_4_average'] * 0.98
# dataframe['highest_4_average_3'] = dataframe['highest_4_average'] * 0.97
# dataframe['highest_4_average_4'] = dataframe['highest_4_average'] * 0.96
# dataframe['highest_4_average_5'] = dataframe['highest_4_average'] * 0.95
# Count consecutive down candles
# 'down' / 'up' overlap: a candle with |hapercent| <= 0.001 is flagged as both.
dataframe['down'] = dataframe['hapercent'] <= 0.001
dataframe['up'] = dataframe['hapercent'] >= -0.001
# Length of the current run of down (negative count) / up (positive count) candles; 0 outside a run.
dataframe['down_count'] = - dataframe['down'].astype(int) * (
dataframe['down'].groupby((dataframe['down'] != dataframe['down'].shift()).cumsum()).cumcount() + 1)
dataframe['up_count'] = dataframe['up'].astype(int) * (
dataframe['up'].groupby((dataframe['up'] != dataframe['up'].shift()).cumsum()).cumcount() + 1)
dataframe['down_tag'] = (dataframe['down_count'] < -7)
dataframe['up_tag'] = (dataframe['up_count'] > 7)
# Columns derived from the run counters by calculateUpDownPct (defined outside the
# visible part of the file)
dataframe['down_pct'] = self.calculateUpDownPct(dataframe, 'down_count')
dataframe['up_pct'] = self.calculateUpDownPct(dataframe, 'up_count')
# Normalize the 'close' data
# normalized_close = self.min_max_scaling(dataframe['close'])
################### INFORMATIVE 1h
informative = self.dp.get_pair_dataframe(pair=metadata['pair'], timeframe="1h")
# x_percent = 0.01
# n_hours = 6
# n_candles = n_hours * 60 # metadata["timeframe"] # Convert to candles
#
# informative["max_profit"] = dataframe["informative"].rolling(n_candles).max()
# informative["profit_hit"] = dataframe["informative"] >= informative["close"] * (1 + x_percent)
#
# 7-period RSI, its one-step difference, SMA5 and its one-step change in percent.
informative['rsi'] = talib.RSI(informative['close'], timeperiod=7)
informative['rsi_diff'] = informative['rsi'] - informative['rsi'].shift(1)
informative['sma5'] = talib.SMA(informative, timeperiod=5)
informative['sma5_pct'] = 100 * (informative['sma5'] - informative['sma5'].shift(1)) / informative['sma5']
dataframe = merge_informative_pair(dataframe, informative, self.timeframe, "1h", ffill=True)
################### INFORMATIVE 1d
informative = self.dp.get_pair_dataframe(pair=metadata['pair'], timeframe="1d")
informative['rsi'] = talib.RSI(informative['close'], timeperiod=7)
informative['rsi_diff'] = informative['rsi'] - informative['rsi'].shift(1)
informative['sma5'] = talib.SMA(informative, timeperiod=5)
informative['sma5_pct'] = 100 * (informative['sma5'] - informative['sma5'].shift(1)) / informative['sma5']
# Mean of the 4 lowest / 4 highest daily closes among the last 365 rows, broadcast to every row.
sorted_close_prices = informative['close'].tail(365).sort_values()
lowest_4 = sorted_close_prices.head(4)
informative['lowest_4'] = lowest_4.mean()
sorted_close_prices = informative['close'].tail(365).sort_values(ascending=False)
highest_4 = sorted_close_prices.head(4)
informative['highest_4'] = highest_4.mean()
last_14_days = informative.tail(14)
# Minimum and maximum of the 'close' column over the last 14 daily rows
min_14_days = last_14_days['close'].min()
max_14_days = last_14_days['close'].max()
informative['lowest'] = min_14_days
informative['highest'] = max_14_days
informative['pct_min_max'] = (max_14_days - min_14_days) / min_14_days
informative['mid_min_max'] = min_14_days + (max_14_days - min_14_days) / 2
informative['middle'] = informative['lowest_4'] + (informative['highest_4'] - informative['lowest_4']) / 2
informative['mid_min_max_0.98'] = informative['mid_min_max'] * 0.98
dataframe = merge_informative_pair(dataframe, informative, self.timeframe, "1d", ffill=True)
# Defaults for the trade-derived columns; they are overwritten below when an open trade exists.
dataframe['count_buys'] = 0
dataframe['last_price'] = dataframe['close']
dataframe['first_price'] = dataframe['close']
dataframe['mid_price'] = (dataframe['last_price'] + dataframe['first_price']) / 2
dataframe['close01'] = dataframe.iloc[-1]['close'] * 1.01
dataframe['amount'] = 0
dataframe['limit'] = dataframe['close']
count_buys = 0
# NOTE(review): self.dp has already been dereferenced above, so this test cannot protect
# those earlier calls.
if self.dp:
if self.dp.runmode.value in ('live', 'dry_run'):
# Refresh self.trades, then look for the open trade on this pair.
self.getOpenTrades()
for trade in self.trades:
if trade.pair != pair:
continue
print(trade)
filled_buys = trade.select_filled_orders('buy')
dataframe['count_buys'] = len(filled_buys)
count = 0
amount = 0
for buy in filled_buys:
if count == 0:
dataframe['first_price'] = buy.price
dataframe['close01'] = buy.price * 1.01
# Order(id=2396, trade=1019, order_id=29870026652, side=buy, filled=0.00078, price=63921.01,
# status=closed, date=2024-08-26 02:20:11)
dataframe['last_price'] = buy.price
self.pairs[trade.pair]['last_buy'] = buy.price
print(buy)
count = count + 1
amount += buy.price * buy.filled
dataframe['mid_price'] = (dataframe['last_price'] + dataframe['first_price']) / 2
count_buys = count
# NOTE(review): self.baisse has 13 entries (indices 0-12) while adjust_trade_position
# allows up to 20 entries, so this index can run out of range - confirm.
dataframe['limit'] = dataframe['last_price'] * (1 - self.baisse[count] / 100)
dataframe['amount'] = amount
print(f"amount= {amount}")
# # trades = Trade.get_trades([Trade.is_open is False]).all()
# trades = Trade.get_trades_proxy(is_open=False, pair=metadata['pair'])
# if trades:
# trade = trades[-1]
# print('closed trade pair is : ')
# print(trade)
# dataframe['expected_profit'] = (1 + self.expectedProfit(pair, dataframe.iloc[-1])) * dataframe[
# 'last_price']
# dataframe['lbp'] = dataframe['last_price']
# dataframe['lbp_3'] = dataframe['lbp'] * 0.97 # 3
# dataframe['lbp_6'] = dataframe['lbp'] * 0.94 # 6
# dataframe['lbp_9'] = dataframe['lbp'] * 0.90 # 10
# dataframe['lbp_12'] = dataframe['lbp'] * 0.85 # 15
# dataframe['lbp_20'] = dataframe['lbp'] * 0.8 # 20
# dataframe['fbp'] = trade.open_rate
# # else:
# # last_trade = self.get_trades(pair=pair).order_by('-close_date').first()
# # filled_buys = last_trade.select_filled_orders('buy')
# # print(last_trade)
# # for buy in filled_buys:
# # print(filled_buys)
#dataframe['buy_level'] = dataframe['lowest_4_average'] * (1 - self.levels[count_buys] / 100)
dataframe['buy_level'] = dataframe['max50'] * 0.99 #(1 - self.levels[count_buys] / 100)
# ----------------------------------------------------------
# Price change between two successive candles
dataframe['price_change'] = dataframe['close'].diff()
# Flag down candles
dataframe['is_down'] = dataframe['price_change'] < 0
# Identify consecutive blocks of down candles
# dataframe['drop_id'] = (dataframe['is_down'] != dataframe['is_down'].shift(1)).cumsum()
# NOTE(review): the comparison uses shift(12) where the commented-out line above used shift(1).
dataframe['drop_id'] = np.where(dataframe['is_down'],
(dataframe['is_down'] != dataframe['is_down'].shift(12)).cumsum(), np.nan)
# Keep ids only on down candles (np.where above already sets the other rows to NaN)
dataframe['drop_id'] = dataframe['drop_id'].where(dataframe['is_down'])
# # Group by detected drops
# drop_info = dataframe.groupby('drop_id').agg(
# start=('close', 'first'), # Price at the start of the drop
# end=('close', 'last'), # Price at the end of the drop
# start_index=('close', 'idxmin'), # Start of the drop (index)
# end_index=('close', 'idxmax'), # End of the drop (index)
# )
#
# # Drop amplitude in %
# drop_info['drop_amplitude_pct'] = ((drop_info['end'] - drop_info['start']) / drop_info['start']) * 100
# # Keep only drops whose amplitude exceeds 3%
# drop_info = drop_info[drop_info['drop_amplitude_pct'] < -3]
# **************
# # Start and end price of each drop
# drop_stats = dataframe.groupby('drop_id').agg(
# start_price=('close', 'first'), # Price at the start of the drop
# end_price=('close', 'last'), # Price at the end of the drop
# )
return dataframe
def getOpenTrades(self):
    """Fetch the open trades, cache them on ``self.trades`` and return them.

    A fresh ``Trade.get_open_trades()`` query is issued on every call and a
    'search open trades' line is printed each time.
    """
    print('search open trades')
    open_trades = Trade.get_open_trades()
    self.trades = open_trades
    return open_trades
def populate_buy_trend(self, dataframe: DataFrame, metadata: dict) -> DataFrame:
    """Flag entry candles on ``dataframe`` and return it.

    A row is an entry candidate when the hourly RSI is below 70 and the
    hourly RSI has not dropped by 5 points or more since the previous hourly
    candle. Those rows get ``enter_long = 1`` and ``enter_tag = 'down'``.
    A helper column ``test`` holds ``close * 1.01`` on entry rows and NaN
    elsewhere.

    :param dataframe: analyzed frame containing ``rsi_1h``, ``rsi_diff_1h``
        and ``close``.
    :param metadata: pair metadata (not used for the signal itself).
    :return: the same frame with the entry columns set.
    """
    hourly_rsi_below_cap = dataframe['rsi_1h'] < 70
    hourly_rsi_not_collapsing = dataframe['rsi_diff_1h'] > -5
    entry_rows = hourly_rsi_below_cap & hourly_rsi_not_collapsing

    dataframe.loc[entry_rows, ['enter_long', 'enter_tag']] = (1, 'down')

    is_entry = dataframe['enter_long'] == 1
    dataframe['test'] = np.where(is_entry, dataframe['close'] * 1.01, np.nan)
    return dataframe
def populate_sell_trend(self, dataframe: DataFrame, metadata: dict) -> DataFrame:
    """Return ``dataframe`` untouched; no exit-signal column is set here.

    :param dataframe: analyzed frame.
    :param metadata: pair metadata (unused).
    :return: the very same frame object.
    """
    untouched = dataframe
    return untouched
def adjust_trade_position(self, trade: Trade, current_time: datetime,
                          current_rate: float, current_profit: float, min_stake: float,
                          max_stake: float, **kwargs):
    """Decide whether to add an averaging-down entry to an open trade.

    No additional entry is made when an order is still pending, less than 50
    of stake currency is available, no analyzed candle exists, the pair is
    not one of BTC or XRP against USDC/USDT, or 20 entries are already filled.

    Otherwise an extra entry is requested when both of these hold:

    * the last candle carries an entry signal, or ``percent48`` is below -3%
      while the hourly RSI has not dropped by 5 points or more;
    * the close is more than ``1.2% + 0.1% * entries`` below the last
      recorded buy price of the pair.

    The stake is ``config['stake_amount'] + 50 * fibo[entries]``. Once the
    ``fibo`` table is exhausted its last multiplier is reused, so entries
    beyond the length of the table no longer raise ``IndexError``.

    :param trade: the open trade.
    :param current_time: callback time; converted to UTC for the log row.
    :param current_rate: current price, stored as the new ``last_buy`` when
        an entry is requested.
    :param current_profit: current profit ratio, shown in the log row.
    :return: the stake for the additional entry, ``0`` when the wallet is
        too low, otherwise ``None``.
    """
    # Do nothing while an order is already pending.
    if trade.has_open_orders:
        return None
    if self.wallets.get_available_stake_amount() < 50:
        return 0

    dataframe, _ = self.dp.get_analyzed_dataframe(trade.pair, self.timeframe)
    # Check emptiness before indexing: iloc[-1] on an empty frame raises.
    if len(dataframe) < 1:
        return None
    last_candle = dataframe.iloc[-1].squeeze()

    count_of_buys = trade.nr_of_successful_entries
    current_time = current_time.astimezone(timezone.utc)
    dispo = round(self.wallets.get_available_stake_amount())

    pair = trade.pair
    if pair not in ('BTC/USDC', 'XRP/USDC', 'BTC/USDT', 'XRP/USDT'):
        print(f"{pair} not in allowed pairs list")
        return None

    max_buys = 20
    if count_of_buys >= max_buys:
        return None

    # fibo is shorter than max_buys; clamp the index to the last entry.
    fibo_index = min(count_of_buys, len(self.fibo) - 1)
    stake_amount = self.config['stake_amount'] + 50 * self.fibo[fibo_index]

    last_buy = self.pairs[pair]['last_buy']
    pct_max = round((last_candle['close'] - last_buy) / last_buy, 4)
    # Required drop below the last buy; grows by 0.1% with every filled entry.
    pcte = -0.012 - (count_of_buys * 0.001)

    has_signal = last_candle['enter_long'] == 1
    deep_drop = last_candle['percent48'] < - 0.03 and last_candle['rsi_diff_1h'] > -5
    if (has_signal or deep_drop) and pct_max < pcte:
        try:
            trade_type = last_candle['enter_tag'] if has_signal else 'pct48'
            self.log_trade(
                last_candle=last_candle,
                date=current_time,
                action="Loss -",
                dispo=dispo,
                pair=pair,
                rate=current_rate,
                trade_type=trade_type,
                profit=round(current_profit, 4),
                buys=count_of_buys + 1,
                stake=round(stake_amount, 2)
            )
            self.pairs[pair]['last_buy'] = current_rate
            self.pairs[pair]['max_touch'] = last_candle['close']
            self.pairs[pair]['last_candle'] = last_candle
            return stake_amount
        except Exception:
            # As before, a failure here cancels the additional entry, but the
            # traceback now goes to the strategy log instead of a bare print.
            logger.exception("adjust_trade_position failed for %s", pair)
            return None

    if self.dp.runmode.value not in ('backtest', 'hyperopt'):
        logger.error(f"adjust_trade_position {trade.pair} tag={last_candle['enter_long']} pct48={last_candle['percent48']:.1f} pctmax={pct_max:.4f} pcte={pcte:.4f}")
    return None
def adjust_stake_amount(self, pair: str, dataframe: DataFrame):
    """Scale the configured stake by the close's position in a high/low range.

    The position is measured inside the range bounded by the
    ``lowest_4_1d`` and ``highest_4_1d`` values: it is 1 when the close sits
    on the low, 0 when it sits on the high, and greater than 1 when the
    close is below the low.

    :param pair: Pair being evaluated (currently not used by the computation).
    :param dataframe: Mapping-like row of scalars exposing ``close``,
        ``lowest_4_1d`` and ``highest_4_1d`` (the result is passed to the
        builtin ``min``/``max``, so scalar values are required).
    :return: ``base * position`` capped at 100, but never less than the
        configured ``stake_amount``. The configured ``stake_amount`` is
        returned unchanged when the range is flat or any input is not finite.
    :raises KeyError: If one of the three keys is missing from ``dataframe``.
    """
    base_stake_amount = self.config['stake_amount']

    close = dataframe['close']
    range_low = dataframe['lowest_4_1d']
    range_high = dataframe['highest_4_1d']
    span = range_high - range_low

    # A flat or missing range carries no position information. Previously
    # this either divided by zero or produced NaN, and NaN made
    # min(100, nan) evaluate to 100, i.e. the *largest* stake.
    if not np.isfinite(span) or span == 0:
        return base_stake_amount

    percent_4 = 1 - (close - range_low) / span
    if not np.isfinite(percent_4):
        # Close itself is NaN/inf: fall back to the configured stake.
        return base_stake_amount

    # Cap the scaled stake at 100, then enforce the configured stake as floor.
    return max(base_stake_amount, min(100, base_stake_amount * percent_4))
# def adjust_exit_price(self, dataframe: DataFrame):
# # Calculer le max des 14 derniers jours
# min_14_days = dataframe['lowest_1d']
# max_14_days = dataframe['highest_1d']
# entry_price = dataframe['fbp']
# current_price = dataframe['close']
# percent = 0.5 * (max_14_days - min_14_days) / min_14_days
# exit_price = (1 + percent) * entry_price
#
# print(f"Exit price ajusté price={current_price:.4f} max_14={max_14_days:.4f} exit_price={exit_price:.4f}")
#
# return exit_price
# def adjust_stoploss(self, pair: str, trade: 'Trade', current_time: datetime,
# current_rate: float, current_profit: float, **kwargs) -> float:
# dataframe, _ = self.dp.get_analyzed_dataframe(trade.pair, self.timeframe)
# # print(dataframe)
# last_candle = dataframe.iloc[-1].squeeze()
#
# # Utiliser l'ATR pour ajuster le stoploss
# atr_stoploss = current_rate - (last_candle['atr'] * 1.5) # Stoploss à 1.5x l'ATR
#
# # Retourner le stoploss dynamique en pourcentage du prix actuel
# return (atr_stoploss / current_rate) - 1
def expectedProfit(self, pair: str, last_candle):
    """Return the profit ratio targeted for ``pair``, clamped to [0.01, 0.1].

    The target is half of the candle's ``min_max200`` value plus 0.0005 for
    every buy recorded in ``self.pairs[pair]['count_of_buys']``.

    :param pair: Key into ``self.pairs``.
    :param last_candle: Mapping-like row exposing ``min_max200``.
    :return: The clamped target, between 0.01 and 0.1 inclusive.
    :raises KeyError: If ``min_max200`` is missing from ``last_candle`` or
        ``pair`` / ``count_of_buys`` is missing from ``self.pairs``.
    """
    # Earlier revisions also read last_price / lowest_1d / highest_1d /
    # pct_min_max_1d and divided by lowest_1d without using the results;
    # those dead reads could raise on a missing key or a zero low and were
    # removed.
    raw_target = (
        last_candle['min_max200'] * 0.5
        + self.pairs[pair]['count_of_buys'] * 0.0005
    )
    return min(0.1, max(0.01, raw_target))
# def adjust_exit_price(self, dataframe: DataFrame):
# # Calculer le max des 14 derniers jours
# min_14_days = dataframe['lowest_1d']
# max_14_days = dataframe['highest_1d']
# entry_price = dataframe['fbp']
# current_price = dataframe['close']
# percent = 0.5 * (max_14_days - min_14_days) / min_14_days
# exit_price = (1 + percent) * entry_price
#
# print(f"Exit price ajusté price={current_price} max_14={max_14_days} exit_price={exit_price}")
#
# return exit_price
# def adjust_entry_price(self, dataframe: DataFrame):
# # Calculer le max des 14 derniers jours
# min_14_days = dataframe['lowest_1d']
# max_14_days = dataframe['highest_1d']
# current_price = dataframe['close']
# percent = 0.5 * (max_14_days - min_14_days) / min_14_days
# entry_price = (1 + percent) * entry_price
#
# print(f"Entry price ajusté price={current_price} max_14={max_14_days} exit_price={entry_price}")
#
# return entry_price
# def adjust_stake_amount(self, dataframe: DataFrame):
# # Calculer le minimum des 14 derniers jours
# middle = dataframe['middle_1d']
#
# # Récupérer la dernière cotation actuelle (peut être le dernier point de la série)
# current_price = dataframe['close']
#
# # Calculer l'écart entre la cotation actuelle et le minimum des 14 derniers jours
# difference = middle - current_price
# # Ajuster la stake_amount en fonction de l'écart
# # Par exemple, augmenter la stake_amount proportionnellement à l'écart
# base_stake_amount = self.config.get('stake_amount', 100) # Montant de base configuré
#
# multiplier = 1 - (difference / current_price) # Exemple de logique d'ajustement
#
# adjusted_stake_amount = max(base_stake_amount / 2.5, base_stake_amount * multiplier)
#
# # difference = 346.07000000000016
# # price = 2641.75
# # min_14 = 2295.68
# # amount = 56.5500141951358
#
# print(f"Stack amount ajusté difference={difference} price={current_price} middle={middle} multiplier={multiplier} amount={adjusted_stake_amount}")
#
# return adjusted_stake_amount
def calculateUpDownPct(self, dataframe, key):
    """Percent change of ``close`` against the close ``|shift|`` rows earlier.

    The per-row shift is read from ``dataframe[key]``, truncated toward zero
    and taken as an absolute value.

    :param dataframe: DataFrame with a ``close`` column and a ``key`` column.
    :param key: Name of the column holding the per-row look-back distance.
    :return: ``numpy.ndarray`` of floats, one entry per row. An entry is NaN
        when the row's shift is NaN/inf or when the look-back row index is
        not greater than 1.
    """
    row_count = len(dataframe)
    pct_change = np.full(row_count, np.nan)

    closes = dataframe['close'].to_numpy(dtype=float)
    raw_shifts = dataframe[key].to_numpy(dtype=float)

    # Non-finite shifts used to abort the whole computation (int(nan) raises
    # ValueError, int(inf) raises OverflowError); those rows now stay NaN.
    finite = np.isfinite(raw_shifts)
    shifts = np.zeros(row_count, dtype=np.int64)
    shifts[finite] = np.abs(np.trunc(raw_shifts[finite])).astype(np.int64)

    base_index = np.arange(row_count) - shifts
    # NOTE(review): the original condition `i - shift > 1` is preserved, so
    # look-back rows 0 and 1 are never used as a base; confirm whether
    # `>= 0` was intended.
    usable = finite & (base_index > 1)

    base_close = closes[base_index[usable]]
    pct_change[usable] = 100 * (closes[usable] - base_close) / base_close
    return pct_change