Rebase HeikinAshi.py

This commit is contained in:
Jérôme Delacotte
2025-05-05 23:37:09 +02:00
parent 014996f0ab
commit 014289c2a0

View File

@@ -72,6 +72,9 @@ class HeikinAshi(IStrategy):
"Percent": {
"hapercent": {
"color": "#74effc"
},
"percent12": {
'color': 'blue'
}
},
'up_down': {
@@ -126,10 +129,11 @@ class HeikinAshi(IStrategy):
# trailing_stop_positive_offset = 0.015
# trailing_only_offset_is_reached = True
position_adjustment_enable = True
dca = {}
pairs = {
pair: {
"first_buy": 0,
"last_max": 0,
"trade_info": {},
"max_touch": 0.0,
@@ -177,16 +181,18 @@ class HeikinAshi(IStrategy):
open_date = trade.open_date.astimezone(timezone.utc)
dispo = round(self.wallets.get_available_stake_amount())
hours = (current_time - trade.date_last_filled_utc).total_seconds() / 3600.0
pct_max = round((last_candle['close'] - self.pairs[trade.pair]['last_buy']) / self.pairs[trade.pair]['last_buy'], 3)
max_touch = self.pairs[trade.pair]['max_touch']
limit_buy = 5
if (count_of_buys < limit_buy) \
and ((last_candle['enter_long'] == 1)) \
and (current_profit < -0.015 * count_of_buys):
# limit_buy = 5
# and (count_of_buys < limit_buy) \
if (hours > 2) \
and (last_candle['sma5_diff_1h'] > -0.3) \
and (last_candle['sma5_diff_1d'] > -1) \
and (last_candle['enter_long'] == 1) \
and (pct_max < -0.015):
# and (last_candle_decalage['min12'] == last_candle['min12']) \
# and (last_candle_decalage['close'] < last_candle_decalage['mid288']):
additional_stake = self.calculate_stake(trade.pair, last_candle, 1) # self.config['stake_amount']
additional_stake = self.calculate_stake(trade.pair, last_candle, current_profit) # self.config['stake_amount']
self.log_trade(
last_candle=last_candle,
date=current_time,
@@ -197,38 +203,43 @@ class HeikinAshi(IStrategy):
trade_type=last_candle['enter_tag'],
profit=round(current_profit, 4), # round(current_profit * trade.stake_amount, 2),
buys=trade.nr_of_successful_entries + 1,
stake=round(additional_stake, 2)
stake=round(additional_stake, 2),
trade=trade
)
self.expectedProfit(trade.pair, last_candle, current_rate)
self.expectedProfit(trade.pair, last_candle)
self.pairs[trade.pair]['last_buy'] = current_rate
self.pairs[trade.pair]['max_touch'] = last_candle['close']
self.pairs[trade.pair]['last_candle'] = last_candle
return additional_stake
pct_limit = (-0.015 * limit_buy) + (- 0.03 * (count_of_buys - limit_buy))
if (count_of_buys >= limit_buy) and (current_profit < pct_limit) \
and ((last_candle['enter_long'] == 1) or
(last_candle['percent48'] < - 0.03 and last_candle['min200'] == last_candle_3['min200'])):
additional_stake = self.calculate_stake(trade.pair, last_candle, 1) * (-current_profit / 0.1)
self.log_trade(
last_candle=last_candle,
date=current_time,
action="Loss -",
dispo=dispo,
pair=trade.pair,
rate=current_rate,
trade_type=last_candle['enter_tag'],
profit=round(current_profit, 4), # round(current_profit * trade.stake_amount, 2),
buys=trade.nr_of_successful_entries + 1,
stake=round(additional_stake, 2)
)
self.expectedProfit(trade.pair, last_candle, current_rate)
self.pairs[trade.pair]['last_buy'] = current_rate
self.pairs[trade.pair]['max_touch'] = last_candle['close']
self.pairs[trade.pair]['last_candle'] = last_candle
return additional_stake
# pct_limit = (-0.015 * limit_buy) + (- 0.03 * (count_of_buys - limit_buy))
# if (hours > 6) and (count_of_buys >= limit_buy) \
# and (last_candle['sma5_diff_1d'] > -1.5) \
# and (last_candle['sma5_diff_1h'] > -0.3) \
# and (pct_max < pct_limit or (hours > 24 and last_candle['close'] < self.pairs[trade.pair]['last_buy'])) \
# and ((last_candle['enter_long'] == 1) or
# (last_candle['percent48'] < - 0.03 and last_candle['min200'] == last_candle_3['min200'])):
# additional_stake = self.calculate_stake(trade.pair, last_candle, current_profit) #* (-current_profit / 0.1)
# self.log_trade(
# last_candle=last_candle,
# date=current_time,
# action="Loss -",
# dispo=dispo,
# pair=trade.pair,
# rate=current_rate,
# trade_type=last_candle['enter_tag'],
# profit=round(current_profit, 4), # round(current_profit * trade.stake_amount, 2),
# buys=trade.nr_of_successful_entries + 1,
# stake=round(additional_stake, 2),
# trade=trade
# )
# self.expectedProfit(trade.pair, last_candle)
# self.pairs[trade.pair]['last_buy'] = current_rate
# self.pairs[trade.pair]['max_touch'] = last_candle['close']
# self.pairs[trade.pair]['last_candle'] = last_candle
#
# return additional_stake
# if (current_profit > 0) and (count_of_buys >= limit_buy) and (hours > 24) and (last_candle['enter_long'] == 1):
# additional_stake = self.calculate_stake(trade.pair, last_candle, 1)
@@ -244,7 +255,7 @@ class HeikinAshi(IStrategy):
# buys=trade.nr_of_successful_entries + 1,
# stake=round(additional_stake, 2)
# )
# self.expectedProfit(trade.pair, last_candle, current_rate)
# self.expectedProfit(trade.pair, last_candle)
# self.pairs[trade.pair]['last_buy'] = current_rate
# self.pairs[trade.pair]['max_touch'] = last_candle['close']
# self.pairs[trade.pair]['last_candle'] = last_candle
@@ -259,30 +270,64 @@ class HeikinAshi(IStrategy):
# Obtenir les données actuelles pour cette paire
last_candle = dataframe.iloc[-1].squeeze()
stake_amount = self.config['stake_amount']
m = max(last_candle['max12_1d'], current_rate)
if last_candle['max12_1d'] > 0:
if (last_candle['close'] < m * 0.90):
stake_amount = stake_amount * 3
return self.calculate_stake(pair, last_candle, 0)
def calculate_stake(self, pair, last_candle, current_profit=0.0):
pct_max = round((last_candle['close'] - self.pairs[pair]['first_buy']) / self.pairs[pair]['first_buy'], 3) # round(100 * self.pairs[pair]['current_profit'], 1)
# print(pct_max)
if current_profit == 0:
m = max(last_candle['max12_1d'], last_candle['close'])
stake_amount = self.config['stake_amount']
if last_candle['close'] > round(last_candle['max12_1d'], 1):
stake_amount = stake_amount / 2
else:
if (last_candle['close'] < m * 0.95):
stake_amount = stake_amount * 2
if last_candle['min12_1d'] > 0:
if last_candle['close'] < last_candle['min12_1d']:
stake_amount = stake_amount * 2
else:
if last_candle['close'] < last_candle['min12_1d'] + (last_candle['max12_1d'] - last_candle['min12_1d']) / 2 :
stake_amount = stake_amount * 1.5
self.pairs[pair]['base_buy'] = stake_amount
else:
stake_amount = self.pairs[pair]['base_buy']
return stake_amount
if pct_max < 0:
stake_amount = stake_amount * ((1 - pct_max * 3) - pct_max / 0.1)
def calculate_stake(self, pair, last_candle, factor=1):
stake_amount = self.config['stake_amount']
m = max(last_candle['max12_1d'], last_candle['close'])
if last_candle['max12_1d'] > 0:
if (last_candle['close'] < m * 0.90):
stake_amount = stake_amount * 3
else:
if (last_candle['close'] < m * 0.95):
stake_amount = stake_amount * 2
return min(self.wallets.get_available_stake_amount(), stake_amount)
return stake_amount
# def dca_strategy(self, base_price = 10000, drop_percent=20, steps=7, method="linear"):
#
# """
# Calcule la répartition des mises en fonction du pourcentage de baisse et du nombre de paliers.
#
# :param total_amount: Montant total à investir (ex: 2000€)
# :param drop_percent: Pourcentage total de baisse (ex: 20%)
# :param steps: Nombre de paliers d'achat (ex: 5)
# :param method: Méthode de répartition: "linear", "exponential", "custom"
# :return: Liste des mises et des prix d'achat
# """
# base_price = 10000 # Prix initial du BTC (modifiable)
# prices = [base_price * (1 - (drop_percent / 100) * (i / (steps - 1))) for i in range(steps)]
# total_amount = self.wallets.get_available_stake_amount()
# if method == "linear":
# amounts = [total_amount / steps] * steps # Montant égal à chaque palier
# elif method == "exponential":
# weights = [2 ** i for i in range(steps)]
# total_weight = sum(weights)
# amounts = [(w / total_weight) * total_amount for w in weights]
# elif method == "custom":
# weights = [1, 2, 3, 4, 5] # Exemple de répartition personnalisée
# total_weight = sum(weights)
# amounts = [(w / total_weight) * total_amount for w in weights]
# else:
# raise ValueError("Méthode invalide. Choisir 'linear', 'exponential' ou 'custom'.")
#
# return {"prices": prices, "amounts": amounts}
def confirm_trade_entry(self, pair: str, order_type: str, amount: float, rate: float, time_in_force: str,
current_time: datetime, entry_tag: Optional[str], **kwargs) -> bool:
@@ -290,7 +335,7 @@ class HeikinAshi(IStrategy):
last_candle = dataframe.iloc[-1].squeeze()
dispo = round(self.wallets.get_available_stake_amount())
# if (self.pairs[pair]['last_sell'] > 0) and (last_candle['close'] * 1.01 > self.pairs[pair]['last_sell']):
# if last_candle['max12_1d'] > 0 and last_candle['close'] > last_candle['max5_1d']:
# self.log_trade(
# last_candle=last_candle,
# date=current_time,
@@ -299,15 +344,36 @@ class HeikinAshi(IStrategy):
# rate=rate,
# dispo=dispo,
# profit=0,
# stake=round(stake_amount, 2)
# stake=0
# )
# return False
# Exemple d'utilisation :
# self.dca = self.dca_strategy(base_price=last_candle['close'], method="exponential")
# print(self.dca)
if last_candle['sma5_diff_1d'] < - 0.6:
self.log_trade(
last_candle=last_candle,
date=current_time,
action="CANCEL BUY",
pair=pair,
rate=rate,
dispo=dispo,
profit=0,
stake=0,
trade_type='cancel'
)
return False
self.pairs[pair]['first_buy'] = rate
self.pairs[pair]['last_buy'] = rate
self.pairs[pair]['max_touch'] = last_candle['close']
self.pairs[pair]['last_candle'] = last_candle
self.pairs[pair]['count_of_buys'] = 1
self.pairs[pair]['current_profit'] = 0
stake_amount = self.calculate_stake(pair, last_candle, 1)
stake_amount = self.calculate_stake(pair, last_candle, 0)
self.expectedProfit(pair, last_candle)
# if self.pairs[pair]['stop_buy']:
# if last_candle['sma5_diff_1d'] > 0:
@@ -343,7 +409,6 @@ class HeikinAshi(IStrategy):
buys=1,
stake=round(stake_amount, 2)
)
self.expectedProfit(pair, last_candle, rate)
return True
def confirm_trade_exit(self, pair: str, trade: Trade, order_type: str, amount: float, rate: float,
@@ -357,6 +422,7 @@ class HeikinAshi(IStrategy):
allow_to_sell = (last_candle['percent5'] < -0.00)
ok = (allow_to_sell) | (exit_reason == 'force_exit')
if ok:
self.pairs[pair]['expected_profit'] = 0
self.pairs[pair]['last_sell'] = rate
self.pairs[pair]['last_trade'] = trade
self.pairs[pair]['last_candle'] = last_candle
@@ -368,7 +434,8 @@ class HeikinAshi(IStrategy):
trade_type=exit_reason,
rate=last_candle['close'],
dispo=dispo,
profit=round(trade.calc_profit(rate, amount), 2)
profit=round(trade.calc_profit(rate, amount), 2),
trade=trade
)
self.pairs[pair]['max_touch'] = 0
self.pairs[pair]['last_buy'] = 0
@@ -397,18 +464,21 @@ class HeikinAshi(IStrategy):
factor = 1
# if days > 10:
# factor = 1 + days / 10
expected_profit = self.pairs[pair]['expected_profit'] / factor
expected_profit = self.expectedProfit(pair, last_candle) #self.pairs[pair]['expected_profit'] / factor
# print(
# f"{current_time} days={days} expected={expected_profit:.3f} rate={current_rate} max_touch={max_touch_before:.1f} profit={current_profit:.3f} last_lost={last_lost:.3f} buys={count_of_buys} percent={last_candle['percent']:.4f}")
# f"{current_time} days={days} expected={expected_profit:.3f} rate={current_rate} max_touch={max_touch_before:.1f} profit={current_profit:.3f} last_lost={expected:.3f} buys={count_of_buys} percent={last_candle['percent']:.4f}")
# if count_of_buys >= 5 and current_profit < 0 and (last_candle['percent12'] < -0.015):
# self.pairs[pair]['stop_buy'] = True
# return 'count_' + str(count_of_buys)
if (current_profit > expected_profit) \
& (last_candle['percent5'] < 0.0) \
& (last_lost > - current_profit / 5):
return 'last_lost_' + str(count_of_buys)
if (last_candle['percent3'] < 0.0) & (current_profit > last_candle['min_max200'] / 3):
return 'min_max200_' + str(count_of_buys)
# if (current_profit > expected_profit) \
# & (last_candle['percent5'] < 0.0) \
# & (last_lost > - current_profit / 5):
# return 'lost_' + str(count_of_buys)
self.pairs[pair]['max_touch'] = max(last_candle['haclose'], self.pairs[pair]['max_touch'])
# if (current_profit > 0.004) \
@@ -440,20 +510,19 @@ class HeikinAshi(IStrategy):
return df
def expectedProfit(self, pair: str, last_candle, current_rate):
def expectedProfit(self, pair: str, last_candle):
last_buy = self.pairs[pair]['last_buy']
max_touch = self.pairs[pair]['max_touch']
expected_profit = ((max_touch - last_buy) / max_touch)
# last_buy = self.pairs[pair]['last_buy']
# max_touch = self.pairs[pair]['max_touch']
#
expected_profit = min(0.1, max(0.01, last_candle['min_max200'] * 0.5 + self.pairs[pair]['count_of_buys'] * 0.0005))
#((max_touch - last_buy) / max_touch)
self.pairs[pair]['expected_profit'] = max(0.004, expected_profit)
# print(f"expected max_touch={max_touch:.1f} last_buy={last_buy:.1f} expected={expected_profit:.3f} max5_1d={last_candle['max5_1d']:.1f}")
return expected_profit
def log_trade(self, action, pair, date, trade_type=None, rate=None, dispo=None, profit=None, buys=None, stake=None,
last_candle=None):
last_candle=None, trade=None):
# Afficher les colonnes une seule fois
if self.config.get('runmode') == 'hyperopt':
return
@@ -462,7 +531,7 @@ class HeikinAshi(IStrategy):
# f"|{'-' * 18}+{'-' * 12}+{'-' * 12}+{'-' * 20}+{'-' * 14}+{'-' * 8}+{'-' * 10}+{'-' * 7}+{'-' * 13}+{'-' * 14}+{'-' * 14}+{'-' * 7}+{'-' * 12}|"
# )
print(
f"| {'Date':<16} | {'Action':<10} | {'Pair':<10} | {'Trade Type':<18} | {'Rate':>12} | {'Dispo':>6} | {'Profit':>8} | {'Pct':>5} | {'max_touch':>11} | {'last_lost':>12} | {'last_max':>12} | {'Buys':>5} | {'Stake':>10} |"
f"| {'Date':<16} | {'Action':<10} | {'Pair':<10} | {'Type s1d s1h':<18} | {'Rate':>12} | {'Dispo':>6} | {'Profit':>8} | {'Pct':>5} | {'max_touch':>11} | {'last_lost':>12} | {'last_max':>12} | {'Buys':>5} | {'Stake':>10} |"
)
print(
f"|{'-' * 18}+{'-' * 12}+{'-' * 12}+{'-' * 20}+{'-' * 14}+{'-' * 8}+{'-' * 10}+{'-' * 7}+{'-' * 13}+{'-' * 14}+{'-' * 14}+{'-' * 7}+{'-' * 12}|"
@@ -490,16 +559,18 @@ class HeikinAshi(IStrategy):
sma5 = str(sma5_1d) + ' ' + str(sma5_1h)
last_lost = round((last_candle['haclose'] - self.pairs[pair]['max_touch']) / self.pairs[pair]['max_touch'], 3)
expected = str(round(self.pairs[pair]['expected_profit'], 3))
max_touch = round(self.pairs[pair]['max_touch'], 1)
pct_max = round(100 * self.pairs[pair]['current_profit'], 1)
max_touch = round(last_candle['max12_1d'], 1) #round(self.pairs[pair]['max_touch'], 1)
pct_max = round(100 * (last_candle['close'] - self.pairs[pair]['first_buy']) / self.pairs[pair]['first_buy'], 1) # round(100 * self.pairs[pair]['current_profit'], 1)
if trade_type is not None:
trade_type = trade_type + " " + str(round(100 * self.pairs[pair]['expected_profit'], 1))
trade_type = trade_type \
+ " " + str(round(last_candle['sma5_diff_1d'], 1)) \
+ " " + str(round(last_candle['sma5_diff_1h'], 1))
print(
f"| {date:<16} | {action:<10} | {pair:<10} | {trade_type or '-':<18} | {rate or '-':>12} | {dispo or '-':>6} | {profit or '-':>8} | {pct_max or '-':>5} | {max_touch or '-':>11} | {last_lost or '-':>12} | {round(self.pairs[pair]['last_max'], 2) or '-':>12} | {buys or '-':>5} | {stake or '-':>10} |"
f"| {date:<16} | {action:<10} | {pair:<10} | {trade_type or '-':<18} | {rate or '-':>12} | {dispo or '-':>6} | {profit or '-':>8} | {pct_max or '-':>5} | {max_touch or '-':>11} | {expected or '-':>12} | {round(self.pairs[pair]['last_max'], 2) or '-':>12} | {buys or '-':>5} | {stake or '-':>10} |"
)
def populate_indicators(self, dataframe: DataFrame, metadata: dict) -> DataFrame:
@@ -547,8 +618,6 @@ class HeikinAshi(IStrategy):
dataframe['down'].groupby((dataframe['down'] != dataframe['down'].shift()).cumsum()).cumcount() + 1)
dataframe['up_count'] = dataframe['up'].astype(int) * (
dataframe['up'].groupby((dataframe['up'] != dataframe['up'].shift()).cumsum()).cumcount() + 1)
dataframe['down_tag'] = (dataframe['down_count'] < -7)
dataframe['up_tag'] = (dataframe['up_count'] > 7)
# Créer une colonne vide
dataframe['down_pct'] = self.calculateUpDownPct(dataframe, 'down_count')
@@ -581,8 +650,29 @@ class HeikinAshi(IStrategy):
dataframe = self.detect_loose_hammer(dataframe)
informative = self.dp.get_pair_dataframe(pair=metadata['pair'], timeframe="1h")
informative['sma5'] = talib.SMA(informative, timeperiod=5)
informative['sma5_diff'] = 100 * (informative['sma5'] - informative['sma5'].shift(1)) / informative['sma5']
dataframe = merge_informative_pair(dataframe, informative, self.timeframe, "1h", ffill=True)
self.calculateBearishCandles(dataframe, 200)
self.calculateBearishCandles(dataframe, 48)
return dataframe
def calculateBearishCandles(self, dataframe, lookback_period):
# Trouver l'index du dernier maximum sur 200 bougies
rolling_max_indices = dataframe['high'].rolling(window=lookback_period, min_periods=1).apply(np.argmax,
raw=True)
# Convertir ces indices en index globaux du DataFrame
dataframe['last_max_index'] = dataframe.index - (lookback_period - 1) + rolling_max_indices
# Création d'une colonne pour stocker le nombre de bougies baissières depuis le dernier max
bearish_counts = np.zeros(len(dataframe))
for i in range(lookback_period, len(dataframe)):
last_max_idx = int(dataframe.loc[i, 'last_max_index'])
bearish_counts[i] = (dataframe.loc[last_max_idx:i, 'close'] < dataframe.loc[last_max_idx:i, 'open']).sum()
dataframe['bearish_candles_since_max' + str(lookback_period)] = bearish_counts
def calculateUpDownPct(self, dataframe, key):
down_pct_values = np.full(len(dataframe), np.nan)
# Remplir la colonne avec les bons calculs
@@ -627,12 +717,20 @@ class HeikinAshi(IStrategy):
| (dataframe['percent12'] <= -0.012)
)
& (dataframe['down_count'] == 0)
# & (dataframe['bearish_candles_since_max48'] > 20)
,
['enter_long', 'enter_tag']] = [1, 'down']
dataframe.loc[(dataframe['loose_hammer'] == 1)
,
['enter_long', 'enter_tag']] = [1, 'hammer']
# dataframe.loc[
# (
# # (dataframe['hapercent'] > 0)
# (dataframe['down_count'].shift(1) < - 6)
# & (dataframe['down_count'] == 0)
# & (dataframe['down_pct'].shift(1) <= -0.5)
# ), ['enter_long', 'enter_tag']] = (1, 'buy_hapercent')
# dataframe.loc[(dataframe['loose_hammer'] == 1)
# ,
# ['enter_long', 'enter_tag']] = [1, 'hammer']
# dataframe.loc[
# (
# (dataframe['low'] <= dataframe['min200'])
@@ -657,6 +755,6 @@ class HeikinAshi(IStrategy):
# get access to all pairs available in whitelist.
pairs = self.dp.current_whitelist()
informative_pairs = [(pair, '1d') for pair in pairs]
# informative_pairs += [(pair, '1h') for pair in pairs]
informative_pairs += [(pair, '1h') for pair in pairs]
return informative_pairs