Files
Freqtrade/HeikinAshi.py
Jérôme Delacotte 014289c2a0 Rebase HeikinAshi.py
2025-05-05 23:37:59 +02:00

761 lines
34 KiB
Python

# Heracles Strategy: Strongest Son of GodStra
# ( With just 1 Genome! its a bacteria :D )
# Author: @Mablue (Masoud Azizi)
# github: https://github.com/mablue/
# IMPORTANT:Add to your pairlists inside config.json (Under StaticPairList):
# {
# "method": "AgeFilter",
# "min_days_listed": 100
# },
# IMPORTANT: INSTALL TA BEFOUR RUN(pip install ta)
#
# freqtrade hyperopt --hyperopt-loss SharpeHyperOptLoss --spaces roi buy --strategy Heracles
# ######################################################################
# --- Do not remove these libs ---
from freqtrade.persistence import Trade
from typing import Optional, Tuple, Union
from datetime import timezone, timedelta, datetime
from freqtrade.strategy import (BooleanParameter, CategoricalParameter, DecimalParameter, stoploss_from_open,
IntParameter, IStrategy, merge_informative_pair, informative, stoploss_from_absolute)
import logging
# noinspection PyUnresolvedReferences
from freqtrade.strategy import (IStrategy, informative)
from pandas import DataFrame
# --------------------------------
# Add your lib to import here
# import talib.abstract as ta
import pandas as pd
import ta
import talib.abstract as talib
from ta.utils import dropna
import freqtrade.vendor.qtpylib.indicators as qtpylib
from functools import reduce
import numpy as np
from scipy.special import binom
from ta.trend import SMAIndicator, EMAIndicator, MACD, ADXIndicator
from ta.momentum import RSIIndicator, StochasticOscillator
class HeikinAshi(IStrategy):
plot_config = {
"main_plot": {
"min288": {
"color": "#197260"
},
'max288': {
'color': 'green'
},
'mid288': {
'color': 'blue'
},
'hasma5': {
'color': 'red'
},
'max48': {
'color': 'yellow'
},
'min48': {
'color': 'yellow'
},
'sma12': {
'color': 'pink'
},
'ema5_1d': {
'color': "#74effc"
},
'ema20_1d': {
'color': "cyan"
},
},
"subplots": {
"Percent": {
"hapercent": {
"color": "#74effc"
},
"percent12": {
'color': 'blue'
}
},
'up_down': {
'up_pct': {
'color': 'red'
},
'down_pct': {
'color': 'blue'
}
},
'tag': {
'rsi_downtrend': {
'color': 'red'
},
'ma_downtrend': {
'color': 'blue'
}
}
}
}
# Buy hyperspace params:
buy_params = {
"buy_crossed_indicator_shift": 9,
"buy_div_max": 0.75,
"buy_div_min": 0.16,
"buy_indicator_shift": 15,
}
# Sell hyperspace params:
sell_params = {
}
# ROI table:
minimal_roi = {
"0": 0.598
}
# Stoploss:
stoploss = -1
# Optimal timeframe use it in your config
timeframe = '5m'
columns_logged = False
max_entry_position_adjustment = 30
startup_candle_count = 288
# Trailing stoploss
# trailing_stop = False
# trailing_stop_positive = 0.001
# trailing_stop_positive_offset = 0.015
# trailing_only_offset_is_reached = True
position_adjustment_enable = True
dca = {}
pairs = {
pair: {
"first_buy": 0,
"last_max": 0,
"trade_info": {},
"max_touch": 0.0,
"last_sell": 0.0,
"last_buy": 0.0,
'count_of_buys': 0,
'current_profit': 0,
'expected_profit': 0,
"last_candle": {},
"last_trade": None,
'base_stake_amount': 0,
'stop_buy': False
}
for pair in ["BTC/USDT", "ETH/USDT", "DOGE/USDT", "DASH/USDT", "XRP/USDT", "SOL/USDT"]
}
decalage = IntParameter(0, 10, default=3, space='buy')
########################################## END RESULT PASTE PLACE #####################################
# ------------------------------------------------------------------------------------------------------------------
def adjust_trade_position(self, trade: Trade, current_time: datetime,
current_rate: float, current_profit: float,
min_stake: Optional[float], max_stake: float,
current_entry_rate: float, current_exit_rate: float,
current_entry_profit: float, current_exit_profit: float,
**kwargs
) -> Union[Optional[float], Tuple[Optional[float], Optional[str]]]:
# ne rien faire si ordre deja en cours
if trade.has_open_orders:
return None
if (self.wallets.get_available_stake_amount() < 50): # or trade.stake_amount >= max_stake:
return 0
dataframe, _ = self.dp.get_analyzed_dataframe(trade.pair, self.timeframe)
last_candle = dataframe.iloc[-1].squeeze()
last_candle_3 = dataframe.iloc[-4].squeeze()
# last_candle_24 = dataframe.iloc[-25].squeeze()
# if (last_candle['sma5_diff_1d'] < -0.1):
# return None
# prépare les données
count_of_buys = trade.nr_of_successful_entries
current_time = current_time.astimezone(timezone.utc)
open_date = trade.open_date.astimezone(timezone.utc)
dispo = round(self.wallets.get_available_stake_amount())
hours = (current_time - trade.date_last_filled_utc).total_seconds() / 3600.0
pct_max = round((last_candle['close'] - self.pairs[trade.pair]['last_buy']) / self.pairs[trade.pair]['last_buy'], 3)
# limit_buy = 5
# and (count_of_buys < limit_buy) \
if (hours > 2) \
and (last_candle['sma5_diff_1h'] > -0.3) \
and (last_candle['sma5_diff_1d'] > -1) \
and (last_candle['enter_long'] == 1) \
and (pct_max < -0.015):
# and (last_candle_decalage['min12'] == last_candle['min12']) \
# and (last_candle_decalage['close'] < last_candle_decalage['mid288']):
additional_stake = self.calculate_stake(trade.pair, last_candle, current_profit) # self.config['stake_amount']
self.log_trade(
last_candle=last_candle,
date=current_time,
action="Loss -",
dispo=dispo,
pair=trade.pair,
rate=current_rate,
trade_type=last_candle['enter_tag'],
profit=round(current_profit, 4), # round(current_profit * trade.stake_amount, 2),
buys=trade.nr_of_successful_entries + 1,
stake=round(additional_stake, 2),
trade=trade
)
self.expectedProfit(trade.pair, last_candle)
self.pairs[trade.pair]['last_buy'] = current_rate
self.pairs[trade.pair]['max_touch'] = last_candle['close']
self.pairs[trade.pair]['last_candle'] = last_candle
return additional_stake
# pct_limit = (-0.015 * limit_buy) + (- 0.03 * (count_of_buys - limit_buy))
# if (hours > 6) and (count_of_buys >= limit_buy) \
# and (last_candle['sma5_diff_1d'] > -1.5) \
# and (last_candle['sma5_diff_1h'] > -0.3) \
# and (pct_max < pct_limit or (hours > 24 and last_candle['close'] < self.pairs[trade.pair]['last_buy'])) \
# and ((last_candle['enter_long'] == 1) or
# (last_candle['percent48'] < - 0.03 and last_candle['min200'] == last_candle_3['min200'])):
# additional_stake = self.calculate_stake(trade.pair, last_candle, current_profit) #* (-current_profit / 0.1)
# self.log_trade(
# last_candle=last_candle,
# date=current_time,
# action="Loss -",
# dispo=dispo,
# pair=trade.pair,
# rate=current_rate,
# trade_type=last_candle['enter_tag'],
# profit=round(current_profit, 4), # round(current_profit * trade.stake_amount, 2),
# buys=trade.nr_of_successful_entries + 1,
# stake=round(additional_stake, 2),
# trade=trade
# )
# self.expectedProfit(trade.pair, last_candle)
# self.pairs[trade.pair]['last_buy'] = current_rate
# self.pairs[trade.pair]['max_touch'] = last_candle['close']
# self.pairs[trade.pair]['last_candle'] = last_candle
#
# return additional_stake
# if (current_profit > 0) and (count_of_buys >= limit_buy) and (hours > 24) and (last_candle['enter_long'] == 1):
# additional_stake = self.calculate_stake(trade.pair, last_candle, 1)
# self.log_trade(
# last_candle=last_candle,
# date=current_time,
# action="Gain +",
# dispo=dispo,
# pair=trade.pair,
# rate=current_rate,
# trade_type=last_candle['enter_tag'],
# profit=round(current_profit, 4), # round(current_profit * trade.stake_amount, 2),
# buys=trade.nr_of_successful_entries + 1,
# stake=round(additional_stake, 2)
# )
# self.expectedProfit(trade.pair, last_candle)
# self.pairs[trade.pair]['last_buy'] = current_rate
# self.pairs[trade.pair]['max_touch'] = last_candle['close']
# self.pairs[trade.pair]['last_candle'] = last_candle
#
# return additional_stake
return None
def custom_stake_amount(self, pair: str, current_time: datetime, current_rate: float,
proposed_stake: float, min_stake: float, max_stake: float,
**kwargs) -> float:
dataframe, _ = self.dp.get_analyzed_dataframe(pair, self.timeframe)
# Obtenir les données actuelles pour cette paire
last_candle = dataframe.iloc[-1].squeeze()
return self.calculate_stake(pair, last_candle, 0)
def calculate_stake(self, pair, last_candle, current_profit=0.0):
pct_max = round((last_candle['close'] - self.pairs[pair]['first_buy']) / self.pairs[pair]['first_buy'], 3) # round(100 * self.pairs[pair]['current_profit'], 1)
# print(pct_max)
if current_profit == 0:
m = max(last_candle['max12_1d'], last_candle['close'])
stake_amount = self.config['stake_amount']
if last_candle['close'] > round(last_candle['max12_1d'], 1):
stake_amount = stake_amount / 2
else:
if last_candle['min12_1d'] > 0:
if last_candle['close'] < last_candle['min12_1d']:
stake_amount = stake_amount * 2
else:
if last_candle['close'] < last_candle['min12_1d'] + (last_candle['max12_1d'] - last_candle['min12_1d']) / 2 :
stake_amount = stake_amount * 1.5
self.pairs[pair]['base_buy'] = stake_amount
else:
stake_amount = self.pairs[pair]['base_buy']
if pct_max < 0:
stake_amount = stake_amount * ((1 - pct_max * 3) - pct_max / 0.1)
return min(self.wallets.get_available_stake_amount(), stake_amount)
# def dca_strategy(self, base_price = 10000, drop_percent=20, steps=7, method="linear"):
#
# """
# Calcule la répartition des mises en fonction du pourcentage de baisse et du nombre de paliers.
#
# :param total_amount: Montant total à investir (ex: 2000€)
# :param drop_percent: Pourcentage total de baisse (ex: 20%)
# :param steps: Nombre de paliers d'achat (ex: 5)
# :param method: Méthode de répartition: "linear", "exponential", "custom"
# :return: Liste des mises et des prix d'achat
# """
# base_price = 10000 # Prix initial du BTC (modifiable)
# prices = [base_price * (1 - (drop_percent / 100) * (i / (steps - 1))) for i in range(steps)]
# total_amount = self.wallets.get_available_stake_amount()
# if method == "linear":
# amounts = [total_amount / steps] * steps # Montant égal à chaque palier
# elif method == "exponential":
# weights = [2 ** i for i in range(steps)]
# total_weight = sum(weights)
# amounts = [(w / total_weight) * total_amount for w in weights]
# elif method == "custom":
# weights = [1, 2, 3, 4, 5] # Exemple de répartition personnalisée
# total_weight = sum(weights)
# amounts = [(w / total_weight) * total_amount for w in weights]
# else:
# raise ValueError("Méthode invalide. Choisir 'linear', 'exponential' ou 'custom'.")
#
# return {"prices": prices, "amounts": amounts}
def confirm_trade_entry(self, pair: str, order_type: str, amount: float, rate: float, time_in_force: str,
current_time: datetime, entry_tag: Optional[str], **kwargs) -> bool:
dataframe, _ = self.dp.get_analyzed_dataframe(pair, self.timeframe)
last_candle = dataframe.iloc[-1].squeeze()
dispo = round(self.wallets.get_available_stake_amount())
# if last_candle['max12_1d'] > 0 and last_candle['close'] > last_candle['max5_1d']:
# self.log_trade(
# last_candle=last_candle,
# date=current_time,
# action="CANCEL BUY",
# pair=pair,
# rate=rate,
# dispo=dispo,
# profit=0,
# stake=0
# )
# return False
# Exemple d'utilisation :
# self.dca = self.dca_strategy(base_price=last_candle['close'], method="exponential")
# print(self.dca)
if last_candle['sma5_diff_1d'] < - 0.6:
self.log_trade(
last_candle=last_candle,
date=current_time,
action="CANCEL BUY",
pair=pair,
rate=rate,
dispo=dispo,
profit=0,
stake=0,
trade_type='cancel'
)
return False
self.pairs[pair]['first_buy'] = rate
self.pairs[pair]['last_buy'] = rate
self.pairs[pair]['max_touch'] = last_candle['close']
self.pairs[pair]['last_candle'] = last_candle
self.pairs[pair]['count_of_buys'] = 1
self.pairs[pair]['current_profit'] = 0
stake_amount = self.calculate_stake(pair, last_candle, 0)
self.expectedProfit(pair, last_candle)
# if self.pairs[pair]['stop_buy']:
# if last_candle['sma5_diff_1d'] > 0:
# self.pairs[pair]['stop_buy'] = False
# else:
# self.log_trade(
# last_candle=last_candle,
# date=current_time,
# action="CANCEL BUY",
# pair=pair,
# rate=rate,
# dispo=dispo,
# profit=0,
# trade_type='stop_buy',
# buys=1,
# stake=0
# )
# return False
# self.columns_logged = False
print(
f"|{'-' * 18}+{'-' * 12}+{'-' * 12}+{'-' * 20}+{'-' * 14}+{'-' * 8}+{'-' * 10}+{'-' * 7}+{'-' * 13}+{'-' * 14}+{'-' * 14}+{'-' * 7}+{'-' * 12}|"
)
self.log_trade(
last_candle=last_candle,
date=current_time,
action="START BUY",
pair=pair,
rate=rate,
dispo=dispo,
profit=0,
trade_type=entry_tag,
buys=1,
stake=round(stake_amount, 2)
)
return True
def confirm_trade_exit(self, pair: str, trade: Trade, order_type: str, amount: float, rate: float,
time_in_force: str,
exit_reason: str, current_time, **kwargs) -> bool:
dataframe, _ = self.dp.get_analyzed_dataframe(pair, self.timeframe)
last_candle = dataframe.iloc[-1].squeeze()
dispo = round(self.wallets.get_available_stake_amount())
allow_to_sell = (last_candle['percent5'] < -0.00)
ok = (allow_to_sell) | (exit_reason == 'force_exit')
if ok:
self.pairs[pair]['expected_profit'] = 0
self.pairs[pair]['last_sell'] = rate
self.pairs[pair]['last_trade'] = trade
self.pairs[pair]['last_candle'] = last_candle
self.log_trade(
last_candle=last_candle,
date=current_time,
action="Sell",
pair=pair,
trade_type=exit_reason,
rate=last_candle['close'],
dispo=dispo,
profit=round(trade.calc_profit(rate, amount), 2),
trade=trade
)
self.pairs[pair]['max_touch'] = 0
self.pairs[pair]['last_buy'] = 0
# print(f"Sell {current_time} {exit_reason} rate={rate:.3f} amount={amount} profit={amount * rate:.3f}")
return ok
def custom_exit(self, pair: str, trade: Trade, current_time, current_rate, current_profit, **kwargs):
dataframe, _ = self.dp.get_analyzed_dataframe(pair, self.timeframe)
last_candle = dataframe.iloc[-1].squeeze()
before_last_candle = dataframe.iloc[-2].squeeze()
max_touch_before = self.pairs[pair]['max_touch']
self.pairs[pair]['last_max'] = max(last_candle['haclose'], self.pairs[pair]['last_max'])
last_lost = (last_candle['close'] - max_touch_before) / max_touch_before
count_of_buys = trade.nr_of_successful_entries
self.pairs[pair]['count_of_buys'] = count_of_buys
self.pairs[pair]['current_profit'] = current_profit
days = (current_time - trade.open_date_utc).days
days = max(1, days)
factor = 1
# if days > 10:
# factor = 1 + days / 10
expected_profit = self.expectedProfit(pair, last_candle) #self.pairs[pair]['expected_profit'] / factor
# print(
# f"{current_time} days={days} expected={expected_profit:.3f} rate={current_rate} max_touch={max_touch_before:.1f} profit={current_profit:.3f} last_lost={expected:.3f} buys={count_of_buys} percent={last_candle['percent']:.4f}")
# if count_of_buys >= 5 and current_profit < 0 and (last_candle['percent12'] < -0.015):
# self.pairs[pair]['stop_buy'] = True
# return 'count_' + str(count_of_buys)
if (last_candle['percent3'] < 0.0) & (current_profit > last_candle['min_max200'] / 3):
return 'min_max200_' + str(count_of_buys)
# if (current_profit > expected_profit) \
# & (last_candle['percent5'] < 0.0) \
# & (last_lost > - current_profit / 5):
# return 'lost_' + str(count_of_buys)
self.pairs[pair]['max_touch'] = max(last_candle['haclose'], self.pairs[pair]['max_touch'])
# if (current_profit > 0.004) \
# & (last_candle['hapercent'] < 0.0) \
# & (last_candle['percent3'] < - min(0.01, current_profit / 4)):
# return 'profit_' + str(count_of_buys)
def detect_loose_hammer(self, df: DataFrame, fact=2.5) -> DataFrame:
"""
Détection large de marteaux : accepte des corps plus gros, ne vérifie pas le volume,
ne demande pas de divergence, juste un pattern visuel simple.
"""
body = abs(df['close'] - df['open'])
upper_shadow = abs(df['high'] - np.maximum(df['close'], df['open']))
lower_shadow = abs(np.minimum(df['close'], df['open']) - df['low'])
# Critères simplifiés :
df['loose_hammer'] = (
(lower_shadow > body * fact) # mèche basse > corps
& (upper_shadow < body) # petite mèche haute
& (df['low'] < df['bb_lowerband'])
).astype(int)
df['won_hammer'] = (
(upper_shadow > body * fact) # mèche basse > corps
& (lower_shadow < body) # petite mèche haute
& (df['high'] > df['bb_upperband'])
).astype(int)
return df
def expectedProfit(self, pair: str, last_candle):
# last_buy = self.pairs[pair]['last_buy']
# max_touch = self.pairs[pair]['max_touch']
#
expected_profit = min(0.1, max(0.01, last_candle['min_max200'] * 0.5 + self.pairs[pair]['count_of_buys'] * 0.0005))
#((max_touch - last_buy) / max_touch)
self.pairs[pair]['expected_profit'] = max(0.004, expected_profit)
return expected_profit
def log_trade(self, action, pair, date, trade_type=None, rate=None, dispo=None, profit=None, buys=None, stake=None,
last_candle=None, trade=None):
# Afficher les colonnes une seule fois
if self.config.get('runmode') == 'hyperopt':
return
if self.columns_logged % 30 == 0:
# print(
# f"|{'-' * 18}+{'-' * 12}+{'-' * 12}+{'-' * 20}+{'-' * 14}+{'-' * 8}+{'-' * 10}+{'-' * 7}+{'-' * 13}+{'-' * 14}+{'-' * 14}+{'-' * 7}+{'-' * 12}|"
# )
print(
f"| {'Date':<16} | {'Action':<10} | {'Pair':<10} | {'Type s1d s1h':<18} | {'Rate':>12} | {'Dispo':>6} | {'Profit':>8} | {'Pct':>5} | {'max_touch':>11} | {'last_lost':>12} | {'last_max':>12} | {'Buys':>5} | {'Stake':>10} |"
)
print(
f"|{'-' * 18}+{'-' * 12}+{'-' * 12}+{'-' * 20}+{'-' * 14}+{'-' * 8}+{'-' * 10}+{'-' * 7}+{'-' * 13}+{'-' * 14}+{'-' * 14}+{'-' * 7}+{'-' * 12}|"
)
self.columns_logged += 1
date = str(date)[:16] if date else "-"
limit = None
# if buys is not None:
# limit = round(last_rate * (1 - self.fibo[buys] / 100), 4)
rsi = ''
rsi_pct = ''
# if last_candle is not None:
# if (not np.isnan(last_candle['rsi_1d'])) and (not np.isnan(last_candle['rsi_1h'])):
# rsi = str(int(last_candle['rsi_1d'])) + " " + str(int(last_candle['rsi_1h']))
# if (not np.isnan(last_candle['rsi_pct_1d'])) and (not np.isnan(last_candle['rsi_pct_1h'])):
# rsi_pct = str(int(10000 * last_candle['bb_mid_pct_1d'])) + " " + str(
# int(last_candle['rsi_pct_1d'])) + " " + str(int(last_candle['rsi_pct_1h']))
# first_rate = self.percent_threshold.value
# last_rate = self.threshold.value
# action = self.color_line(action, action)
sma5_1d = ''
sma5_1h = ''
sma5 = str(sma5_1d) + ' ' + str(sma5_1h)
expected = str(round(self.pairs[pair]['expected_profit'], 3))
max_touch = round(last_candle['max12_1d'], 1) #round(self.pairs[pair]['max_touch'], 1)
pct_max = round(100 * (last_candle['close'] - self.pairs[pair]['first_buy']) / self.pairs[pair]['first_buy'], 1) # round(100 * self.pairs[pair]['current_profit'], 1)
if trade_type is not None:
trade_type = trade_type \
+ " " + str(round(last_candle['sma5_diff_1d'], 1)) \
+ " " + str(round(last_candle['sma5_diff_1h'], 1))
print(
f"| {date:<16} | {action:<10} | {pair:<10} | {trade_type or '-':<18} | {rate or '-':>12} | {dispo or '-':>6} | {profit or '-':>8} | {pct_max or '-':>5} | {max_touch or '-':>11} | {expected or '-':>12} | {round(self.pairs[pair]['last_max'], 2) or '-':>12} | {buys or '-':>5} | {stake or '-':>10} |"
)
def populate_indicators(self, dataframe: DataFrame, metadata: dict) -> DataFrame:
heikinashi = qtpylib.heikinashi(dataframe)
dataframe['haopen'] = heikinashi['open']
dataframe['haclose'] = heikinashi['close']
dataframe['hamid'] = dataframe['haclose'] + (dataframe['haopen'] - dataframe['haclose']) / 2
dataframe['mid'] = dataframe['open'] + (dataframe['close'] - dataframe['open']) / 2
dataframe['sma12'] = dataframe['mid'].rolling(12).sum() / 12
dataframe['hasma5'] = dataframe['hamid'].rolling(5).sum() / 5
dataframe['hasma5_diff'] = dataframe['hasma5'] - dataframe['hasma5'].shift(1)
dataframe['halow'] = heikinashi['low']
dataframe['hapercent'] = (dataframe['haclose'] - dataframe['haopen']) / dataframe['haclose']
dataframe['min12'] = talib.MIN(dataframe['close'], timeperiod=12)
dataframe['max12'] = talib.MAX(dataframe['close'], timeperiod=12)
dataframe['min48'] = talib.MIN(dataframe['close'], timeperiod=48)
dataframe['max48'] = talib.MAX(dataframe['close'], timeperiod=48)
dataframe['min288'] = talib.MIN(dataframe['close'], timeperiod=288)
dataframe['max288'] = talib.MAX(dataframe['close'], timeperiod=288)
dataframe['mid288'] = dataframe['min288'] + (dataframe['max288'] - dataframe['min288']) / 2
dataframe['min200'] = talib.MIN(dataframe['close'], timeperiod=200)
dataframe['max200'] = talib.MAX(dataframe['close'], timeperiod=200)
dataframe['min_max200'] = (dataframe['max200'] - dataframe['min200']) / dataframe['min200']
dataframe["percent"] = (dataframe["close"] - dataframe["open"]) / dataframe["open"]
dataframe["percent3"] = dataframe['close'].pct_change(3)
dataframe["percent5"] = dataframe['close'].pct_change(5)
dataframe["percent12"] = dataframe['close'].pct_change(12)
dataframe["percent48"] = dataframe['close'].pct_change(48)
dataframe['average_line_288'] = talib.MIDPOINT(dataframe['close'], timeperiod=288)
# Bollinger Bands
bollinger = qtpylib.bollinger_bands(qtpylib.typical_price(dataframe), window=20, stds=2)
dataframe['bb_lowerband'] = bollinger['lower']
dataframe['bb_middleband'] = bollinger['mid']
dataframe['bb_upperband'] = bollinger['upper']
dataframe['bb_diff'] = (dataframe['bb_upperband'] - dataframe['bb_lowerband']) / dataframe['bb_lowerband']
# Compter les baisses consécutives
dataframe['down'] = dataframe['hapercent'] <= 0.001
dataframe['up'] = dataframe['hapercent'] >= -0.001
dataframe['down_count'] = - dataframe['down'].astype(int) * (
dataframe['down'].groupby((dataframe['down'] != dataframe['down'].shift()).cumsum()).cumcount() + 1)
dataframe['up_count'] = dataframe['up'].astype(int) * (
dataframe['up'].groupby((dataframe['up'] != dataframe['up'].shift()).cumsum()).cumcount() + 1)
# Créer une colonne vide
dataframe['down_pct'] = self.calculateUpDownPct(dataframe, 'down_count')
dataframe['up_pct'] = self.calculateUpDownPct(dataframe, 'up_count')
# # ======================================================================================Decrease
# ################### INFORMATIVE 1d
informative = self.dp.get_pair_dataframe(pair=metadata['pair'], timeframe="1d")
bollinger = qtpylib.bollinger_bands(qtpylib.typical_price(dataframe), window=20, stds=2)
informative['bb_lowerband'] = bollinger['lower']
informative['bb_middleband'] = bollinger['mid']
informative['bb_upperband'] = bollinger['upper']
# # Moving Averages
# informative['ema5'] = EMAIndicator(informative['close'], window=5).ema_indicator()
# informative['ema20'] = EMAIndicator(informative['close'], window=20).ema_indicator()
# informative['ma_downtrend'] = (informative['close'] < informative['ema5']) & (informative['ema5'] < informative['ema20'])
#
# # RSI
# informative['rsi'] = RSIIndicator(informative['close'], window=14).rsi()
# informative['rsi_downtrend'] = informative['rsi'] < 50
informative['max5'] = talib.MAX(informative['close'], timeperiod=5)
informative['max12'] = talib.MAX(informative['close'], timeperiod=12)
informative['min5'] = talib.MIN(informative['close'], timeperiod=5)
informative['min12'] = talib.MIN(informative['close'], timeperiod=12)
informative['sma5'] = talib.SMA(informative, timeperiod=5)
informative['sma5_diff'] = 100 * (informative['sma5'] - informative['sma5'].shift(1)) / informative['sma5']
informative = self.detect_loose_hammer(informative, 1.5)
dataframe = merge_informative_pair(dataframe, informative, self.timeframe, "1d", ffill=True)
dataframe = self.detect_loose_hammer(dataframe)
informative = self.dp.get_pair_dataframe(pair=metadata['pair'], timeframe="1h")
informative['sma5'] = talib.SMA(informative, timeperiod=5)
informative['sma5_diff'] = 100 * (informative['sma5'] - informative['sma5'].shift(1)) / informative['sma5']
dataframe = merge_informative_pair(dataframe, informative, self.timeframe, "1h", ffill=True)
self.calculateBearishCandles(dataframe, 200)
self.calculateBearishCandles(dataframe, 48)
return dataframe
def calculateBearishCandles(self, dataframe, lookback_period):
# Trouver l'index du dernier maximum sur 200 bougies
rolling_max_indices = dataframe['high'].rolling(window=lookback_period, min_periods=1).apply(np.argmax,
raw=True)
# Convertir ces indices en index globaux du DataFrame
dataframe['last_max_index'] = dataframe.index - (lookback_period - 1) + rolling_max_indices
# Création d'une colonne pour stocker le nombre de bougies baissières depuis le dernier max
bearish_counts = np.zeros(len(dataframe))
for i in range(lookback_period, len(dataframe)):
last_max_idx = int(dataframe.loc[i, 'last_max_index'])
bearish_counts[i] = (dataframe.loc[last_max_idx:i, 'close'] < dataframe.loc[last_max_idx:i, 'open']).sum()
dataframe['bearish_candles_since_max' + str(lookback_period)] = bearish_counts
def calculateUpDownPct(self, dataframe, key):
down_pct_values = np.full(len(dataframe), np.nan)
# Remplir la colonne avec les bons calculs
for i in range(len(dataframe)):
shift_value = abs(int(dataframe[key].iloc[i])) # Récupérer le shift actuel
if i - shift_value > 1: # Vérifier que le shift ne dépasse pas l'index
down_pct_values[i] = 100 * (dataframe['close'].iloc[i] - dataframe['close'].iloc[i - shift_value]) / \
dataframe['close'].iloc[i - shift_value]
return down_pct_values
def populate_buy_trend(self, dataframe: DataFrame, metadata: dict) -> DataFrame:
"""
Buy strategy Hyperopt will build and use.
"""
d = self.decalage.value
# dataframe.loc[
# (dataframe['halow'].shift(d) <= dataframe['min12'].shift(d))
# & (dataframe['min12'].shift(d) == dataframe['min12'])
# # & (dataframe['close'] < dataframe['hasma5'])
# # & (dataframe['bb_diff'] > 0.01)
# ,
# ['enter_long', 'enter_tag']] = [1, 'buy_halow']
# dataframe.loc[
# (dataframe['hasma5_diff'].shift(2) >= dataframe['hasma5_diff'].shift(1))
# & (dataframe['hasma5_diff'].shift(1) <= dataframe['hasma5_diff'])
# # & (dataframe['bb_diff'] > 0.01)
# ,
# ['enter_long', 'enter_tag']] = [1, 'buy_hasma5_diff']
# dataframe.loc[
# (dataframe['halow'].shift(decalage) <= dataframe['min288'].shift(decalage))
# # & (dataframe['min288'].shift(decalage) == dataframe['min288'])
# # & (dataframe['open'] <= dataframe['bb_middleband'])
# # & (dataframe['bb_diff'] > 0.01)
# ,
# 'buy']=1
dataframe.loc[
(
(dataframe['down_count'].shift(1) <= -8)
| (dataframe['percent12'] <= -0.012)
)
& (dataframe['down_count'] == 0)
# & (dataframe['bearish_candles_since_max48'] > 20)
,
['enter_long', 'enter_tag']] = [1, 'down']
# dataframe.loc[
# (
# # (dataframe['hapercent'] > 0)
# (dataframe['down_count'].shift(1) < - 6)
# & (dataframe['down_count'] == 0)
# & (dataframe['down_pct'].shift(1) <= -0.5)
# ), ['enter_long', 'enter_tag']] = (1, 'buy_hapercent')
# dataframe.loc[(dataframe['loose_hammer'] == 1)
# ,
# ['enter_long', 'enter_tag']] = [1, 'hammer']
# dataframe.loc[
# (
# (dataframe['low'] <= dataframe['min200'])
# # & (dataframe['min_max200'] > 0.015)
# # & (dataframe['percent5'] < 0)
# # & (dataframe['haopen'] < buy_level)
# # & (dataframe['open'] < dataframe['average_line_288'])
# & (dataframe['min200'].shift(3) == dataframe['min200'])
# ), ['enter_long', 'enter_tag']] = (1, 'min200')
return dataframe
def populate_sell_trend(self, dataframe: DataFrame, metadata: dict) -> DataFrame:
"""
Sell strategy Hyperopt will build and use.
"""
# dataframe.loc[
# (qtpylib.crossed_above(dataframe['haclose'], dataframe['haopen'])),
# 'sell']=1
return dataframe
def informative_pairs(self):
# get access to all pairs available in whitelist.
pairs = self.dp.current_whitelist()
informative_pairs = [(pair, '1d') for pair in pairs]
informative_pairs += [(pair, '1h') for pair in pairs]
return informative_pairs