Files
Freqtrade/Zeus_8_3_2_B_4_2.py
2025-05-12 22:35:16 +02:00

1539 lines
70 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
# Zeus Strategy: First Generation of GodStra Strategy with maximum
# AVG/MID profit in USDT
# Author: @Mablue (Masoud Azizi)
# github: https://github.com/mablue/
# IMPORTANT: INSTALL TA BEFORE RUNNING (pip install ta)
# freqtrade hyperopt --hyperopt-loss SharpeHyperOptLoss --spaces buy sell roi --strategy Zeus
# --- Do not remove these libs ---
from datetime import timedelta, datetime
from freqtrade.persistence import Trade
from freqtrade.strategy import (BooleanParameter, CategoricalParameter, DecimalParameter, stoploss_from_open,
IntParameter, IStrategy, merge_informative_pair, informative, stoploss_from_absolute)
import pandas as pd
import numpy as np
from pandas import DataFrame
from typing import Optional, Union, Tuple
import logging
import configparser
from technical import pivots_points
# --------------------------------
# Add your lib to import here test git
import ta
import talib.abstract as talib
import freqtrade.vendor.qtpylib.indicators as qtpylib
import requests
from datetime import timezone, timedelta
logger = logging.getLogger(__name__)
from tabulate import tabulate
def pprint_df(dframe):
    """Print ``dframe`` to stdout as a psql-style table, without the index column."""
    rendered = tabulate(dframe, headers='keys', tablefmt='psql', showindex=False)
    print(rendered)
def normalize(df):
    """Rescale ``df`` with its own minimum and maximum so values span 0 to 1.

    The result is ``(df - min) / (max - min)``; the input object is not modified.
    """
    lowest = df.min()
    spread = df.max() - lowest
    return (df - lowest) / spread
class Zeus_8_3_2_B_4_2(IStrategy):
# Strategy class built on freqtrade's IStrategy. The attributes below are the
# static settings declared at class level (ROI table, stoploss, timeframe,
# position-adjustment switch).
# Percentage steps; in the visible part of this file they are only referenced
# from commented-out code.
levels = [1, 2, 2, 4, 6, 8, 10, 12, 14, 16, 18, 20]
# NOTE(review): populate_indicators uses look-back windows of 200 and 288
# candles (talib.MIN/MAX timeperiod=200, MIDPOINT timeperiod=288), far more
# than the 24 candles requested here - confirm this value is intended.
startup_candle_count = 24
# ROI table:
minimal_roi = {
"0": 0.564,
"567": 0.273,
"2814": 0.12,
"7675": 0
}
# Stoploss:
# -1 corresponds to -100 %; the previous value is kept in the trailing comment.
stoploss = -1 # 0.256
# Custom stoploss
use_custom_stoploss = True
# Buy hypers
# Base candle size; the 1h and 1d informative frames are merged onto it in
# populate_indicators.
timeframe = '5m'
max_open_trades = 5
max_amount = 40
# DCA config
position_adjustment_enable = True
# Plot layout: which indicator columns are drawn on the price chart
# ("main_plot") and which are grouped into separate panels ("subplots"),
# each with its line colour.
plot_config = {
    "main_plot": {
        "sma5_1h": {"color": "white"},
        "sma5_1d": {"color": "blue"},
        "sma20": {"color": "yellow"},
        "sma20_smooth": {"color": "green"},
        "sma20_smooth_2": {"color": "red"},
        "sma20_smooth_3": {"color": "blue"},
        "bb_lowerband": {"color": "#da59a6"},
        "bb_upperband": {"color": "#da59a6"},
        "sma10": {"color": "blue"},
        "min12_1d": {"color": "red"},
        "max12_1d": {"color": "red"},
        "min50": {"color": "green"},
        "max50": {"color": "green"},
    },
    "subplots": {
        "Pct": {
            "sma20_pct": {"color": "green"},
            "down_pct": {"color": "blue"},
            "down_pct_1h": {"color": "red"},
            "down_pct_1d": {"color": "red"},
        },
        "Rsi": {
            "rsi": {"color": "pink"},
            "rsi_1h": {"color": "red"},
            "rsi_1d": {"color": "blue"},
        },
        "Rsi_diff": {
            "rsi_diff_1h": {"color": "red"},
            "rsi_diff_1d": {"color": "blue"},
        },
        "Down": {
            "down_count_1h": {"color": "green"},
            "up_count_1h": {"color": "blue"},
        },
        # Disabled panel, kept for reference:
        # "Diff": {"sma10_diff": {"color": "#74effc"}},
        "smooth": {
            "sma5_diff_sum_1h": {"color": "green"},
            "sma5_diff2_sum_1h": {"color": "blue"},
            "mid_smooth_deriv1_1d": {"color": "blue"},
            "mid_smooth_deriv1_1h": {"color": "red"},
            "mid_smooth_deriv2_1d": {"color": "pink"},
            "mid_smooth_deriv2_1h": {"color": "#da59a6"},
        },
    },
}
# Number of rows written by log_trade so far. It is used as an integer
# counter (``% 30`` and ``+= 1``), so it starts at 0 rather than False.
columns_logged = 0
# Per-pair bookkeeping that the trade callbacks read and update. Every pair
# listed here gets its own independent state dictionary; the callbacks index
# this table directly, so a pair that is not listed raises KeyError.
pairs = {
    pair: {
        "first_buy": 0,
        "last_max": 0,
        "trade_info": {},
        "max_touch": 0.0,
        "last_sell": 0.0,
        "last_buy": 0.0,
        'count_of_buys': 0,
        'current_profit': 0,
        'expected_profit': 0,
        "last_candle": {},
        "last_trade": None,
        "last_count_of_buys": 0,
        'base_stake_amount': 0,
        'stop_buy': False,
        'last_date': 0,
        'stop': False,
        'max_profit': 0,
    }
    for pair in [
        "BTC/USDC", "ETH/USDC", "DOGE/USDC", "XRP/USDC", "SOL/USDC",
        "BTC/USDT", "ETH/USDT", "DOGE/USDT", "XRP/USDT", "SOL/USDT",
    ]
}
# Stake-sizing notes and sequences, followed by the hyperopt parameters.
# 20 20 40 60 100 160 260 420
# 50 50 100 300 500
# fibo = [1, 1, 2, 3, 5, 8, 13, 21]
# my fibo
# 50 50 50 100 100 150 200 250 350 450 600 1050
fibo = [1, 1, 1, 2, 2, 3, 4, 5, 7, 9, 12, 16, 21]
baisse = [1, 2, 3, 5, 7, 10, 14, 19, 26, 35, 47, 63, 84]
# My sequence 1 1 1 2 2 3 4 5 7 9 12 16 21
# Stake 50 50 50 100 100 150 200 250 350 450 600 800 1050
# Sum of stakes 50 100 150 250 350 500 700 950 1300 1750 2350 3150 4200
# drop 1 2 3 5 7 10 14 19 26 35 47 63 84
# Cached list of trades; populated by getOpenTrades and reset to an empty list
# by the entry/exit callbacks.
trades = list()
max_profit_pairs = {}
protection_percent_buy_lost = IntParameter(1, 10, default=5, space='protection')
protection_fibo = IntParameter(1, 10, default=2, space='protection')
# NOTE(review): default=0.2 lies outside the declared [0.005, 0.02] range, and
# decimals=2 cannot represent the 0.005 lower bound - confirm the intended
# default and precision.
sell_allow_decrease = DecimalParameter(0.005, 0.02, default=0.2, decimals=2, space='sell', optimize=True, load=True)
def min_max_scaling(self, series: pd.Series) -> pd.Series:
    """Normalize the data by rescaling it to the 0-100 range."""
    floor = series.min()
    value_range = series.max() - floor
    return 100 * (series - floor) / value_range
def z_score_scaling(self, series: pd.Series) -> pd.Series:
    """Normalize the data with Z-score scaling (subtract the mean, divide by the standard deviation)."""
    centered = series - series.mean()
    return centered / series.std()
def confirm_trade_entry(self, pair: str, order_type: str, amount: float, rate: float, time_in_force: str,
current_time: datetime, entry_tag: Optional[str], **kwargs) -> bool:
# count_buys = 0
# trade = self.getTrade(pair)
# if trade:
# filled_buys = trade.select_filled_orders('buy')
# count_buys = len(filled_buys)
minutes = 0
if self.pairs[pair]['last_date'] != 0:
minutes = round((current_time - self.pairs[pair]['last_date']).total_seconds() / 60,0)
dataframe, _ = self.dp.get_analyzed_dataframe(pair, self.timeframe)
last_candle = dataframe.iloc[-1].squeeze()
last_candle_2 = dataframe.iloc[-2].squeeze()
last_candle_3 = dataframe.iloc[-3].squeeze()
# last_candle_12 = dataframe.iloc[-13].squeeze()
# if (last_candle['close'] < self.pairs[pair]['last_sell'] * 0.99 or minutes > 60 * 5) & (self.pairs[pair]['stop']):
# print(f"restart {pair} last_sell={self.pairs[pair]['last_sell'] * 0.99} minutes={minutes}")
# self.pairs[pair]['stop'] = False
# allow_to_buy = True #(not self.stop_all) #& (not self.all_down)
allow_to_buy = not self.pairs[pair]['stop'] #not last_candle['tendency'] in ('B-', 'B--') # (rate <= float(limit)) | (entry_tag == 'force_entry')
self.trades = list()
dispo = round(self.wallets.get_available_stake_amount())
self.pairs[pair]['first_buy'] = rate
self.pairs[pair]['last_buy'] = rate
self.pairs[pair]['max_touch'] = last_candle['close']
self.pairs[pair]['last_candle'] = last_candle
self.pairs[pair]['count_of_buys'] = 1
self.pairs[pair]['current_profit'] = 0
self.printLineLog()
stake_amount = self.adjust_stake_amount(pair, last_candle)
self.log_trade(
last_candle=last_candle,
date=current_time,
action=("Buy" if allow_to_buy else "Canceled") + " " + str(minutes),
pair=pair,
rate=rate,
dispo=dispo,
profit=0,
trade_type=entry_tag,
buys=1,
stake=round(stake_amount, 2)
)
return allow_to_buy
def confirm_trade_exit(self, pair: str, trade: Trade, order_type: str, amount: float, rate: float,
                       time_in_force: str,
                       exit_reason: str, current_time, **kwargs, ) -> bool:
    """Decide whether a pending exit order may be placed and record it.

    An exit is confirmed when the latest analyzed candle closed below its
    open (``percent`` < 0) or when the exit reason is ``'force_exit'``.
    Every confirmed exit logs one row and resets the per-pair tracking
    state. Previously a forced exit on a rising candle was confirmed without
    this reset, leaving stale ``max_profit`` / ``last_buy`` / ``max_touch``
    values for the next trade.

    :param pair: Pair being exited; must be a key of ``self.pairs``.
    :param trade: Trade being closed.
    :param amount: Amount being sold, used for the logged profit.
    :param rate: Exit rate.
    :param exit_reason: Exit reason, forwarded to the log as the trade type.
    :param current_time: Time of the decision.
    :return: True when the exit may proceed, as a plain ``bool``.
    """
    dataframe, _ = self.dp.get_analyzed_dataframe(pair, self.timeframe)
    last_candle = dataframe.iloc[-1].squeeze()

    candle_is_falling = bool(last_candle['percent'] < 0)
    if not (candle_is_falling or exit_reason == 'force_exit'):
        return False

    # Minutes since the trade's last filled order, shown in the log action.
    minutes = round((current_time - trade.date_last_filled_utc).total_seconds() / 60, 0)

    state = self.pairs[pair]
    self.trades = list()
    state['last_count_of_buys'] = trade.nr_of_successful_entries
    state['last_sell'] = rate
    state['last_trade'] = trade
    state['last_candle'] = last_candle
    state['max_profit'] = 0

    dispo = round(self.wallets.get_available_stake_amount())
    self.log_trade(
        last_candle=last_candle,
        date=current_time,
        action="Sell " + str(minutes),
        pair=pair,
        trade_type=exit_reason,
        rate=last_candle['close'],
        dispo=dispo,
        profit=round(trade.calc_profit(rate, amount), 2)
    )

    # Reset only after logging: log_trade reads max_touch for its columns.
    state['max_touch'] = 0
    state['last_buy'] = 0
    state['last_date'] = current_time
    return True
def custom_stake_amount(self, pair: str, current_time: datetime, current_rate: float,
                        proposed_stake: float, min_stake: float, max_stake: float,
                        **kwargs) -> float:
    """Return the stake for a new trade on ``pair``.

    The value comes from ``self.adjust_stake_amount`` evaluated on the latest
    analyzed candle; the proposed, minimum and maximum stakes passed in by the
    caller are not consulted here.
    """
    analyzed, _ = self.dp.get_analyzed_dataframe(pair=pair, timeframe=self.timeframe)
    newest_candle = analyzed.iloc[-1].squeeze()
    return self.adjust_stake_amount(pair, newest_candle)
def custom_exit(self, pair: str, trade: Trade, current_time, current_rate, current_profit, **kwargs):
    """Return an exit tag when the trade should be closed, otherwise None.

    Steps:
      1. Refresh the per-pair tracking state (``last_max``, ``count_of_buys``,
         ``current_profit``, ``max_profit``).
      2. Hold (return None) while the latest candle is tagged as rising
         (``'H++'`` / ``'H+'``) and its RSI is below 80.
      3. Exit with ``'mx_<n>'`` when the profit has dropped by more than a
         fifth of the best profit seen and is still above the expected profit.
      4. Exit with ``'pft_<n>'`` when ``percent12`` is -1 % or lower and the
         profit is at least the expected profit.
      5. Otherwise update ``max_touch`` and hold.

    ``<n>`` is the trade's number of successful entries.

    Unused look-ups were removed: one of them read ``iloc[-13]`` and raised
    IndexError on frames shorter than 13 rows, another divided by
    ``first_buy``, which is 0 until an entry has been confirmed.

    :param pair: Pair of the trade; must be a key of ``self.pairs``.
    :param trade: Trade being evaluated.
    :param current_profit: Current profit ratio of the trade.
    :return: Exit tag string, or None to keep the trade open.
    """
    dataframe, _ = self.dp.get_analyzed_dataframe(pair, self.timeframe)
    last_candle = dataframe.iloc[-1].squeeze()

    # Evaluated before the state update below, as in the original ordering.
    expected_profit = self.expectedProfit(pair, last_candle)

    state = self.pairs[pair]
    state['last_max'] = max(last_candle['haclose'], state['last_max'])
    count_of_buys = trade.nr_of_successful_entries
    state['count_of_buys'] = count_of_buys
    state['current_profit'] = current_profit
    state['max_profit'] = max(state['max_profit'], current_profit)

    # Ride the move while the short-term trend is up and RSI is not extreme.
    if last_candle['tendency'] in ('H++', 'H+') and last_candle['rsi'] < 80:
        return None

    drawdown = state['max_profit'] - current_profit
    tolerated_drawdown = state['max_profit'] / 5
    if drawdown > tolerated_drawdown and current_profit > expected_profit:
        self.trades = list()
        return 'mx_' + str(count_of_buys)

    if last_candle['percent12'] <= -0.01 and current_profit >= expected_profit:
        self.trades = list()
        return 'pft_' + str(count_of_buys)

    state['max_touch'] = max(last_candle['haclose'], state['max_touch'])
    return None
def informative_pairs(self):
    """List the extra (pair, timeframe) data the strategy needs.

    Every pair of the current whitelist is requested on the '1d' timeframe
    first, then every pair again on '1h'.
    """
    whitelist = self.dp.current_whitelist()
    return [(pair, timeframe) for timeframe in ('1d', '1h') for pair in whitelist]
from typing import List
def multi_step_interpolate(self, pct: float, thresholds: List[float], factors: List[float]) -> float:
    """Piecewise-linear interpolation of a factor over several thresholds.

    ``thresholds[i]`` maps to ``factors[i]``. Values at or below the first
    threshold return the first factor, values at or above the last threshold
    return the last factor, and anything in between is interpolated linearly
    inside the segment that contains ``pct``.
    """
    if pct <= thresholds[0]:
        return factors[0]
    if pct >= thresholds[-1]:
        return factors[-1]

    for idx, upper in enumerate(thresholds[1:], start=1):
        if pct <= upper:
            lower = thresholds[idx - 1]
            f_low, f_high = factors[idx - 1], factors[idx]
            # Linear interpolation between the two surrounding thresholds.
            return f_low + (pct - lower) * (f_high - f_low) / (upper - lower)

    # Safety net (should never be reached).
    return factors[-1]
def interpolate_factor(self, pct: float, start_pct: float = 5, end_pct: float = 30,
                       start_factor: float = 1.0, end_factor: float = 2.0) -> float:
    """Map ``pct`` linearly from [start_pct, end_pct] onto [start_factor, end_factor].

    Inputs at or below ``start_pct`` return ``start_factor``; inputs at or
    above ``end_pct`` return ``end_factor``.
    """
    if pct <= start_pct:
        return start_factor
    if pct >= end_pct:
        return end_factor

    # Linear interpolation between the two end points.
    offset = pct - start_pct
    factor_gain = end_factor - start_factor
    pct_width = end_pct - start_pct
    return start_factor + offset * factor_gain / pct_width
def log_trade(self, action, pair, date, trade_type=None, rate=None, dispo=None, profit=None, buys=None, stake=None,
              last_candle=None):
    """Write one row of the trade log table through ``printLog``.

    Nothing is written in hyperopt run mode. A header line and a separator
    are emitted before every 30th row. Most columns are read from
    ``last_candle`` and from the per-pair state in ``self.pairs``; falsy
    values are shown as ``'-'``.

    Changes from the previous version:
      * ``last_candle`` defaults to None but was dereferenced unconditionally
        (TypeError); such a call is now reported as a warning and skipped.
      * NaN RSI values no longer make ``int()`` raise ValueError; they are
        rendered as an empty string.
      * Locals that were assigned but never used were removed.

    :param action: Text for the Action column.
    :param pair: Pair; must be a key of ``self.pairs``.
    :param date: Timestamp; its string form is truncated to 16 characters.
    :param trade_type: Entry tag or exit reason; RSI details are appended.
    :param rate: Price to display.
    :param dispo: Available stake amount to display.
    :param profit: Profit to display.
    :param buys: Number of buys to display.
    :param stake: Stake amount to display.
    :param last_candle: Latest analyzed candle (mapping of indicator values).
    """
    if self.config.get('runmode') == 'hyperopt':
        return

    if last_candle is None:
        logging.getLogger(__name__).warning(
            "log_trade called without last_candle for %s (%s); row skipped", pair, action)
        return

    def _int_text(value) -> str:
        # int() raises on NaN, so an undefined indicator is rendered as blank.
        return '' if np.isnan(value) else str(int(value))

    # Print the column headers once every 30 rows.
    if self.columns_logged % 30 == 0:
        self.printLog(
            f"| {'Date':<16} | {'Action':<10} |{'Pair':<5}| {'Trade Type':<18} |{'Rate':>8} | {'Dispo':>6} | {'Profit':>8} | {'Pct':>6} | {'max_touch':>11} | {'last_lost':>12} | {'last_max':>7} |{'Buys':>4}| {'Stake':>5} |"
            f"Tdc|Tdh|Tdd|Tdc|Tdh|Tdd|"
        )
        self.printLineLog()
    self.columns_logged += 1

    date = str(date)[:16] if date else "-"

    # Relative distance of the Heikin-Ashi close from the tracked max_touch,
    # and of the close from the first buy price.
    last_lost = round((last_candle['haclose'] - self.pairs[pair]['max_touch']) / self.pairs[pair]['max_touch'], 3)
    pct_max = round((last_candle['close'] - self.pairs[pair]['first_buy']) / self.pairs[pair]['first_buy'], 3)

    if trade_type is not None:
        if np.isnan(last_candle['rsi_1d']):
            string = ' '
        else:
            string = _int_text(last_candle['rsi_1d']) + " " + _int_text(last_candle['rsi_diff_1d'])
        trade_type = trade_type \
            + " " + string \
            + " " + _int_text(last_candle['rsi_1h']) \
            + " " + _int_text(last_candle['rsi_diff_1h'])

    self.printLog(
        f"| {date:<16} | {action:<10} | {pair[0:3]:<3} | {trade_type or '-':<18} |{rate or '-':>9}| {dispo or '-':>6} "
        f"| {profit or '-':>8} | {pct_max or '-':>6} | {round(self.pairs[pair]['max_touch'], 2) or '-':>11} | {last_lost or '-':>12} "
        f"| {round(self.pairs[pair]['last_max'], 0) or '-':>7} |{buys or '-':>4}|{stake or '-':>7}"
        f"|{round(last_candle['sma5_diff_sum_1h'], 2) or '-':>6}|{round(last_candle['sma5_diff_sum_1d'], 2) or '-':>6}"
        f"|{last_candle['tendency'] or '-':>3}|{last_candle['tendency_1h'] or '-':>3}|{last_candle['tendency_1d'] or '-':>3}"
        f"|{round(last_candle['mid_smooth_deriv1']) or '-':>3}|{round(last_candle['mid_smooth_deriv1_1h']) or '-':>5}|{round(last_candle['mid_smooth_deriv1_1d']) or '-' :>5}"
        # f"|{round(last_candle['mid_smooth_deriv2']) or '-' :>3 }|{round(last_candle['mid_smooth_deriv2_1h']) or '-':>5}|{round(last_candle['mid_smooth_deriv2_1d']) or '-':>5}"
    )
def printLineLog(self):
    """Emit the horizontal separator line of the trade log table via ``printLog``."""
    # Width of each dash run between the '+' column markers.
    column_widths = (18, 12, 5, 20, 9, 8, 10, 8, 13, 14, 9, 4, 7)
    separator = "+" + "+".join("-" * width for width in column_widths) + "+"
    self.printLog(separator)
def printLog(self, str):
    """Output one log line.

    In 'backtest' and 'hyperopt' run modes the text is printed to stdout; in
    every other run mode it goes to the module logger at INFO level.
    """
    # The parameter name shadows the builtin; it is kept so keyword callers
    # continue to work.
    if self.dp.runmode.value in ('backtest', 'hyperopt'):
        print(str)
        return
    logger.info(str)
def add_tendency_column(self, dataframe: pd.DataFrame) -> pd.DataFrame:
    """Add a ``tendency`` tag column derived from the smoothed-price derivatives.

    The tag is chosen from ``mid_smooth_deriv1`` (d1) and
    ``mid_smooth_deriv2`` (d2):

      * ``'P'``   plateau: -0.01 <= d1 <= 0.01
      * ``'H++'`` rise, accelerating: d1 > 0.01 and d2 > 0
      * ``'H+'``  rise, slowing: d1 > 0.01 otherwise
      * ``'B--'`` fall, accelerating: d1 < -0.01 and d2 < 0
      * ``'B-'``  fall, slowing: d1 < -0.01 otherwise
      * ``'Mid'`` none of the above (d1 is NaN)

    The earlier row-wise version also had a ``d1 == 0.0`` branch producing
    'DH' / 'DB'. It could never run because the plateau test already covers
    0, so it is dropped here; the resulting tags are unchanged. The tagging
    is vectorised instead of calling a Python function per row.

    :param dataframe: Frame holding the two derivative columns; modified in place.
    :return: The same frame, with the ``tendency`` column added.
    """
    d1 = dataframe['mid_smooth_deriv1']
    d2 = dataframe['mid_smooth_deriv2']
    d1_lim_inf = -0.01
    d1_lim_sup = 0.01

    plateau = (d1 >= d1_lim_inf) & (d1 <= d1_lim_sup)
    rising = d1 > d1_lim_sup
    falling = d1 < d1_lim_inf

    # np.select keeps the first matching condition, mirroring the original
    # if-chain; comparisons against NaN are False and fall through to 'Mid'.
    dataframe['tendency'] = np.select(
        [plateau, rising & (d2 > 0), rising, falling & (d2 < 0), falling],
        ['P', 'H++', 'H+', 'B--', 'B-'],
        default='Mid',
    )
    return dataframe
def populate_indicators(self, dataframe: DataFrame, metadata: dict) -> DataFrame:
# Compute every indicator column on the base-timeframe frame for
# metadata['pair'], then merge indicators computed on the 1h and 1d frames of
# the same pair, and return the enriched frame.
# Relies on helpers that are not defined in the visible part of this file:
# self.smooth_series and (through calculateDownAndUp) self.calculateUpDownPct.
# Add all ta features
pair = metadata['pair']
# Heikin-Ashi open/close and the relative body size of the HA candle.
heikinashi = qtpylib.heikinashi(dataframe)
dataframe['haopen'] = heikinashi['open']
dataframe['haclose'] = heikinashi['close']
dataframe['hapercent'] = (dataframe['haclose'] - dataframe['haopen']) / dataframe['haclose']
dataframe['pct_change'] = dataframe['close'].pct_change(5)
# Adds mid, mid_smooth, mid_smooth_deriv1/2 and tendency (default window=12).
dataframe = self.calculateTendency(dataframe)
# Rolling extremes of the close. 'min' and 'min200' are the same computation.
dataframe['min'] = talib.MIN(dataframe['close'], timeperiod=200)
dataframe['min12'] = talib.MIN(dataframe['close'], timeperiod=12)
dataframe['min50'] = talib.MIN(dataframe['close'], timeperiod=50)
dataframe['min200'] = talib.MIN(dataframe['close'], timeperiod=200)
dataframe['max200'] = talib.MAX(dataframe['close'], timeperiod=200)
dataframe['max50'] = talib.MAX(dataframe['close'], timeperiod=50)
dataframe['max144'] = talib.MAX(dataframe['close'], timeperiod=144)
# Relative width of the 50/200-candle range and distance of close to the highs.
dataframe['min_max50'] = (dataframe['max50'] - dataframe['min50']) / dataframe['min50']
dataframe['min_max200'] = (dataframe['max200'] - dataframe['min200']) / dataframe['min200']
dataframe['max200_diff'] = (dataframe['max200'] - dataframe['close']) / dataframe['close']
dataframe['max50_diff'] = (dataframe['max50'] - dataframe['close']) / dataframe['close']
# Simple moving averages and their per-candle change in percent.
dataframe['sma5'] = talib.SMA(dataframe, timeperiod=5)
dataframe['sma10'] = talib.SMA(dataframe, timeperiod=10)
dataframe['sma10_diff'] = 100 * dataframe['sma10'].diff() / dataframe['sma10']
dataframe['sma20'] = talib.SMA(dataframe, timeperiod=20)
dataframe['sma20_pct'] = 100 * dataframe['sma20'].diff() / dataframe['sma20']
dataframe['sma20_smooth'] = dataframe['sma20'].ewm(span=5).mean()
# NOTE(review): center=True makes each value depend on the two following rows.
dataframe['sma20_smooth_2'] = dataframe['sma20'].rolling(window=5, center=True).median()
dataframe['sma20_smooth_3'] = self.smooth_series(dataframe['sma20'], alpha_low=0.05, alpha_high=0.3, threshold=0.2)
# Relative change of close versus the open of the current candle and of the
# candle 3/5/12/24/48 rows earlier.
dataframe["percent"] = (dataframe["close"] - dataframe["open"]) / dataframe["open"]
dataframe["percent3"] = (dataframe["close"] - dataframe["open"].shift(3)) / dataframe["open"].shift(3)
dataframe["percent5"] = (dataframe["close"] - dataframe["open"].shift(5)) / dataframe["open"].shift(5)
dataframe["percent12"] = (dataframe["close"] - dataframe["open"].shift(12)) / dataframe["open"].shift(12)
dataframe["percent24"] = (dataframe["close"] - dataframe["open"].shift(24)) / dataframe["open"].shift(24)
dataframe["percent48"] = (dataframe["close"] - dataframe["open"].shift(48)) / dataframe["open"].shift(48)
dataframe["percent_max_144"] = (dataframe["close"] - dataframe["max144"]) / dataframe["close"]
# print(metadata['pair'])
# RSI (14 and 60 periods) with first/second differences.
dataframe['rsi'] = talib.RSI(dataframe['close'], timeperiod=14)
dataframe['rsi_diff'] = dataframe['rsi'].diff()
dataframe['rsi_diff_2'] = dataframe['rsi_diff'].diff()
dataframe['rsi60'] = talib.RSI(dataframe['close'], timeperiod=60)
dataframe['rsi60_diff'] = dataframe['rsi60'].rolling(60).mean().diff()
dataframe['rsi60_diff2'] = dataframe['rsi60_diff'].rolling(60).mean().diff()
# Bollinger Bands
bollinger = qtpylib.bollinger_bands(qtpylib.typical_price(dataframe), window=20, stds=2)
dataframe['bb_lowerband'] = bollinger['lower']
dataframe['bb_middleband'] = bollinger['mid']
dataframe['bb_upperband'] = bollinger['upper']
dataframe["bb_percent"] = (
(dataframe["close"] - dataframe["bb_lowerband"]) /
(dataframe["bb_upperband"] - dataframe["bb_lowerband"])
)
dataframe["bb_width"] = (
(dataframe["bb_upperband"] - dataframe["bb_lowerband"]) / dataframe["bb_upperband"]
)
# Normalization
# NOTE(review): the mean is taken over the whole frame, so every row gets the
# same value and it includes rows that come after that row.
dataframe['average_line'] = dataframe['close'].mean()
dataframe['average_line_50'] = talib.MIDPOINT(dataframe['close'], timeperiod=50)
dataframe['average_line_288'] = talib.MIDPOINT(dataframe['close'], timeperiod=288)
dataframe['average_line_288_098'] = dataframe['average_line_288'] * 0.98
dataframe['average_line_288_099'] = dataframe['average_line_288'] * 0.99
# # Sort the close prices to find the 4 lowest values
# sorted_close_prices = dataframe['close'].rolling(576).sort_values()
# lowest_4 = sorted_close_prices.head(20)
#
# dataframe['lowest_4_average'] = lowest_4.mean()
# # Propagate this mean value across the entire dataframe
# # dataframe['lowest_4_average'] = dataframe['lowest_4_average'].iloc[0]
#
# # # Sort the close prices to find the 4 highest values
# sorted_close_prices = dataframe['close'].rolling(288).sort_values(ascending=False)
# highest_4 = sorted_close_prices.head(20)
#
# # # Calculate the mean of the 4 highest values
# dataframe['highest_4_average'] = highest_4.mean()
# Count consecutive down candles
self.calculateDownAndUp(dataframe, limit=0.0001)
# dataframe = self.apply_regression_derivatives(dataframe, column='mid', window=24, degree=3)
# Normalize the 'close' data
# normalized_close = self.min_max_scaling(dataframe['close'])
################### INFORMATIVE 1h
# Same family of indicators computed on the 1h frame of this pair, then merged
# onto the base frame (other code reads the merged columns with a `_1h` suffix).
informative = self.dp.get_pair_dataframe(pair=metadata['pair'], timeframe="1h")
heikinashi = qtpylib.heikinashi(informative)
informative['haopen'] = heikinashi['open']
informative['haclose'] = heikinashi['close']
informative['hapercent'] = (informative['haclose'] - informative['haopen']) / informative['haclose']
informative = self.calculateTendency(informative, 3)
# informative = self.apply_regression_derivatives(informative, column='mid', window=5, degree=3)
informative['volatility'] = talib.STDDEV(informative['close'], timeperiod=14) / informative['close']
informative['atr'] = (talib.ATR(informative['high'], informative['low'], informative['close'], timeperiod=14)) / informative['close']
informative['rsi'] = talib.RSI(informative['close']) #, timeperiod=7)
informative['rsi_diff'] = informative['rsi'].diff()
# Mean of the last 7 RSI values, centred on 50 ((sum - 7 * 50) / 7).
informative['rsi_sum'] = (informative['rsi'].rolling(7).sum() - 350) / 7
informative['rsi_sum_diff'] = informative['rsi_sum'].diff()
informative['rsi_diff_2'] = informative['rsi_diff'].diff()
informative['max12'] = talib.MAX(informative['close'], timeperiod=12)
informative['min12'] = talib.MIN(informative['close'], timeperiod=12)
informative['sma5'] = talib.SMA(informative, timeperiod=5)
# 'sma5_diff' and 'sma5_pct' evaluate the same expression.
informative['sma5_diff'] = 100 * informative['sma5'].diff() / informative['sma5']
informative['sma5_pct'] = 100 * (informative['sma5'] - informative['sma5'].shift(1)) / informative['sma5']
informative['sma5_diff_sum'] = (informative['sma5_pct'].rolling(5).sum()) / 5
informative['sma5_diff2_sum'] = informative['sma5_diff_sum'].diff()
self.calculateDownAndUp(informative, limit=0.0012)
# if self.dp.runmode.value in ('backtest'):
# self.test_signal_success(informative, percent=0.01, window_size=24)
# if self.dp.runmode.value in ('backtest'):
# condition = (informative['sma5'].shift(2) > informative['sma5'].shift(1)) \
# & (informative['sma5'].shift(1) < informative['sma5']) \
# & (informative['down_pct'].shift(3) < -0.015)
#
# self.test_signal_success(informative, condition, percent=0.01, window_size=3)
# self.test_signal_success(informative, condition, percent=0.01, window_size=5)
# self.test_signal_success(informative, condition, percent=0.01, window_size=10)
dataframe = merge_informative_pair(dataframe, informative, self.timeframe, "1h", ffill=True)
################### INFORMATIVE 1d
# Daily frame: no Heikin-Ashi columns and no calculateDownAndUp call here.
# NOTE(review): plot_config references down_pct_1d, which this section does not
# create - confirm whether that column is expected.
informative = self.dp.get_pair_dataframe(pair=metadata['pair'], timeframe="1d")
informative = self.calculateTendency(informative, 3)
informative['volatility'] = talib.STDDEV(informative['close'], timeperiod=14) / informative['close']
informative['atr'] = (talib.ATR(informative['high'], informative['low'], informative['close'], timeperiod=14)) / informative['close']
# informative = self.apply_regression_derivatives(informative, column='mid', window=5, degree=3)
informative['max12'] = talib.MAX(informative['close'], timeperiod=12)
informative['min12'] = talib.MIN(informative['close'], timeperiod=12)
informative['max3'] = talib.MAX(informative['close'], timeperiod=3)
informative['min3'] = talib.MIN(informative['close'], timeperiod=3)
informative['rsi'] = talib.RSI(informative['close']) #, timeperiod=7)
informative['rsi_diff'] = informative['rsi'].diff()
informative['rsi_sum'] = (informative['rsi'].rolling(7).sum() - 350) / 7
informative['rsi_diff_2'] = informative['rsi_diff'].diff()
informative['sma5'] = talib.SMA(informative, timeperiod=5)
informative['sma5_pct'] = 100 * (informative['sma5'] - informative['sma5'].shift(1)) / informative['sma5']
informative['sma5_diff_sum'] = (informative['sma5_pct'].rolling(5).sum()) / 5
informative['sma5_diff2_sum'] = informative['sma5_diff_sum'].diff()
dataframe = merge_informative_pair(dataframe, informative, self.timeframe, "1d", ffill=True)
# Default both price markers to the close; in live/dry-run they are overwritten
# below with the first and last filled buy prices of the open trade on this pair.
dataframe['last_price'] = dataframe['close']
dataframe['first_price'] = dataframe['close']
# dataframe['mid_price'] = (dataframe['last_price'] + dataframe['first_price']) / 2
# dataframe['close01'] = dataframe.iloc[-1]['close'] * 1.01
# dataframe['limit'] = dataframe['close']
# count_buys is assigned here and inside the loop but not read again in this function.
count_buys = 0
if self.dp:
if self.dp.runmode.value in ('live', 'dry_run'):
# Refreshes self.trades with the currently open trades.
self.getOpenTrades()
for trade in self.trades:
if trade.pair != pair:
continue
print(trade)
filled_buys = trade.select_filled_orders('buy')
count = 0
# Sum of price * filled over the filled buys; only printed below.
amount = 0
for buy in filled_buys:
if count == 0:
dataframe['first_price'] = buy.price
# dataframe['close01'] = buy.price * 1.01
# Order(id=2396, trade=1019, order_id=29870026652, side=buy, filled=0.00078, price=63921.01,
# status=closed, date=2024-08-26 02:20:11)
dataframe['last_price'] = buy.price
print(buy)
count = count + 1
amount += buy.price * buy.filled
# dataframe['mid_price'] = (dataframe['last_price'] + dataframe['first_price']) / 2
count_buys = count
# dataframe['limit'] = dataframe['last_price'] * (1 - self.baisse[count] / 100)
# dataframe['amount'] = amount
print(f"amount= {amount}")
# # trades = Trade.get_trades([Trade.is_open is False]).all()
# trades = Trade.get_trades_proxy(is_open=False, pair=metadata['pair'])
# if trades:
# trade = trades[-1]
# print('closed trade pair is : ')
# print(trade)
# dataframe['expected_profit'] = (1 + self.expectedProfit(pair, dataframe.iloc[-1])) * dataframe[
# 'last_price']
# dataframe['lbp'] = dataframe['last_price']
# dataframe['lbp_3'] = dataframe['lbp'] * 0.97 # 3
# dataframe['lbp_6'] = dataframe['lbp'] * 0.94 # 6
# dataframe['lbp_9'] = dataframe['lbp'] * 0.90 # 10
# dataframe['lbp_12'] = dataframe['lbp'] * 0.85 # 15
# dataframe['lbp_20'] = dataframe['lbp'] * 0.8 # 20
# dataframe['fbp'] = trade.open_rate
# # else:
# # last_trade = self.get_trades(pair=pair).order_by('-close_date').first()
# # filled_buys = last_trade.select_filled_orders('buy')
# # print(last_trade)
# # for buy in filled_buys:
# # print(filled_buys)
# dataframe['buy_level'] = dataframe['lowest_4_average'] * (1 - self.levels[count_buys] / 100)
# ----------------------------------------------------------
# Compute the change between two successive candles
# dataframe['price_change'] = dataframe['close'].diff()
# # Flag the falling candles
# dataframe['is_down'] = dataframe['price_change'] < 0
#
# # Identify consecutive blocks of falling candles
# # dataframe['drop_id'] = (dataframe['is_down'] != dataframe['is_down'].shift(1)).cumsum()
# dataframe['drop_id'] = np.where(dataframe['is_down'],
# (dataframe['is_down'] != dataframe['is_down'].shift(12)).cumsum(), np.nan)
#
# # Keep only the falling blocks
# dataframe['drop_id'] = dataframe['drop_id'].where(dataframe['is_down'])
# # Group by the detected drops
# drop_info = dataframe.groupby('drop_id').agg(
# start=('close', 'first'), # Price at the start of the drop
# end=('close', 'last'), # Price at the end of the drop
# start_index=('close', 'idxmin'), # Start of the drop (index)
# end_index=('close', 'idxmax'), # End of the drop (index)
# )
#
# # Compute the size of the drop in %
# drop_info['drop_amplitude_pct'] = ((drop_info['end'] - drop_info['start']) / drop_info['start']) * 100
# # Keep the drops with an amplitude larger than 3%
# drop_info = drop_info[drop_info['drop_amplitude_pct'] < -3]
# **************
# Identify the start and end price of each drop
# drop_stats = dataframe.groupby('drop_id').agg(
# start_price=('close', 'first'), # Price at the start of the drop
# end_price=('close', 'last'), # Price at the end of the drop
# )
# Compute the amplitude in %
# drop_stats['amplitude_pct'] = ((drop_stats['end_price'] - drop_stats['start_price']) / drop_stats[
# 'start_price']) * 100
# # drop_stats = drop_stats[drop_stats['amplitude_pct'] < -1]
# # Attach the computed amplitudes to each drop_id in dataframe
# dataframe = dataframe.merge(drop_stats[['amplitude_pct']], on='drop_id', how='left')
# # Fill the rows without a drop_id with 0
# dataframe['amplitude_pct'] = dataframe['amplitude_pct'].fillna(0)
# dataframe['amplitude_pct_60'] = dataframe['amplitude_pct'].rolling(60).sum()
# ----------------------------------------------------------
# self.getBinanceOrderBook(pair, dataframe)
# if self.dp.runmode.value in ('backtest'):
# self.test_signal_success(dataframe, 0.005)
# NOTE(review): negative shifts copy the close from 12/24/36 rows LATER, so
# these columns hold future prices (NaN on the last rows) and must not feed
# entry or exit conditions.
dataframe['futur_price_1h'] = dataframe['close'].shift(-12)
dataframe['futur_price_2h'] = dataframe['close'].shift(-24)
dataframe['futur_price_3h'] = dataframe['close'].shift(-36)
return dataframe
def calculateDownAndUp(self, dataframe, limit=0.0001):
    """Add down/up flags, streak counters and streak percentages to ``dataframe``.

    A candle is flagged ``down`` when ``hapercent`` <= ``limit`` and ``up``
    when ``hapercent`` >= ``limit`` (a value equal to ``limit`` is both).
    ``down_count`` is minus the length of the current down streak and
    ``up_count`` the length of the current up streak; both are 0 outside a
    streak. ``down_pct`` / ``up_pct`` come from ``self.calculateUpDownPct``.
    The frame is modified in place and nothing is returned.
    """

    def _position_in_run(flags):
        # 1-based position of each row inside its run of identical flag values.
        run_id = (flags != flags.shift()).cumsum()
        return flags.groupby(run_id).cumcount() + 1

    ha_change = dataframe['hapercent']
    dataframe['down'] = ha_change <= limit
    dataframe['up'] = ha_change >= limit

    is_down = dataframe['down']
    is_up = dataframe['up']
    dataframe['down_count'] = -is_down.astype(int) * _position_in_run(is_down)
    dataframe['up_count'] = is_up.astype(int) * _position_in_run(is_up)

    dataframe['down_pct'] = self.calculateUpDownPct(dataframe, 'down_count')
    dataframe['up_pct'] = self.calculateUpDownPct(dataframe, 'up_count')
def calculateTendency(self, dataframe, window=12):
    """Add the smoothed price, its two derivatives and the tendency tag.

    Columns written: ``mid`` (midpoint between open and close),
    ``mid_smooth`` (centred rolling median of the close over ``window``
    rows, then a 3-row mean), ``mid_smooth_deriv1`` (relative change of
    ``mid_smooth`` in percent, 4 decimals), ``mid_smooth_deriv2`` (100 x the
    3-row mean of the change of deriv1, 4 decimals) and ``tendency`` (via
    ``add_tendency_column``).

    NOTE(review): despite its name, ``mid_smooth`` is built from ``close``
    rather than ``mid``, and the centred window reads rows that follow the
    current one - confirm both are intended.
    """
    dataframe['mid'] = dataframe['open'] + (dataframe['close'] - dataframe['open']) / 2

    # Smoothing: centred rolling median, followed by a short moving average.
    centered_median = dataframe['close'].rolling(window=window, center=True, min_periods=1).median()
    dataframe['mid_smooth'] = centered_median.rolling(3).mean()

    # First derivative: relative difference between two successive rows.
    smooth = dataframe['mid_smooth']
    dataframe['mid_smooth_deriv1'] = (100 * smooth.diff() / smooth).round(4)

    # Second derivative: smoothed difference of the first derivative.
    slope_change = dataframe['mid_smooth_deriv1'].diff().rolling(3).mean()
    dataframe['mid_smooth_deriv2'] = (100 * slope_change).round(4)

    return self.add_tendency_column(dataframe)
def getOpenTrades(self):
    """Reload the open trades into ``self.trades`` and return them.

    The list is fetched from ``Trade.get_open_trades()`` on every call; the
    former "only when the cache is empty" guard is disabled.
    """
    # if len(self.trades) == 0:
    print('search open trades')
    open_trades = Trade.get_open_trades()
    self.trades = open_trades
    return open_trades
# def getTrade(self, pair):
# trades = self.getOpenTrades()
# trade_for_pair = None
# for trade in trades:
# if trade.pair == pair:
# trade_for_pair = trade
# break
# return trade_for_pair
def populate_buy_trend(self, dataframe: DataFrame, metadata: dict) -> DataFrame:
    """
    Mark long entries on ``dataframe`` and tag each one with the rule that fired.

    Five rules are applied in order; when several match the same candle the
    last one wins because it overwrites ``enter_tag``:
    ``mx200``, ``pct12``, ``mnmx200``, ``down``, ``low``.

    A ``test`` column is also written (``close * 1.01`` on entry candles, NaN
    elsewhere) and, in backtest run mode, the whole frame is dumped to a
    feather file under ``user_data/data/binance/``.

    :param dataframe: candles with the indicator columns read below.
    :param metadata: must contain ``'pair'``.
    :return: the same dataframe with ``enter_long`` / ``enter_tag`` / ``test`` set.
    """
    pair = metadata['pair']
    expected_profit = self.expectedProfit(pair, dataframe.iloc[-1])

    print("---------------" + pair + "----------------")
    print('adjust stake amount ' + str(self.adjust_stake_amount(pair, dataframe.iloc[-1])))
    print('calcul expected_profit ' + str(expected_profit))

    buy_level = dataframe['average_line_50']
    entry_columns = ['enter_long', 'enter_tag']

    # Rule "mx200": pullback below the averages while min12 is unchanged for 2 candles.
    dataframe.loc[
        (
            (dataframe['max200_diff'] >= 0.01)
            & (dataframe['percent12'] < -0.002)
            & (dataframe['open'] < dataframe['average_line_288_099'])
            & (dataframe['open'] < dataframe['average_line_50'])
            & (dataframe['min12'].shift(2) == dataframe['min12'])
            & (dataframe['up_count'] > 0)
            & (dataframe["bb_width"] > 0.01)
        ), entry_columns] = (1, 'mx200')

    # Rule "pct12": sharp drop over 12/24/48 candles with the close near min50.
    dataframe.loc[
        (
            (
                (dataframe['percent12'] < -0.015)
                | (dataframe['percent24'] < -0.022)
                | (dataframe['percent48'] < -0.030)
            )
            & (dataframe['close'] <= dataframe['min50'] * 1.002)
            & (dataframe['open'] < dataframe['average_line_50'])
            & (
                (dataframe['close'] < dataframe['min12'] * 1.002)
                | (dataframe['percent12'] < -0.022)
                | (dataframe['percent24'] < -0.022)
            )
            & (
                (dataframe['min50'].shift(2) == dataframe['min50'])
                | (dataframe['percent12'] < -0.022)
                | (dataframe['percent24'] < -0.022)
            )
            & (dataframe['up_count'] > 0)
            & (dataframe["bb_width"] > 0.01)
        ), entry_columns] = (1, 'pct12')

    # Rule "mnmx200": close near min200 with a wide enough min/max range.
    dataframe.loc[
        (
            (dataframe['close'] <= dataframe['min200'] * 1.002)
            & (dataframe["bb_width"] > 0.01)
            & (dataframe['min_max200'] > 0.015)
            & (dataframe['haopen'] < buy_level)
            & (dataframe['open'] < dataframe['average_line_288'])
            & (dataframe['up_count'] > 0)
        ), entry_columns] = (1, 'mnmx200')

    # Rule "down": a run of at least two "down" candles has just ended.
    dataframe.loc[
        (
            (dataframe['down_count'].shift(1) < - 1)
            & (dataframe['down_count'] == 0)
            & (dataframe['mid_smooth_deriv1'] >= -0.01)
        ), entry_columns] = (1, 'down')

    # Rule "low": the low pierces min200 while min50 is unchanged for 3 candles.
    dataframe.loc[
        (
            (dataframe['low'] < dataframe['min200'])
            & (dataframe['min50'] == dataframe['min50'].shift(3))
            & (dataframe['tendency'] != "B-")
        ), entry_columns] = (1, 'low')

    dataframe['test'] = np.where(dataframe['enter_long'] == 1, dataframe['close'] * 1.01, np.nan)

    # Compare for equality: ``in ('backtest')`` is a substring test against a
    # plain string (the parentheses do not make a tuple).
    if self.dp.runmode.value == 'backtest':
        dataframe.to_feather(f"user_data/data/binance/{metadata['pair'].replace('/', '_')}_df.feather")

    return dataframe
def populate_sell_trend(self, dataframe: DataFrame, metadata: dict) -> DataFrame:
    """Return ``dataframe`` unchanged: this method sets no exit signal."""
    return dataframe
def adjust_trade_position(self, trade: Trade, current_time: datetime,
                          current_rate: float, current_profit: float, min_stake: float,
                          max_stake: float, **kwargs):
    """
    Decide whether to add to an open position and with which stake.

    An additional entry is proposed only when all of the following hold on the
    latest analyzed candle: fewer than 20 entries so far; an entry trigger
    (``enter_long``, a 48-candle drop below -3 %, or the low touching an
    unchanged ``min50``); ``rsi_diff_1h`` >= -5; ``tendency`` in
    ``P / H++ / DH / H+``; ``sma5_diff_sum_1d`` > -1 or at most 9 entries; and
    the price has fallen since the last buy by more than 1.2 % plus 0.1 % per
    existing entry.

    When an entry is proposed, ``self.pairs[pair]`` is updated
    (``last_buy``, ``max_touch``, ``last_candle``) and the trade is logged.

    :return: stake amount to add, ``0`` when less than 50 of stake currency is
             available, or ``None`` when nothing should be done.
    """
    # Do nothing while an order is still pending for this trade.
    if trade.has_open_orders:
        print("skip open orders")
        return None

    if self.wallets.get_available_stake_amount() < 50:  # or trade.stake_amount >= max_stake:
        return 0

    dataframe, _ = self.dp.get_analyzed_dataframe(trade.pair, self.timeframe)
    # The latest candle and the one three candles earlier are both read below,
    # so the frame must hold at least four rows *before* it is indexed.
    if len(dataframe) < 4:
        print("skip dataframe")
        return None
    last_candle = dataframe.iloc[-1].squeeze()
    last_candle_3 = dataframe.iloc[-4].squeeze()

    pair = trade.pair
    if pair not in ('BTC/USDT', 'BTC/USDC', 'XRP/USDT', 'XRP/USDC', 'ETH/USDT', 'ETH/USDC'):
        print(f"skip pair {pair}")
        return None

    current_time = current_time.astimezone(timezone.utc)
    dispo = round(self.wallets.get_available_stake_amount())
    count_of_buys = trade.nr_of_successful_entries
    pair_state = self.pairs[pair]

    # Relative move of the close since the very first buy of this pair.
    pct_first = 0
    if pair_state['first_buy']:
        pct_first = round((last_candle['close'] - pair_state['first_buy']) / pair_state['first_buy'], 3)

    # Relative move since the most recent buy (the trade profit on the first entry).
    pct = 0.012
    if count_of_buys == 1:
        pct_max = current_profit
    elif pair_state['last_buy']:
        pct_max = round((last_candle['close'] - pair_state['last_buy']) / pair_state['last_buy'], 4)
    else:
        pct_max = - pct

    # Required drop grows by 0.1 % with every entry already taken.
    lim = - pct - (count_of_buys * 0.001)

    limit_buy = 20
    if (count_of_buys < limit_buy) \
            and ((last_candle['enter_long'] == 1)
                 or (last_candle['percent48'] < - 0.03)
                 or ((last_candle['min50'] == last_candle_3['min50']) and (last_candle['low'] <= last_candle['min50']))
                 ) \
            and (last_candle['rsi_diff_1h'] >= -5) \
            and (last_candle['tendency'] in ('P', 'H++', 'DH', 'H+')) \
            and (last_candle['sma5_diff_sum_1d'] > -1 or count_of_buys <= 9) \
            and (pct_max < lim):
        try:
            # Cap: 2.5x the configured stake, the free balance, and the
            # pair-specific stake reduced/increased by the move since first buy.
            max_amount = self.config.get('stake_amount', 100) * 2.5
            stake_amount = min(max_amount,
                               self.wallets.get_available_stake_amount(),
                               self.adjust_stake_amount(pair, last_candle) - 10 * pct_first / pct)

            trade_type = last_candle['enter_tag'] if last_candle['enter_long'] == 1 else 'pct48'
            self.log_trade(
                last_candle=last_candle,
                date=current_time,
                action="Loss -",
                dispo=dispo,
                pair=trade.pair,
                rate=current_rate,
                trade_type=trade_type,
                profit=round(current_profit, 4),
                buys=trade.nr_of_successful_entries + 1,
                stake=round(stake_amount, 2)
            )

            pair_state['last_buy'] = current_rate
            pair_state['max_touch'] = last_candle['close']
            pair_state['last_candle'] = last_candle

            return stake_amount
        except Exception as exception:
            # Best effort: a failure here must not break the bot loop.
            print(exception)
            return None

    return None
def adjust_stake_amount(self, pair: str, last_candle: DataFrame):
    """
    Scale the configured stake by how far the first buy sits below ``max12_1d``.

    ``pct`` is the distance, in percent of ``max12_1d``, between ``max12_1d``
    and the pair's first buy price (the candle close when no first buy is
    recorded). It defaults to 5 when ``max12_1d`` is not positive. ``pct`` is
    mapped to a multiplier by ``multi_step_interpolate`` with thresholds
    ``[2, 5, 10, 20]`` and factors ``[1, 1.25, 1.5, 2.0]``.

    :param pair: key into ``self.pairs``.
    :param last_candle: row-like object providing ``close`` and ``max12_1d``.
    :return: ``config['stake_amount']`` (default 100) times the factor.
    """
    base_stake_amount = self.config.get('stake_amount', 100)  # configured base amount

    # Truthiness (not ``== 0``) so that an unset ``None`` also falls back to
    # the close, matching how ``first_buy`` is tested elsewhere in the class.
    first_price = self.pairs[pair]['first_buy']
    if not first_price:
        first_price = last_candle['close']

    last_max = last_candle['max12_1d']
    pct = 5
    if last_max > 0:
        pct = 100 * (last_max - first_price) / last_max

    thresholds = [2, 5, 10, 20]
    factors = [1, 1.25, 1.5, 2.0]
    factor = self.multi_step_interpolate(pct, thresholds, factors)

    return base_stake_amount * factor
# def adjust_exit_price(self, dataframe: DataFrame):
# # Calculer le max des 14 derniers jours
# min_14_days = dataframe['lowest_1d']
# max_14_days = dataframe['highest_1d']
# entry_price = dataframe['fbp']
# current_price = dataframe['close']
# percent = 0.5 * (max_14_days - min_14_days) / min_14_days
# exit_price = (1 + percent) * entry_price
#
# print(f"Exit price ajusté price={current_price:.4f} max_14={max_14_days:.4f} exit_price={exit_price:.4f}")
#
# return exit_price
# def adjust_stoploss(self, pair: str, trade: 'Trade', current_time: datetime,
# current_rate: float, current_profit: float, **kwargs) -> float:
# dataframe, _ = self.dp.get_analyzed_dataframe(trade.pair, self.timeframe)
# # print(dataframe)
# last_candle = dataframe.iloc[-1].squeeze()
#
# # Utiliser l'ATR pour ajuster le stoploss
# atr_stoploss = current_rate - (last_candle['atr'] * 1.5) # Stoploss à 1.5x l'ATR
#
# # Retourner le stoploss dynamique en pourcentage du prix actuel
# return (atr_stoploss / current_rate) - 1
def expectedProfit(self, pair: str, last_candle: DataFrame):
    """
    Return the expected profit ratio for ``pair``.

    The value is currently a constant (0.004); neither ``pair`` nor
    ``last_candle`` is read.
    """
    fixed_target = 0.004
    return fixed_target
# def adjust_exit_price(self, dataframe: DataFrame):
# # Calculer le max des 14 derniers jours
# min_14_days = dataframe['lowest_1d']
# max_14_days = dataframe['highest_1d']
# entry_price = dataframe['fbp']
# current_price = dataframe['close']
# percent = 0.5 * (max_14_days - min_14_days) / min_14_days
# exit_price = (1 + percent) * entry_price
#
# print(f"Exit price ajusté price={current_price} max_14={max_14_days} exit_price={exit_price}")
#
# return exit_price
# def adjust_entry_price(self, dataframe: DataFrame):
# # Calculer le max des 14 derniers jours
# min_14_days = dataframe['lowest_1d']
# max_14_days = dataframe['highest_1d']
# current_price = dataframe['close']
# percent = 0.5 * (max_14_days - min_14_days) / min_14_days
# entry_price = (1 + percent) * entry_price
#
# print(f"Entry price ajusté price={current_price} max_14={max_14_days} exit_price={entry_price}")
#
# return entry_price
# def adjust_stake_amount(self, dataframe: DataFrame):
# # Calculer le minimum des 14 derniers jours
# middle = dataframe['middle_1d']
#
# # Récupérer la dernière cotation actuelle (peut être le dernier point de la série)
# current_price = dataframe['close']
#
# # Calculer l'écart entre la cotation actuelle et le minimum des 14 derniers jours
# difference = middle - current_price
# # Ajuster la stake_amount en fonction de l'écart
# # Par exemple, augmenter la stake_amount proportionnellement à l'écart
# base_stake_amount = self.config.get('stake_amount', 100) # Montant de base configuré
#
# multiplier = 1 - (difference / current_price) # Exemple de logique d'ajustement
#
# adjusted_stake_amount = max(base_stake_amount / 2.5, base_stake_amount * multiplier)
#
# # difference = 346.07000000000016
# # price = 2641.75
# # min_14 = 2295.68
# # amount = 56.5500141951358
#
# print(f"Stack amount ajusté difference={difference} price={current_price} middle={middle} multiplier={multiplier} amount={adjusted_stake_amount}")
#
# return adjusted_stake_amount
def getBinanceOrderBook(self, pair, dataframe: DataFrame):
    """
    Fetch the Binance order book for ``pair`` and store derived values on ``dataframe``.

    Values are written through ``updateLastValue``: ``depth_bid_ask_diff``,
    ``prob_hausse_50/100/150/200``, ``vwap_hausse``, ``proba_hausse_1%`` and
    ``proba_baisse_1%``; ``calculateResistance`` / ``calculateSupport`` are
    then called and ``r_s`` is derived from ``r_min`` and ``s_min``.

    Any failure (network, HTTP status, unexpected payload, empty book) is
    logged and swallowed so the caller is never interrupted.

    :return: ``(None, None)`` on failure, nothing otherwise.
             NOTE(review): the nesting of the trailing return was ambiguous in
             the source; it is kept in the error path - confirm.
    """
    last_candle = dataframe.iloc[-1].squeeze()
    symbol = pair.replace('/', '')

    try:
        url = f"https://api.binance.com/api/v3/depth?symbol={symbol}&limit=5000"
        # Without a timeout a stalled connection would block indefinitely.
        response = requests.get(url, timeout=10)
        # Surface HTTP errors here instead of failing later on a missing key.
        response.raise_for_status()
        data = response.json()

        # Volumes are smoothed with a 20-level moving average; each entry
        # becomes a (price, smoothed volume) pair.
        asks, bids = self.calculateSMA(20, data['asks'], data['bids'])

        # price * quantity / close => quantity expressed at the current price
        close_price = float(last_candle['close'])
        bid_volume = sum(float(bid[0]) * float(bid[1]) / close_price for bid in bids[:10])
        ask_volume = sum(float(ask[0]) * float(ask[1]) / close_price for ask in asks[:10])

        if bid_volume and ask_volume:
            self.updateLastValue(dataframe, 'depth_bid_ask_diff', round(bid_volume - ask_volume, 2))
        else:
            self.updateLastValue(dataframe, 'depth_bid_ask_diff', 0)

        # Bid/ask imbalance on successive slices of 50 levels.
        for start in [0, 50, 100, 150]:
            self.updateLastValue(dataframe, 'prob_hausse_' + str(start + 50),
                                 self.calculateProbaNb(asks, bids, start, start + 50))

        # Volume-weighted average price of the first 50 levels on each side;
        # the signed, normalised difference is stored as 'vwap_hausse'.
        nb = 50
        bid_vwap = sum(float(bid[0]) * float(bid[1]) for bid in bids[:nb]) / sum(
            float(bid[1]) for bid in bids[:nb])
        ask_vwap = sum(float(ask[0]) * float(ask[1]) for ask in asks[:nb]) / sum(
            float(ask[1]) for ask in asks[:nb])
        self.updateLastValue(dataframe, 'vwap_hausse',
                             round(100 * (bid_vwap - ask_vwap) / (bid_vwap + ask_vwap), 2))

        current_price = last_candle['close']
        lower_threshold = current_price * 0.99
        upper_threshold = current_price * 1.01

        # Bid volume within 1% below / ask volume within 1% above the close.
        bid_volume_1percent = sum(
            float(bid[1]) for bid in bids if current_price >= float(bid[0]) >= lower_threshold)
        ask_volume_1percent = sum(
            float(ask[1]) for ask in asks if current_price <= float(ask[0]) <= upper_threshold)

        # Share of each side in the combined volume.
        total_volume = bid_volume_1percent + ask_volume_1percent
        if total_volume > 0:
            prob_hausse = bid_volume_1percent / total_volume
            prob_baisse = ask_volume_1percent / total_volume
        else:
            prob_hausse = prob_baisse = 0

        self.updateLastValue(dataframe, 'proba_hausse_1%', round(prob_hausse * 100, 2))
        self.updateLastValue(dataframe, 'proba_baisse_1%', round(prob_baisse * 100, 2))
        print(f"Probabilité de hausse de 1%: {prob_hausse * 100:.2f}%")
        print(f"Probabilité de baisse de 1%: {prob_baisse * 100:.2f}%")

        self.calculateResistance(pair, asks, dataframe)
        self.calculateSupport(pair, bids, dataframe)

        dataframe['r_s'] = 100 * (dataframe['r_min'] - dataframe['s_min']) / dataframe['s_min']

    except Exception as e:
        logger.error(f"Error fetching order book: {e}")
        return None, None
def calculateProbaNb(self, asks, bids, start, nb):
    """
    Return the signed bid/ask volume imbalance, in percent, over ``[start:nb]``.

    :param asks: sequence of (price, volume) entries on the ask side.
    :param bids: sequence of (price, volume) entries on the bid side.
    :param start: first index of the slice.
    :param nb: end index of the slice (exclusive).
    :return: ``100 * (bids - asks) / (bids + asks)`` rounded to 2 decimals;
             positive when bid volume dominates. ``0`` when the slice holds no
             volume at all (empty slice or only zero volumes).
    """
    top_bids = sum(float(bid[1]) for bid in bids[start:nb])
    top_asks = sum(float(ask[1]) for ask in asks[start:nb])

    total = top_bids + top_asks
    if total == 0:
        # Nothing to compare (e.g. the book is shorter than ``start``):
        # report a neutral imbalance instead of dividing by zero.
        return 0

    return round(100 * (top_bids - top_asks) / total, 2)
def calculateResistance(self, pair, asks, dataframe: DataFrame):
    """
    Find the three largest ask levels near the price and store them as resistances.

    Only asks priced within +/-5 % of the last close are considered. The
    prices of the largest, second and third largest volumes are written via
    ``updateLastValue`` to ``r3``, ``r2`` and ``r1``; ``r_min`` receives the
    lowest of the three prices. Intermediate results are printed.

    :param pair: pair name, used in the printed output only.
    :param asks: sequence of (price, volume) entries.
    :param dataframe: candles; the last row provides ``close``.
    """
    last_candle = dataframe.iloc[-1].squeeze()
    current_price = float(last_candle['close'])

    # Keep the levels inside the +/-5 % band around the last close.
    book = pd.DataFrame({
        'price': [float(level[0]) for level in asks],
        'volume': [float(level[1]) for level in asks],
    })
    in_band = (book['price'] >= current_price * 0.95) & (book['price'] <= current_price * 1.05)
    nearby = book[in_band]

    # Show the three biggest volumes of the band.
    print(nearby.sort_values(by='volume', ascending=False).head(3))

    levels = np.array(nearby, dtype=float)
    prices = levels[:, 0]
    volumes = levels[:, 1]

    # Levels in the top 10 % by volume; computed but not used further.
    heavy_cutoff = np.percentile(volumes, 90)
    heavy_levels = levels[volumes >= heavy_cutoff]

    # Volume summed over buckets that are 1 % of the last close wide.
    step = last_candle['close'] / 100
    for start_price in np.arange(prices.min(), prices.max(), step=step):
        end_price = start_price + step
        in_bucket = (prices >= start_price) & (prices < end_price)
        volume_in_range = volumes[in_bucket].sum()
        amount = volume_in_range * end_price
        print(
            f"Prix entre {start_price:.6f} et {end_price:.6f}: Volume total = {volume_in_range:.2f} amount={amount:.2f}")

    # Levels ordered by decreasing volume; the first three are the resistances.
    top_3_levels = levels[volumes.argsort()][::-1][:3]

    print("Les trois plus grosses resistances détectées : ")
    for rank, column in enumerate(('r3', 'r2', 'r1')):
        self.updateLastValue(dataframe, column, top_3_levels[rank][0])
    self.updateLastValue(dataframe, 'r_min',
                         min(top_3_levels[0][0], top_3_levels[1][0], top_3_levels[2][0]))
    for resistance in top_3_levels:
        print(f"{pair} Prix: {resistance[0]}, Quantité: {resistance[1]}")
def calculateSupport(self, pair, bids, dataframe: DataFrame):
    """
    Find the three largest bid levels near the price and store them as supports.

    Only bids priced within +/-5 % of the last close are considered. The
    prices of the largest, second and third largest volumes are written via
    ``updateLastValue`` to ``s1``, ``s2`` and ``s3``; ``s_min`` receives the
    highest of the three prices. Intermediate results are printed.

    :param pair: pair name, used in the printed output only.
    :param bids: sequence of (price, volume) entries.
    :param dataframe: candles; the last row provides ``close``.
    """
    last_candle = dataframe.iloc[-1].squeeze()
    current_price = float(last_candle['close'])

    # Keep the levels inside the +/-5 % band around the last close.
    book = pd.DataFrame({
        'price': [float(level[0]) for level in bids],
        'volume': [float(level[1]) for level in bids],
    })
    in_band = (book['price'] >= current_price * 0.95) & (book['price'] <= current_price * 1.05)
    nearby = book[in_band]

    # Show the three biggest volumes of the band.
    print(nearby.sort_values(by='volume', ascending=False).head(3))

    levels = np.array(nearby, dtype=float)
    prices = levels[:, 0]
    volumes = levels[:, 1]

    # Levels in the top 10 % by volume; computed but not used further.
    heavy_cutoff = np.percentile(volumes, 90)
    heavy_levels = levels[volumes >= heavy_cutoff]

    # Volume summed over buckets that are 1 % of the last close wide.
    step = last_candle['close'] / 100
    for start_price in np.arange(prices.min(), prices.max(), step=step):
        end_price = start_price + step
        in_bucket = (prices >= start_price) & (prices < end_price)
        volume_in_range = volumes[in_bucket].sum()
        amount = volume_in_range * end_price
        print(
            f"Prix entre {start_price:.6f} et {end_price:.6f}: Volume total = {volume_in_range:.2f} amount={amount:.2f}")

    # Levels ordered by decreasing volume; the first three are the supports.
    top_3_levels = levels[volumes.argsort()][::-1][:3]

    print("Les trois plus gros supports détectés:")
    for rank, column in enumerate(('s1', 's2', 's3')):
        self.updateLastValue(dataframe, column, top_3_levels[rank][0])
    self.updateLastValue(dataframe, 's_min', max(top_3_levels[0][0], top_3_levels[1][0], top_3_levels[2][0]))
    for support in top_3_levels:
        print(f"{pair} Prix: {support[0]}, Quantité: {support[1]}")
def updateLastValue(self, df: DataFrame, col, value):
    """
    Write ``value`` into column ``col`` of ``df`` in place.

    When the column already exists only its last row is overwritten; when it
    does not, the column is created with ``value`` on every row.
    """
    if col not in df.columns:
        print(f"update all col {col} {value}")
        df[col] = value
        return

    print(f"update last col {col} {value}")
    column_position = df.columns.get_loc(col)
    df.iloc[-1, column_position] = value
def smooth_series(self, series, alpha_low=0.1, alpha_high=0.5, threshold=0.2):
    """
    Apply adaptive exponential smoothing to a pandas Series.

    Each output value moves from the previous smoothed value towards the
    current input by a fraction ``alpha``; ``alpha`` depends on how far the
    input is from the previous smoothed value.

    :param series: input values.
    :param alpha_low: fraction used for small moves (strong smoothing).
    :param alpha_high: fraction used for large moves (fast reaction).
    :param threshold: move, in percent of the magnitude of the previous
                      smoothed value, above which ``alpha_high`` is used.
    :return: a Series with the same index as ``series``; an empty copy when
             ``series`` is empty.
    """
    if len(series) == 0:
        return series.copy()

    smoothed = [series.iloc[0]]
    for current in series.iloc[1:]:
        prev = smoothed[-1]
        # Relative to |prev|: dividing by a negative ``prev`` would make the
        # variation negative and the fast alpha could never be selected.
        variation = abs(current - prev) / abs(prev) * 100 if prev != 0 else 0
        alpha = alpha_high if variation > threshold else alpha_low
        smoothed.append(prev + alpha * (current - prev))

    return pd.Series(smoothed, index=series.index)
# def update_last_record(self, dataframe: DataFrame, new_data):
# # Vérifiez si de nouvelles données ont été reçues
# if new_data is not None:
# # Ne mettez à jour que la dernière ligne du dataframe
# last_index = dataframe.index[-1] # Sélectionne le dernier enregistrement
# dataframe.loc[last_index] = new_data # Met à jour le dernier enregistrement avec les nouvelles données
# return dataframe
def calculateSMA(self, nb, asks, bids):
    """
    Smooth order-book volumes with a simple moving average over ``nb`` levels.

    :param nb: size of the rolling window.
    :param asks: sequence of (price, volume) entries; values may be strings.
    :param bids: same layout as ``asks``.
    :return: ``(asks_with_sma, bids_with_sma)``, each a list of
             ``(price, averaged volume)`` tuples. The first ``nb - 1`` levels
             of each side are dropped because their window is incomplete.
    """

    def _smoothed_levels(levels):
        book = pd.DataFrame({
            'price': [float(level[0]) for level in levels],
            'volume': [float(level[1]) for level in levels],
        })
        book['volume_sma'] = book['volume'].rolling(window=nb).mean()
        # Rows without a full window have no average yet.
        complete = book.dropna(subset=['volume_sma'])
        return list(zip(complete['price'], complete['volume_sma']))

    bids_with_sma = _smoothed_levels(bids)
    asks_with_sma = _smoothed_levels(asks)
    return asks_with_sma, bids_with_sma
def calculateUpDownPct(self, dataframe, key):
    """
    Percent change of ``close`` over the streak length stored in column ``key``.

    For row ``i`` the reference row is ``i - abs(int(dataframe[key][i]))``;
    the result is ``100 * (close[i] - close[ref]) / close[ref]``.

    :return: a NumPy array with one value per row; NaN where the reference
             row index is not greater than 1.

    NOTE(review): the ``> 1`` bound also rejects reference rows 0 and 1,
    which are valid positions - confirm whether ``>= 0`` was intended.
    """
    closes = dataframe['close']
    pct_changes = np.full(len(dataframe), np.nan)

    for row, streak in enumerate(dataframe[key]):
        reference_row = row - abs(int(streak))
        if reference_row <= 1:
            continue
        base_price = closes.iloc[reference_row]
        pct_changes[row] = 100 * (closes.iloc[row] - base_price) / base_price

    return pct_changes
# ✅ First derivative (variation or slope)
# Positive: the curve is rising → bullish trend.
# Negative: the curve is falling → bearish trend.
# Close to 0: the curve is flat → stable or transitioning market.
#
# Uses:
# Detect turning points (trend changes) when it reaches zero.
# Gauge the speed of a move (the larger it is, the more impulsive the move).
#
# ✅ Second derivative (acceleration or concavity)
# Positive: the slope is increasing → the rise is accelerating or the fall is slowing down.
# Negative: the slope is decreasing → the fall is accelerating or the rise is slowing down.
# Sign change: often indicates a change of curvature, useful to anticipate reversals.
#
# Examples:
# 🟢 Derivative 1 > 0 and derivative 2 > 0: bullish trend that is accelerating.
# 🟡 Derivative 1 > 0 and derivative 2 < 0: bullish trend that is slowing down → possible exhaustion.
# 🔴 Derivative 1 < 0 and derivative 2 < 0: bearish trend that is accelerating.
# 🟠 Derivative 1 < 0 and derivative 2 > 0: bearish trend that is slowing down → possible bottom.
#
# Filtering signals: only take a bullish signal when derivative 1 > 0 and derivative 2 > 0.
# Detecting reversal zones: when derivative 1 ≈ 0 and derivative 2 changes sign.
def apply_regression_derivatives(self,
                                 dataframe: DataFrame,
                                 column: str = 'close',
                                 window: int = 50,
                                 degree: int = 3,
                                 future_offset: int = 10  # projection n candles ahead
                                 ) -> DataFrame:
    """
    Fit a rolling polynomial to ``column`` and store its value and derivatives.

    For each row ``i`` a polynomial of the given ``degree`` is fitted to the
    ``window`` values that precede it (rows ``i - window`` to ``i - 1``). The
    polynomial, its first and its second derivative are evaluated at the last
    fitted point (``regression_fit``, ``regression_deriv1``,
    ``regression_deriv2``) and ``future_offset`` steps further
    (``regression_future_*``).

    Rows with fewer than ``window`` predecessors, and the last
    ``future_offset`` rows, are left as NaN.

    :return: a copy of ``dataframe`` with the six columns added; the input is
             not modified.
    """
    df = dataframe.copy()
    row_count = len(df)
    source = df[column]

    results = {
        'regression_fit': [],
        'regression_deriv1': [],
        'regression_deriv2': [],
        'regression_future_fit': [],
        'regression_future_deriv1': [],
        'regression_future_deriv2': [],
    }

    # The abscissae are the same for every window: 0 .. window - 1.
    x = np.arange(window)
    x_now = window - 1
    x_future = x_now + future_offset

    for i in range(row_count):
        has_full_window = i >= window
        has_room_ahead = i + future_offset < row_count
        if not (has_full_window and has_room_ahead):
            for values in results.values():
                values.append(np.nan)
            continue

        poly = np.poly1d(np.polyfit(x, source.iloc[i - window:i].values, degree))
        slope = np.polyder(poly, 1)
        curvature = np.polyder(poly, 2)

        results['regression_fit'].append(poly(x_now))
        results['regression_deriv1'].append(slope(x_now))
        results['regression_deriv2'].append(curvature(x_now))
        results['regression_future_fit'].append(poly(x_future))
        results['regression_future_deriv1'].append(slope(x_future))
        results['regression_future_deriv2'].append(curvature(x_future))

    for name, values in results.items():
        df[name] = values

    return df
def test_signal_success(self, df, condition, percent=0.03, window_size=36):
"""
df : DataFrame avec colonnes ['close', 'high', ...]
percent : hausse recherchée (ex: 0.03 pour +3%)
window_size : nombre de bougies (ex: 36 pour 3h en 5m)
"""
# Exemple condition : RSI < 30 et EMA20 > SMA50
hits = 0
total = 0
for idx in df[condition].index:
price_now = df.loc[idx, 'close']
idx_pos = df.index.get_loc(idx)
# Fenêtre de h heures
future_idx = df.index[idx_pos + 1: idx_pos + 1 + window_size]
if len(future_idx) < window_size:
continue
future_highs = df.loc[future_idx, 'close']
if (future_highs >= price_now * (1 + percent)).any():
# print(f"{price_now} ==> {df.loc[future_idx]['close']}")
hits += 1
total += 1
prob = hits / total if total > 0 else 0
print(f"{hits}/{total} hausses >= {percent*100:.1f}% dans {window_size} bougies → probabilité : {prob:.2%}")
return prob