Files
Freqtrade/HeikinAshi.py
Jérôme Delacotte 59daa1af70 Synchronise 2
2025-03-09 17:06:39 +01:00

343 lines
14 KiB
Python

# Heracles Strategy: Strongest Son of GodStra
# ( With just 1 Genome! its a bacteria :D )
# Author: @Mablue (Masoud Azizi)
# github: https://github.com/mablue/
# IMPORTANT:Add to your pairlists inside config.json (Under StaticPairList):
# {
# "method": "AgeFilter",
# "min_days_listed": 100
# },
# IMPORTANT: install the `ta` package before running (pip install ta)
#
# freqtrade hyperopt --hyperopt-loss SharpeHyperOptLoss --spaces roi buy --strategy Heracles
# ######################################################################
# --- Do not remove these libs ---
from freqtrade.persistence import Trade
from typing import Optional, Tuple, Union
from datetime import timezone, timedelta, datetime
from freqtrade.strategy import (BooleanParameter, CategoricalParameter, DecimalParameter, stoploss_from_open,
IntParameter, IStrategy, merge_informative_pair, informative, stoploss_from_absolute)
import logging
# noinspection PyUnresolvedReferences
from freqtrade.strategy import (IStrategy, informative)
from pandas import DataFrame
# --------------------------------
# Add your lib to import here
# import talib.abstract as ta
import pandas as pd
import ta
import talib.abstract as talib
from ta.utils import dropna
import freqtrade.vendor.qtpylib.indicators as qtpylib
from functools import reduce
import numpy as np
class HeikinAshi(IStrategy):
    """Freqtrade strategy built on Heikin-Ashi candles and rolling close extremes.

    Indicators (see ``populate_indicators``) are the Heikin-Ashi open/close/low,
    the rolling min/max of the close over 12 and 288 candles, the midpoint of
    the 288-candle range, simple percentage changes and Bollinger bands.

    A buy signal is raised when, three candles ago, the Heikin-Ashi low was at
    or below the 288-candle minimum and that minimum has not changed since.
    Exits are driven by ``custom_exit`` and gated by ``confirm_trade_exit``.
    """

    plot_config = {
        "main_plot": {
            "min12": {
                "color": "#197260"
            },
            'max12': {
                'color': 'green'
            },
            "haclose": {
                "color": "red"
            },
            'haopen': {
                'color': 'blue'
            },
            "min288": {
                "color": "#197260"
            },
            'max288': {
                'color': 'green'
            },
            'mid288': {
                'color': 'blue'
            }
        },
        "subplots": {
            "Percent": {
                "hapercent": {
                    "color": "#74effc"
                }
            }
        }
    }

    # Buy hyperspace params.
    # NOTE(review): none of these keys matches a parameter declared on this
    # class; they look like leftovers from another strategy - confirm.
    buy_params = {
        "buy_crossed_indicator_shift": 9,
        "buy_div_max": 0.75,
        "buy_div_min": 0.16,
        "buy_indicator_shift": 15,
    }

    # Sell hyperspace params.
    sell_params = {
    }

    # ROI table.
    minimal_roi = {
        "0": 0.598
    }

    # Stoploss of -100%.
    stoploss = -1

    # Optimal timeframe; use it in your config.
    timeframe = '5m'

    # Number of rows printed so far by ``log_trade``; the table header is
    # repeated every 30 rows.
    columns_logged = 0

    max_entry_position_adjustment = 20

    # 288 candles are needed before min288 / max288 are defined.
    startup_candle_count = 288

    # NOTE(review): with this flag False freqtrade should not call
    # ``adjust_trade_position`` at all - confirm against the freqtrade docs
    # whether disabling it is intentional.
    position_adjustment_enable = False

    # Per-pair bookkeeping shared by the callbacks. Pairs missing from this
    # initial list get an entry on first use (see ``_pair_state``).
    pairs = {
        pair: {
            "last_max": 0,
            "trade_info": {},
            "max_touch": 0.0,
            "last_sell": 0.0,
            "last_buy": 0.0
        }
        for pair in ["BTC/USDT", "ETH/USDT", "DOGE/USDT", "DASH/USDT", "XRP/USDT", "SOL/USDT"]
    }

    # NOTE(review): declared in the 'buy' space but not read anywhere in this
    # class; ``populate_buy_trend`` uses a hard-coded shift of 3 instead.
    decalage = IntParameter(0, 48, default=12, space='buy')

    def _pair_state(self, pair: str) -> dict:
        """Return the bookkeeping dict for ``pair``, creating it if missing.

        Prevents a ``KeyError`` when the bot trades a pair that is not part of
        the pre-populated ``pairs`` mapping.
        """
        state = self.pairs.get(pair)
        if state is None:
            state = {
                "last_max": 0,
                "trade_info": {},
                "max_touch": 0.0,
                "last_sell": 0.0,
                "last_buy": 0.0
            }
            self.pairs[pair] = state
        return state

    def adjust_trade_position(self, trade: Trade, current_time: datetime,
                              current_rate: float, current_profit: float,
                              min_stake: Optional[float], max_stake: float,
                              current_entry_rate: float, current_exit_rate: float,
                              current_entry_profit: float, current_exit_profit: float,
                              **kwargs
                              ) -> Union[Optional[float], Tuple[Optional[float], Optional[str]]]:
        """Decide whether to add to a losing position.

        Below four successful entries, one extra stake is added when the
        288-candle minimum is unchanged versus 24 candles earlier, the loss
        exceeds 1% per existing entry and the close is below ``mid288``.
        From four entries on, a double stake is added when the loss exceeds
        3% per existing entry.

        :return: Stake amount to add, or ``None`` to leave the trade untouched.
        """
        # Do nothing while an order is already pending for this trade.
        if trade.has_open_orders:
            return None

        dataframe, _ = self.dp.get_analyzed_dataframe(trade.pair, self.timeframe)
        # The comparison below needs the candle 24 positions before the last.
        if len(dataframe) < 25:
            return None
        last_candle = dataframe.iloc[-1].squeeze()
        last_candle_24 = dataframe.iloc[-25].squeeze()

        count_of_buys = trade.nr_of_successful_entries
        limit_buy = 4

        if count_of_buys < limit_buy:
            should_add = (
                last_candle['min288'] == last_candle_24['min288']
                and current_profit < -0.01 * count_of_buys
                and last_candle['close'] < last_candle['mid288']
            )
            factor = 1
        else:
            should_add = current_profit < -0.03 * count_of_buys
            factor = 2

        if not should_add:
            return None

        additional_stake = self.calculate_stake(trade.pair, last_candle, factor)
        self.log_trade(
            last_candle=last_candle,
            date=current_time.astimezone(timezone.utc),
            action="Loss -",
            dispo=round(self.wallets.get_available_stake_amount()),
            pair=trade.pair,
            rate=current_rate,
            trade_type='Decrease',
            profit=round(current_profit, 4),
            buys=count_of_buys,
            stake=round(additional_stake, 2)
        )
        return additional_stake

    def calculate_stake(self, pair, last_candle, factor=1):
        """Return the configured ``stake_amount`` multiplied by ``factor``.

        ``pair`` and ``last_candle`` are accepted but currently unused.
        NOTE(review): assumes ``stake_amount`` in the config is numeric.
        """
        return self.config['stake_amount'] * factor

    def confirm_trade_entry(self, pair: str, order_type: str, amount: float, rate: float,
                            time_in_force: str, current_time: datetime,
                            entry_tag: Optional[str], **kwargs) -> bool:
        """Log the entry and always accept it."""
        dataframe, _ = self.dp.get_analyzed_dataframe(pair, self.timeframe)
        last_candle = dataframe.iloc[-1].squeeze()
        stake_amount = self.calculate_stake(pair, last_candle, 1)
        self.log_trade(
            last_candle=last_candle,
            date=current_time,
            action="START BUY",
            pair=pair,
            rate=rate,
            dispo=round(self.wallets.get_available_stake_amount()),
            profit=0,
            stake=round(stake_amount, 2)
        )
        return True

    def confirm_trade_exit(self, pair: str, trade: Trade, order_type: str, amount: float,
                           rate: float, time_in_force: str, exit_reason: str,
                           current_time: datetime, **kwargs) -> bool:
        """Allow an exit only when the 5-candle change is negative or on force exit.

        When the exit is accepted, the pair's ``last_buy`` is reset, ``last_sell``
        is set to ``rate`` and the sale is logged.
        """
        dataframe, _ = self.dp.get_analyzed_dataframe(pair, self.timeframe)
        last_candle = dataframe.iloc[-1].squeeze()

        allow_to_sell = last_candle['percent5'] < -0.00
        # Plain bool rather than numpy.bool_ so the declared return type holds.
        ok = bool(allow_to_sell or exit_reason == 'force_exit')
        if ok:
            state = self._pair_state(pair)
            state['last_buy'] = 0
            state['last_sell'] = rate
            self.log_trade(
                last_candle=last_candle,
                date=current_time,
                action="Sell",
                pair=pair,
                trade_type=exit_reason,
                rate=last_candle['close'],
                dispo=round(self.wallets.get_available_stake_amount()),
                profit=round(trade.calc_profit(rate, amount), 2)
            )
        return ok

    def custom_exit(self, pair: str, trade: Trade, current_time, current_rate,
                    current_profit, **kwargs) -> Optional[str]:
        """Request an exit once profit exceeds 0.4% and the last candle turned down.

        Both the Heikin-Ashi body (``hapercent``) and the regular candle body
        (``percent``) must be negative.

        :return: ``'profit_<number of entries>'`` as exit tag, otherwise ``None``.
        """
        dataframe, _ = self.dp.get_analyzed_dataframe(pair, self.timeframe)
        last_candle = dataframe.iloc[-1].squeeze()
        if (current_profit > 0.004
                and last_candle['hapercent'] < 0.0
                and last_candle['percent'] < 0.0):
            return 'profit_' + str(trade.nr_of_successful_entries)
        return None

    def log_trade(self, action, pair, date, trade_type=None, rate=None, dispo=None, profit=None,
                  buys=None, stake=None, last_candle=None):
        """Print one row of the trade table to stdout.

        Nothing is printed in hyperopt run mode. The column header is printed
        before the first row and again every 30 rows. Values that are missing
        or falsy are rendered as ``-``.
        """
        if self.config.get('runmode') == 'hyperopt':
            return

        if self.columns_logged % 30 == 0:
            print(
                f"| {'Date':<16} | {'Action':<10} | {'Pair':<10} | {'Trade Type':<18} "
                f"| {'Rate':>12} | {'Dispo':>6} | {'Profit':>8} | {'Pct':>5} "
                f"| {'max7_1d':>11} | {'max_touch':>12} | {'last_max':>12} "
                f"| {'Buys':>5} | {'Stake':>10} |"
            )
            print(
                f"|{'-' * 18}|{'-' * 12}|{'-' * 12}|{'-' * 20}|{'-' * 14}|{'-' * 8}"
                f"|{'-' * 10}|{'-' * 7}|{'-' * 13}|{'-' * 14}|{'-' * 14}|{'-' * 7}|{'-' * 12}|"
            )
        self.columns_logged += 1

        date = str(date)[:16] if date else "-"

        state = self._pair_state(pair)
        max_touch = round(state['max_touch'], 2)
        last_max = round(state['last_max'], 2)
        max7_1d = round(state['max_touch'], 1)

        # Distance of the close from max_touch in percent. Skipped when there
        # is no candle or max_touch is still 0, which would divide by zero.
        pct_max = None
        if last_candle is not None and max7_1d:
            pct_max = round(100 * (last_candle['close'] - max7_1d) / max7_1d, 1)

        print(
            f"| {date:<16} | {action:<10} | {pair:<10} | {trade_type or '-':<18} "
            f"| {rate or '-':>12} | {dispo or '-':>6} | {profit or '-':>8} | {pct_max or '-':>5} "
            f"| {max7_1d or '-':>11} | {max_touch or '-':>12} | {last_max or '-':>12} "
            f"| {buys or '-':>5} | {stake or '-':>10} |"
        )

    def populate_indicators(self, dataframe: DataFrame, metadata: dict) -> DataFrame:
        """Add Heikin-Ashi, rolling min/max, percent-change and Bollinger columns."""
        heikinashi = qtpylib.heikinashi(dataframe)
        dataframe['haopen'] = heikinashi['open']
        dataframe['haclose'] = heikinashi['close']
        dataframe['halow'] = heikinashi['low']
        # Heikin-Ashi body size relative to the Heikin-Ashi close.
        dataframe['hapercent'] = (dataframe['haclose'] - dataframe['haopen']) / dataframe['haclose']

        dataframe['min12'] = talib.MIN(dataframe['close'], timeperiod=12)
        dataframe['max12'] = talib.MAX(dataframe['close'], timeperiod=12)
        dataframe['min288'] = talib.MIN(dataframe['close'], timeperiod=288)
        dataframe['max288'] = talib.MAX(dataframe['close'], timeperiod=288)
        # Midpoint of the 288-candle close range.
        dataframe['mid288'] = dataframe['min288'] + (dataframe['max288'] - dataframe['min288']) / 2

        dataframe["percent"] = (dataframe["close"] - dataframe["open"]) / dataframe["open"]
        dataframe["percent5"] = dataframe['close'].pct_change(5)

        # Bollinger Bands on the typical price.
        bollinger = qtpylib.bollinger_bands(qtpylib.typical_price(dataframe), window=20, stds=2)
        dataframe['bb_lowerband'] = bollinger['lower']
        dataframe['bb_middleband'] = bollinger['mid']
        dataframe['bb_upperband'] = bollinger['upper']
        dataframe['bb_diff'] = (
            (dataframe['bb_upperband'] - dataframe['bb_lowerband']) / dataframe['bb_lowerband']
        )
        return dataframe

    def populate_buy_trend(self, dataframe: DataFrame, metadata: dict) -> DataFrame:
        """Set ``buy`` to 1 where the Heikin-Ashi low touched an unchanged 288-candle low.

        The touch is tested three candles back, and the 288-candle minimum must
        be the same now as it was then.
        """
        # Hard-coded shift; the class-level ``decalage`` parameter is not used here.
        shift = 3
        touched_low = dataframe['halow'].shift(shift) <= dataframe['min288'].shift(shift)
        low_unchanged = dataframe['min288'].shift(shift) == dataframe['min288']
        dataframe.loc[touched_low & low_unchanged, 'buy'] = 1
        return dataframe

    def populate_sell_trend(self, dataframe: DataFrame, metadata: dict) -> DataFrame:
        """Return the dataframe unchanged; this method sets no sell signal."""
        return dataframe