Files
Freqtrade/Empty.py
Jérôme Delacotte 0250e30e57 ┏━━━━━━━━━━┳━━━━━━━━┳━━━━━━━━━━━━━━┳━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━━━┳━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━━━━━━━━━━┓
┃ Strategy ┃ Trades ┃ Avg Profit % ┃ Tot Profit USDT ┃ Tot Profit % ┃    Avg Duration ┃  Win  Draw  Loss  Win% ┃            Drawdown ┃
┡━━━━━━━━━━╇━━━━━━━━╇━━━━━━━━━━━━━━╇━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━━━━╇━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━━━━━━━━━━━┩
│    Empty │     53 │         1.76 │        1312.665 │       131.27 │ 4 days, 1:16:00 │   33     0    20  62.3 │ 116.751 USDT  7.37% │
└──────────┴────────┴──────────────┴─────────────────┴──────────────┴─────────────────┴────────────────────────┴─────────────────────┘
2026-02-26 19:44:39 +01:00

1282 lines
56 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
# pr#agma pylint: disable=missing-docstring, invalid-name, pointless-string-statement
# isort: skip_file
# --- Do not remove these libs ---
from datetime import datetime
from freqtrade.persistence import Trade
import numpy as np # noqa
import pandas as pd # noqa
from pandas import DataFrame
from datetime import timezone, timedelta
from datetime import timedelta, datetime
from freqtrade.strategy.interface import IStrategy
from freqtrade.persistence import Trade
from typing import Optional, Union, Tuple
from freqtrade.strategy import (BooleanParameter, CategoricalParameter, DecimalParameter, stoploss_from_open,
IntParameter, IStrategy, merge_informative_pair, informative, stoploss_from_absolute)
# --------------------------------
# Add your lib to import here
import ta
import talib.abstract as talib
import freqtrade.vendor.qtpylib.indicators as qtpylib
from functools import reduce
from random import shuffle
import logging
logger = logging.getLogger(__name__)
timeperiods = [3, 5, 12, 24, 36, 48, 60]
# Couleurs ANSI de base
RED = "\033[31m"
GREEN = "\033[32m"
YELLOW = "\033[33m"
BLUE = "\033[34m"
MAGENTA = "\033[35m"
CYAN = "\033[36m"
RESET = "\033[0m"
timeperiods = [3, 5, 12, 24, 36, 48, 60]
sma_indicators = list()
score_indicators = list()
stop_buying_indicators = list()
god_genes_with_timeperiod = list()
for timeperiod in timeperiods:
sma_indicators.append(f"sma{timeperiod}")
sma_indicators.append(f"sma{timeperiod}_1d")
# god_genes_with_timeperiod.append(f'max{timeperiod}')
# god_genes_with_timeperiod.append(f'min{timeperiod}')
# god_genes_with_timeperiod.append(f"percent{timeperiod}")
# god_genes_with_timeperiod.append(f"sma{timeperiod}")
god_genes_with_timeperiod.append(f"sma{timeperiod}_deriv1")
god_genes_with_timeperiod.append(f"sma{timeperiod}_deriv2")
god_genes_with_timeperiod.append(f"sma{timeperiod}_score")
# stoploss_indicators.append(f"stop_buying{timeperiod}")
stop_buying_indicators.append(f"stop_buying{timeperiod}_1d")
score_indicators.append(f"sma{timeperiod}_score")
# score_indicators.append(f"sma{timeperiod}_score_1d")
# god_genes_with_timeperiod.append(f"sma{timeperiod}_trend_up")
# god_genes_with_timeperiod.append(f"sma{timeperiod}_trend_down")
# god_genes_with_timeperiod.append(f"sma{timeperiod}_trend_change_up")
# god_genes_with_timeperiod.append(f"sma{timeperiod}_trend_change_down")
operators = [
"D", # Disabled gene
">", # Indicator, bigger than cross indicator
"<", # Indicator, smaller than cross indicator
"=", # Indicator, equal with cross indicator
"C", # Indicator, crossed the cross indicator
"CA", # Indicator, crossed above the cross indicator
"CB", # Indicator, crossed below the cross indicator
# ">R", # Normalized indicator, bigger than real number
# "=R", # Normalized indicator, equal with real number
# "<R", # Normalized indicator, smaller than real number
# "/>R", # Normalized indicator devided to cross indicator, bigger than real number
# "/=R", # Normalized indicator devided to cross indicator, equal with real number
# "/<R", # Normalized indicator devided to cross indicator, smaller than real number
# "UT", # Indicator, is in UpTrend status
# "DT", # Indicator, is in DownTrend status
# "OT", # Indicator, is in Off trend status(RANGE)
# "CUT", # Indicator, Entered to UpTrend status
# "CDT", # Indicator, Entered to DownTrend status
# "COT" # Indicator, Entered to Off trend status(RANGE)
]
# number of candles to check up,don,off trend.
TREND_CHECK_CANDLES = 4
DECIMALS = 1
def normalize(df):
df = (df-df.min())/(df.max()-df.min())
return df
def gene_calculator(dataframe, indicator):
# print(indicator)
# Cuz Timeperiods not effect calculating CDL patterns recognations
if 'CDL' in indicator:
splited_indicator = indicator.split('-')
splited_indicator[1] = "0"
new_indicator = "-".join(splited_indicator)
# print(indicator, new_indicator)
indicator = new_indicator
gene = indicator.split("-")
gene_name = gene[0]
gene_len = len(gene)
# print(f"GENE {gene_name} {gene_len} {indicator}")
if gene_name in dataframe.keys():
# print(f"{indicator}, calculated befoure")
# print(len(dataframe.keys()))
return dataframe[gene_name]
if indicator in dataframe.keys():
# print(f"{indicator}, calculated befoure")
# print(len(dataframe.keys()))
return dataframe[indicator]
else:
result = None
# For Pattern Recognations
if gene_len == 1:
# print('gene_len == 1\t', indicator)
result = getattr(talib, gene_name)(
dataframe
)
return normalize(result)
elif gene_len == 2:
# print('gene_len == 2\t', indicator)
gene_timeperiod = int(gene[1])
result = getattr(talib, gene_name)(
dataframe,
timeperiod=gene_timeperiod,
)
return normalize(result)
# For
elif gene_len == 3:
# print('gene_len == 3\t', indicator)
gene_timeperiod = int(gene[2])
gene_index = int(gene[1])
result = getattr(talib, gene_name)(
dataframe,
timeperiod=gene_timeperiod,
).iloc[:, gene_index]
return normalize(result)
# For trend operators(MA-5-SMA-4)
elif gene_len == 4:
# print('gene_len == 4\t', indicator)
gene_timeperiod = int(gene[1])
sharp_indicator = f'{gene_name}-{gene_timeperiod}'
dataframe[sharp_indicator] = getattr(talib, gene_name)(
dataframe,
timeperiod=gene_timeperiod,
)
return normalize(talib.SMA(dataframe[sharp_indicator].fillna(0), TREND_CHECK_CANDLES))
# For trend operators(STOCH-0-4-SMA-4)
elif gene_len == 5:
# print('gene_len == 5\t', indicator)
gene_timeperiod = int(gene[2])
gene_index = int(gene[1])
sharp_indicator = f'{gene_name}-{gene_index}-{gene_timeperiod}'
dataframe[sharp_indicator] = getattr(talib, gene_name)(
dataframe,
timeperiod=gene_timeperiod,
).iloc[:, gene_index]
return normalize(talib.SMA(dataframe[sharp_indicator].fillna(0), TREND_CHECK_CANDLES))
def condition_generator(dataframe, operator, indicator, crossed_indicator, real_num):
condition = (dataframe['volume'] > 10)
# TODO : it ill callculated in populate indicators.
pd.set_option('display.max_rows', None)
pd.set_option('display.max_columns', None)
pd.set_option("display.width", 200)
# print(f"{indicator} {crossed_indicator} {real_num}")
dataframe[indicator] = gene_calculator(dataframe, indicator)
dataframe[crossed_indicator] = gene_calculator(dataframe, crossed_indicator)
indicator_trend_sma = f"{indicator}-SMA-{TREND_CHECK_CANDLES}"
if operator in ["UT", "DT", "OT", "CUT", "CDT", "COT"]:
dataframe[indicator_trend_sma] = gene_calculator(dataframe, indicator_trend_sma)
if operator == ">":
condition = (dataframe[indicator] > dataframe[crossed_indicator])
elif operator == "=":
condition = (np.isclose(dataframe[indicator], dataframe[crossed_indicator]))
elif operator == "<":
condition = (dataframe[indicator] < dataframe[crossed_indicator])
elif operator == "C":
condition = (
(qtpylib.crossed_below(dataframe[indicator], dataframe[crossed_indicator])) |
(qtpylib.crossed_above(
dataframe[indicator], dataframe[crossed_indicator]))
)
elif operator == "CA":
condition = (qtpylib.crossed_above(dataframe[indicator], dataframe[crossed_indicator]))
elif operator == "CB":
condition = (qtpylib.crossed_below(dataframe[indicator], dataframe[crossed_indicator]))
elif operator == ">R":
condition = (dataframe[indicator] > real_num)
elif operator == "=R":
condition = (np.isclose(dataframe[indicator], real_num))
elif operator == "<R":
condition = (dataframe[indicator] < real_num)
elif operator == "/>R":
condition = (dataframe[indicator].div(dataframe[crossed_indicator]) > real_num)
elif operator == "/=R":
condition = (np.isclose(dataframe[indicator].div(dataframe[crossed_indicator]), real_num))
elif operator == "/<R":
condition = (dataframe[indicator].div(dataframe[crossed_indicator]) < real_num)
elif operator == "UT":
condition = (dataframe[indicator] > dataframe[indicator_trend_sma])
elif operator == "DT":
condition = (dataframe[indicator] < dataframe[indicator_trend_sma])
elif operator == "OT":
condition = (np.isclose(dataframe[indicator], dataframe[indicator_trend_sma]))
elif operator == "CUT":
condition = (
(
qtpylib.crossed_above(dataframe[indicator],dataframe[indicator_trend_sma])
) & (
dataframe[indicator] > dataframe[indicator_trend_sma]
)
)
elif operator == "CDT":
condition = (
(
qtpylib.crossed_below(dataframe[indicator], dataframe[indicator_trend_sma])
) &
(
dataframe[indicator] < dataframe[indicator_trend_sma]
)
)
elif operator == "COT":
condition = (
(
(
qtpylib.crossed_below(dataframe[indicator], dataframe[indicator_trend_sma])
) |
(
qtpylib.crossed_above(dataframe[indicator], dataframe[indicator_trend_sma])
)
) &
(
np.isclose(dataframe[indicator], dataframe[indicator_trend_sma])
)
)
return condition, dataframe
# #########################################################################################################################
# This class is a sample. Feel free to customize it.
class Empty(IStrategy):
# Strategy interface version - allow new iterations of the strategy interface.
# Check the documentation or the Sample strategy to get the latest version.
INTERFACE_VERSION = 2
# ROI table:
minimal_roi = {
#"0": 0.015
"0": 0.5
}
# Stoploss:
stoploss = -1
trailing_stop = False
# trailing_stop_positive = 0.15
# trailing_stop_positive_offset = 0.20
# trailing_only_offset_is_reached = False
position_adjustment_enable = True
use_custom_stoploss = True
#max_open_trades = 3
# Optimal ticker interval for the strategy.
timeframe = '1h'
# Run "populate_indicators()" only for new candle.
process_only_new_candles = False
# These values can be overridden in the "ask_strategy" section in the config.
use_sell_signal = True
sell_profit_only = False
ignore_roi_if_buy_signal = False
# Number of candles the strategy requires before producing valid signals
startup_candle_count: int = 30
columns_logged = False
pairs = {
pair: {
'first_amount': 0,
"first_buy": 0,
"last_buy": 0.0,
"last_min": 999999999999999.5,
"last_max": 0,
"trade_info": {},
"max_touch": 0.0,
"last_sell": 0.0,
'count_of_buys': 0,
'current_profit': 0,
'expected_profit': 0,
'previous_profit': 0,
"last_candle": {},
"last_count_of_buys": 0,
'base_stake_amount': 0,
'stop_buy': False,
'last_date': 0,
'stop': False,
'max_profit': 0,
'last_profit': 0,
'total_amount': 0,
'has_gain': 0,
'force_sell': False,
'force_buy': False,
'current_trade': None,
'last_trade': None,
'force_stop': False,
'count_of_lost': 0
}
for pair in ["BTC/USDC", "BTC/USDT"]
}
plot_config = {
"main_plot": {
"sma5": {
"color": "#7aa90b"
},
"min24": {
"color": "#121acd"
}
},
"subplots": {
"Inv": {
"sma5_inv": {
"color": "#aef878",
"type": "line"
},
"zero": {
"color": "#fdba52"
},
"sma24_score": {
"color": "#f1f5b0"
}
},
"drv": {
"sma5_deriv1": {
"color": "#96eebb"
}
}
},
"options": {
"markAreaZIndex": 1
}
}
buy_deriv1_sma60 = DecimalParameter(-0.005, 0.005, decimals=3, default=0, space='buy')
buy_deriv1_sma5d = DecimalParameter(-0.07, 0.07, decimals=2, default=0, space='buy')
buy_deriv1_sma12d = DecimalParameter(-0.07, 0.07, decimals=2, default=0, space='buy')
buy_deriv2_sma60 = DecimalParameter(-0.005, 0.005, decimals=3, default=0, space='buy')
buy_deriv2_sma5d = DecimalParameter(-0.07, 0.07, decimals=2, default=0, space='buy')
buy_deriv2_sma12d = DecimalParameter(-0.07, 0.07, decimals=2, default=0, space='buy')
# Buy Hyperoptable Parameters/Spaces.
# buy_crossed_indicator0 = CategoricalParameter(god_genes_with_timeperiod, default="ADD-20", space='buy')
# buy_crossed_indicator1 = CategoricalParameter(god_genes_with_timeperiod, default="ASIN-6", space='buy')
# buy_crossed_indicator2 = CategoricalParameter(god_genes_with_timeperiod, default="CDLEVENINGSTAR-50", space='buy')
#
# buy_indicator0 = CategoricalParameter(god_genes_with_timeperiod, default="SMA-100", space='buy')
# buy_indicator1 = CategoricalParameter(god_genes_with_timeperiod, default="WILLR-50", space='buy')
# buy_indicator2 = CategoricalParameter(god_genes_with_timeperiod, default="CDLHANGINGMAN-20", space='buy')
#
# buy_operator0 = CategoricalParameter(operators, default="/<R", space='buy')
# buy_operator1 = CategoricalParameter(operators, default="<R", space='buy')
# buy_operator2 = CategoricalParameter(operators, default="CB", space='buy')
#
# buy_real_num0 = DecimalParameter(-1, 1, decimals=DECIMALS, default=0, space='buy')
# buy_real_num1 = DecimalParameter(-1, 1, decimals=DECIMALS, default=0, space='buy')
# buy_real_num2 = DecimalParameter(-1, 1, decimals=DECIMALS, default=0, space='buy')
# Sell Hyperoptable Parameters/Spaces.
# sell_crossed_indicator0 = CategoricalParameter(god_genes_with_timeperiod, default="CDLSHOOTINGSTAR-150", space='sell')
# sell_crossed_indicator1 = CategoricalParameter(god_genes_with_timeperiod, default="MAMA-1-100", space='sell')
# sell_crossed_indicator2 = CategoricalParameter(god_genes_with_timeperiod, default="CDLMATHOLD-6", space='sell')
#
# sell_indicator0 = CategoricalParameter(god_genes_with_timeperiod, default="CDLUPSIDEGAP2CROWS-5", space='sell')
# sell_indicator1 = CategoricalParameter(god_genes_with_timeperiod, default="CDLHARAMICROSS-150", space='sell')
# sell_indicator2 = CategoricalParameter(god_genes_with_timeperiod, default="CDL2CROWS-5", space='sell')
#
# sell_operator0 = CategoricalParameter(operators, default="<R", space='sell')
# sell_operator1 = CategoricalParameter(operators, default="D", space='sell')
# sell_operator2 = CategoricalParameter(operators, default="/>R", space='sell')
#
# sell_real_num0 = DecimalParameter(-1, 1, decimals=DECIMALS, default=0, space='sell')
# sell_real_num1 = DecimalParameter(-1, 1, decimals=DECIMALS, default=0, space='sell')
# sell_real_num2 = DecimalParameter(-1, 1, decimals=DECIMALS, default=0, space='sell')
sell_score_indicator = CategoricalParameter(score_indicators, default="sma24_score", space='sell')
sell_sma_indicators = CategoricalParameter(sma_indicators, default="sma24_score", space='sell')
sell_crossed_sma_indicators = CategoricalParameter(sma_indicators, default="sma24_score", space='sell')
drop_from_last_entry = DecimalParameter(-0.1, 0, decimals=2, default=-0.025, space='protection')
# range_pos_stoploss = DecimalParameter(0, 0.1, decimals=2, default=0.05, space='protection')
# stoploss_force = DecimalParameter(-0.2, 0, decimals=2, default=-0.05, space='protection')
# stoploss_timeperiod = CategoricalParameter(timeperiods, default="12", space='protection')
def custom_stake_amount(self, pair: str, current_time: datetime, current_rate: float,
proposed_stake: float, min_stake: float, max_stake: float,
**kwargs) -> float:
dataframe, _ = self.dp.get_analyzed_dataframe(pair, self.timeframe)
last_candle = dataframe.iloc[-1].squeeze()
return self.adjust_stake_amount(pair, last_candle)
def adjust_stake_amount(self, pair: str, last_candle: DataFrame):
if (self.pairs[pair]['first_amount'] > 0):
amount = min(self.wallets.get_available_stake_amount(), self.pairs[pair]['first_amount'])
else:
# range_min = last_candle[f"min{self.stoploss_timeperiod.value}_1d"]
# range_max = last_candle[f"max{self.stoploss_timeperiod.value}_1d"]
#
# if range_max == range_min:
# return -0.1 # sécurité
#
# range_pos = (last_candle['close'] - range_min) / (range_max - range_min)
range_pos = last_candle[f"range_pos"]
sl_min = self.wallets.get_available_stake_amount() / 6
sl_max = self.wallets.get_available_stake_amount() / 2
amount = sl_min + (1 - range_pos) * (sl_max - sl_min)
amount = self.wallets.get_available_stake_amount()
return min(amount, self.wallets.get_available_stake_amount())
def adjust_trade_position(self, trade: Trade, current_time: datetime,
current_rate: float, current_profit: float, min_stake: float,
max_stake: float, **kwargs):
# ne rien faire si ordre deja en cours
if trade.has_open_orders:
# print("skip open orders")
return None
if (self.wallets.get_available_stake_amount() < 10): # or trade.stake_amount >= max_stake:
return 0
#
dataframe, _ = self.dp.get_analyzed_dataframe(trade.pair, self.timeframe)
last_candle = dataframe.iloc[-1].squeeze()
before_last_candle = dataframe.iloc[-2].squeeze()
before_last_candle_12 = dataframe.iloc[-13].squeeze()
before_last_candle_24 = dataframe.iloc[-25].squeeze()
last_candle_3 = dataframe.iloc[-4].squeeze()
last_candle_previous_1h = dataframe.iloc[-13].squeeze()
# prépare les données
current_time = current_time.astimezone(timezone.utc)
open_date = trade.open_date.astimezone(timezone.utc)
dispo = round(self.wallets.get_available_stake_amount())
hours_since_first_buy = (current_time - trade.open_date_utc).seconds / 3600.0
days_since_first_buy = (current_time - trade.open_date_utc).days
hours = (current_time - trade.date_last_filled_utc).total_seconds() / 3600.0
count_of_buys = trade.nr_of_successful_entries
current_time_utc = current_time.astimezone(timezone.utc)
open_date = trade.open_date.astimezone(timezone.utc)
days_since_open = (current_time_utc - open_date).days
pair = trade.pair
profit = trade.calc_profit(current_rate) #round(current_profit * trade.stake_amount, 1)
# last_lost = self.getLastLost(last_candle, pair)
# pct_first = 0
stake_amount = min(self.wallets.get_available_stake_amount(), self.adjust_stake_amount(pair, last_candle))
# if (last_candle['enter_long'] == 1 and current_profit < -0.05 and stake_amount > 10) :
#
# print(f"adjust {current_time} {stake_amount}")
# print(f"adjust {pair} {current_time} dispo={dispo} amount={stake_amount} rate={current_rate}")
# return stake_amount
if last_candle['enter_long'] != 1:
return None
filled_buys = [
o for o in trade.orders
if o.status == "closed" and o.ft_order_side == "buy"
]
if not filled_buys:
return None
last_buy = max(filled_buys, key=lambda o: o.order_date)
last_entry_price = last_buy.price
if last_entry_price:
drop_from_last_entry = (current_rate - last_entry_price) / last_entry_price
if drop_from_last_entry <= self.drop_from_last_entry.value and last_candle['min60'] == last_candle_3['min60'] \
and last_candle['range_pos'] < 0.03 and last_candle['hapercent'] > 0:
# stake_amount = trade.stake_amount
self.pairs[trade.pair]['last_buy'] = current_rate
self.pairs[trade.pair]['total_amount'] += stake_amount
print(f"adjust {pair} drop={drop_from_last_entry} {current_time} dispo={dispo} amount={stake_amount} rate={current_rate}")
# print(f"adjust {pair} {current_time} dispo={dispo} amount={stake_amount} rate={current_rate}")
trade_type = last_candle['enter_tag'] if last_candle['enter_long'] == 1 else 'pct48'
self.pairs[trade.pair]['count_of_buys'] += 1
self.pairs[pair]['total_amount'] += stake_amount
self.log_trade(
last_candle=last_candle,
date=current_time,
action="🟧 Loss -",
dispo=round(dispo,0),
pair=trade.pair,
rate=current_rate,
trade_type=trade_type,
profit=round(profit, 1),
buys=trade.nr_of_successful_entries + 1,
stake=round(stake_amount, 2)
)
self.pairs[trade.pair]['last_buy'] = current_rate
self.pairs[trade.pair]['max_touch'] = last_candle['close']
self.pairs[trade.pair]['last_candle'] = last_candle
return stake_amount
return None
def confirm_trade_entry(self, pair: str, order_type: str, amount: float, rate: float, time_in_force: str,
current_time: datetime, entry_tag: Optional[str], **kwargs) -> bool:
minutes = 0
if self.pairs[pair]['last_date'] != 0:
minutes = round(int((current_time - self.pairs[pair]['last_date']).total_seconds() / 60))
dataframe, _ = self.dp.get_analyzed_dataframe(pair, self.timeframe)
last_candle = dataframe.iloc[-1].squeeze()
last_candle_2 = dataframe.iloc[-2].squeeze()
last_candle_3 = dataframe.iloc[-3].squeeze()
condition = False if self.pairs[pair]['count_of_lost'] >= 1 \
and (last_candle['sma12_deriv1_1d'] < 0.001 \
or last_candle['sma12_deriv2_1d'] < 0.001) else True
reason = ''
if not condition:
reason = 'lost'
if condition:
if last_candle['range_pos'] > 0.05:# and self.pairs[pair]['force_stop']:
condition = last_candle['sma5'] > last_candle['sma60']
if not condition:
reason = 'range'
# if self.pairs[pair]['force_stop'] and last_candle['range_pos'] < 0.02:
# self.pairs[pair]['force_stop'] = False
if False and self.pairs[pair]['last_trade'].close_date is not None:
# base cooldown = n bougies / cooldown proportionnel au profit
# bougies de plus par %
cooldown_candles = 12 + 6 * (int(self.pairs[pair]['last_profit'] / 0.01)) # réglable
# temps depuis la fermeture
candles_since_close = ((current_time - self.pairs[pair]['last_trade'].close_date).total_seconds() / 3600) # seconds / heure
condition = (candles_since_close >= cooldown_candles)
print(f"Cool close date = trade={self.pairs[pair]['last_trade'].close_date} down {round(self.pairs[pair]['last_profit'], 3)} {cooldown_candles} {candles_since_close}")
# self.should_enter_trade(pair, last_candle, current_time)
if self.pairs[pair]['stop']:
reason = 'stop'
allow_to_buy = (condition and not self.pairs[pair]['stop']) | (entry_tag == 'force_entry')
# force = self.pairs[pair]['force_buy']
# if self.pairs[pair]['force_buy']:
# self.pairs[pair]['force_buy'] = False
# allow_to_buy = True
# else:
# if not self.should_enter_trade(pair, last_candle, current_time):
# allow_to_buy = False
dispo = round(self.wallets.get_available_stake_amount())
if allow_to_buy:
self.pairs[pair]['first_buy'] = rate
self.pairs[pair]['last_buy'] = rate
self.pairs[pair]['max_touch'] = last_candle['close']
self.pairs[pair]['last_candle'] = last_candle
self.pairs[pair]['count_of_buys'] = 1
self.pairs[pair]['current_profit'] = 0
self.pairs[pair]['last_profit'] = 0
self.pairs[pair]['last_trade'] = None
self.pairs[pair]['last_max'] = max(last_candle['close'], self.pairs[pair]['last_max'])
self.pairs[pair]['last_min'] = min(last_candle['close'], self.pairs[pair]['last_min'])
self.pairs[pair]['last_date'] = current_time
# self.printLineLog()
stake_amount = self.adjust_stake_amount(pair, last_candle)
self.pairs[pair]['first_amount'] = stake_amount
self.pairs[pair]['total_amount'] = stake_amount
# print(f"Buy {pair} {current_time} {entry_tag} dispo={dispo} amount={stake_amount} rate={rate} rate={rate}")
self.log_trade(
last_candle=last_candle,
date=current_time,
action=("🟩Buy" if allow_to_buy else "Canceled") + " " + str(minutes),
pair=pair,
rate=rate,
dispo=dispo,
profit=0,
trade_type=entry_tag,
buys=1,
stake=round(stake_amount, 2)
)
else:
self.log_trade(
last_candle=last_candle,
date=current_time,
action=("🟩Buy" if allow_to_buy else "Canceled") + " " + reason,
pair=pair,
rate=rate,
dispo=dispo,
profit=0,
trade_type=entry_tag,
buys=1,
stake=0
)
return allow_to_buy
def confirm_trade_exit(self, pair: str, trade: Trade, order_type: str, amount: float, rate: float,
time_in_force: str,
exit_reason: str, current_time, **kwargs, ) -> bool:
# allow_to_sell = (minutes > 30)
dataframe, _ = self.dp.get_analyzed_dataframe(pair, self.timeframe)
last_candle = dataframe.iloc[-1].squeeze()
profit = trade.calc_profit(rate)
force = self.pairs[pair]['force_sell']
allow_to_sell = (last_candle['hapercent'] < 0 and profit > 0) or force or (exit_reason == 'god') or (exit_reason == 'force_exit') or (exit_reason == 'stop_loss') or (exit_reason == 'trailing_stop_loss')
minutes = int(round((current_time - trade.date_last_filled_utc).total_seconds() / 60, 0))
if allow_to_sell:
self.pairs[pair]['last_sell'] = rate
self.pairs[pair]['last_candle'] = last_candle
profit = trade.calc_profit(rate)
self.pairs[pair]['previous_profit'] = profit
dispo = round(self.wallets.get_available_stake_amount())
# print(f"Sell {pair} {current_time} {exit_reason} dispo={dispo} rate={rate} open_rate={trade.open_rate} profit={profit}")
self.log_trade(
last_candle=last_candle,
date=current_time,
action="🟥Sell " + str(minutes),
pair=pair,
trade_type=exit_reason,
rate=last_candle['close'],
dispo=dispo,
profit=round(profit, 2)
)
self.pairs[pair]['first_amount'] = 0
self.pairs[pair]['force_sell'] = False
self.pairs[pair]['has_gain'] = 0
self.pairs[pair]['current_profit'] = 0
self.pairs[pair]['total_amount'] = 0
self.pairs[pair]['count_of_buys'] = 0
self.pairs[pair]['max_touch'] = 0
self.pairs[pair]['last_buy'] = 0
self.pairs[pair]['last_date'] = current_time
self.pairs[pair]['last_trade'] = trade
self.pairs[pair]['current_trade'] = None
self.pairs[pair]['max_profit'] = 0
if profit < 0:
self.pairs[pair]['count_of_lost'] += 1
else:
self.pairs[pair]['count_of_lost'] = 0
else:
print(f"{current_time} STOP triggered for {pair} ({exit_reason}) but condition blocked", "warning")
return (allow_to_sell) | (exit_reason == 'force_exit') | (exit_reason == 'stop_loss') | force
def custom_exit(self, pair, trade, current_time, current_rate, current_profit, **kwargs):
dataframe, _ = self.dp.get_analyzed_dataframe(pair, self.timeframe)
last_candle = dataframe.iloc[-1]
before_last_candle = dataframe.iloc[-2]
self.pairs[pair]['current_trade'] = trade
momentum = last_candle[self.sell_score_indicator.value]
profit = trade.calc_profit(current_rate) #round(current_profit * trade.stake_amount, 1)
count_of_buys = trade.nr_of_successful_entries
expected_profit = self.expectedProfit(pair, last_candle)
self.pairs[pair]['max_profit'] = max(self.pairs[pair]['max_profit'], profit)
max_profit = self.pairs[pair]['max_profit']
baisse = 0
if profit > 0:
baisse = 1 - (profit / max_profit)
mx = max_profit / 5
self.pairs[pair]['count_of_buys'] = count_of_buys
self.pairs[pair]['current_profit'] = profit
dispo = round(self.wallets.get_available_stake_amount() + self.pairs[pair]['total_amount'])
# self.log_trade(
# last_candle=last_candle,
# date=current_time,
# action=("🔴 NOW" if self.pairs[pair]['stop'] else "🟢 NOW "),
# dispo=dispo,
# pair=pair,
# rate=last_candle['close'],
# trade_type='momentum' + str(round(momentum, 2)),
# profit=round(profit, 2),
# buys=count_of_buys,
# stake=0
# )
if current_profit < - 0.02 and last_candle[f"close"] <= last_candle['sma60']:
self.pairs[pair]['force_sell'] = True
return 'sma60'
cross = qtpylib.crossed_below(dataframe[self.sell_sma_indicators.value], dataframe[self.sell_crossed_sma_indicators.value])
# if profit > 0 and cross.iloc[-1]:
# self.pairs[pair]['force_sell'] = True
# return 'Cross'
if profit > max(5, expected_profit) and baisse > 0.30 and last_candle[f"close"] <= last_candle['sma36']:
# and last_candle['percent3'] < 0 and last_candle['percent5'] < 0:
self.pairs[pair]['force_sell'] = True
return 'B30'
# if last_candle['range_pos'] > 0.05 and current_profit < - last_candle['range_pos'] /4 : #last_candle['cross_sma60']:
# self.pairs[pair]['force_sell'] = True
# return 'Range'
# if last_candle['range_pos'] > 0.05 and current_profit < - last_candle['range_pos'] /4 \
# and last_candle['sma5_deriv1_1d'] < 0 and last_candle['sma5_deriv2_1d'] < 0 \
# and last_candle['sma60_deriv1'] < 0 and last_candle['sma60_deriv2'] < 0:
# self.pairs[pair]['force_sell'] = True
# self.pairs[pair]['force_stop'] = True
# return 'Deriv';
if profit < - (dispo * 0.2):
print(f'dispo={dispo} wallet={round(self.wallets.get_available_stake_amount())} limit={-dispo * 0.1}')
self.pairs[pair]['force_sell'] = True
self.pairs[pair]['force_stop'] = True
return 'Force';
# if self.pairs[pair]['force_sell'] and self.pairs[pair]['last_trade'].close_profit > 0.02:
# return "Force"
# if last_candle['stop_buying']:
# self.pairs[pair]['force_sell'] = True
# return 'Stop'
# Si momentum fort → on laisse courir
if momentum > 1:
return None
# Si momentum faiblit → on prend profit plus tôt
# if momentum < 0 and current_profit > 0.02 and last_candle[self.sell_score_indicator.value] < before_last_candle[self.sell_score_indicator.value]\
# and last_candle['close'] < last_candle['sma5']:
# self.pairs[pair]['last_profit'] = current_profit
# return "momentum_drop"
return None
def custom_stoploss(self, pair, trade, current_time, current_rate, current_profit, **kwargs):
dataframe, _ = self.dp.get_analyzed_dataframe(pair, self.timeframe)
last_candle = dataframe.iloc[-1]
profit = trade.calc_profit(current_rate)
candle_at_buy = self.pairs[pair]['last_candle']
# if candle_at_buy['range_pos'] > self.range_pos_stoploss.value and candle_at_buy[self.stoploss_indicator.value] < 0:
# return self.stoploss_force.value
# # print(f'test stop loss {self.stoploss} {last_candle["stop_buying12_1d"]}')
# if last_candle[self.stoploss_indicator.value] and (trade.nr_of_successful_entries >= 4 or self.wallets.get_available_stake_amount() < 300): # and profit < - 30 :
# range_min = last_candle[f"min{self.stoploss_timeperiod.value}_1d"]
# range_max = last_candle[f"max{self.stoploss_timeperiod.value}_1d"]
#
# if range_max == range_min:
# print(f'ranges={range_min}')
# return -0.1 # sécurité
#
# range_pos = (current_rate - range_min) / (range_max - range_min)
#
# if (range_pos > 1):
# return -1
#
# sl_min = -0.02
# sl_max = -0.1 #self.stoploss
#
# dynamic_sl = (sl_min + (1 - range_pos) * (sl_max - sl_min))
#
# print(f'{current_time} Loss ranges={round(range_min,0)} {round(range_max, 0)} range_pos={round(range_pos, 3)} dynamic_sl={round(dynamic_sl, 3)} '
# f'profit={profit} current={current_profit} {self.stoploss_indicator.value} {self.stoploss_timeperiod.value} {last_candle[self.stoploss_indicator.value]}')
#
# return dynamic_sl
return -1
def informative_pairs(self):
# get access to all pairs available in whitelist.
pairs = self.dp.current_whitelist()
informative_pairs = [(pair, '1d') for pair in pairs]
# informative_pairs += [(pair, '1h') for pair in pairs]
return informative_pairs
def populate_indicators(self, dataframe: DataFrame, metadata: dict) -> DataFrame:
pair = metadata['pair']
dataframe = dataframe.copy()
heikinashi = qtpylib.heikinashi(dataframe)
dataframe['haopen'] = heikinashi['open']
dataframe['haclose'] = heikinashi['close']
dataframe['hapercent'] = (dataframe['haclose'] - dataframe['haopen']) / dataframe['haclose']
dataframe['mid'] = dataframe['haopen'] + (dataframe['haclose'] - dataframe['haopen']) / 2
dataframe['zero'] = 0
for timeperiod in timeperiods:
dataframe[f'max{timeperiod}'] = talib.MAX(dataframe['close'], timeperiod=timeperiod)
dataframe[f'min{timeperiod}'] = talib.MIN(dataframe['close'], timeperiod=timeperiod)
dataframe[f"percent{timeperiod}"] = dataframe['close'].pct_change(timeperiod)
dataframe[f"sma{timeperiod}"] = dataframe['mid'].ewm(span=timeperiod, adjust=False).mean()
self.calculeDerivees(dataframe, f"sma{timeperiod}", timeframe=self.timeframe, ema_period=timeperiod)
# ######################################################################################################
################### INFORMATIVE 1d
informative = self.dp.get_pair_dataframe(pair=metadata['pair'], timeframe="1d")
# informative = self.populateDataframe(informative, timeframe='1d')
# heikinashi = qtpylib.heikinashi(informative)
# informative['haopen'] = heikinashi['open']
# informative['haclose'] = heikinashi['close']
# informative['hapercent'] = (informative['haclose'] - informative['haopen']) / informative['haclose']
informative['mid'] = informative['open'] + (informative['close'] - informative['open']) / 2
for timeperiod in timeperiods:
informative[f'max{timeperiod}'] = talib.MAX(informative['close'], timeperiod=timeperiod)
informative[f'min{timeperiod}'] = talib.MIN(informative['close'], timeperiod=timeperiod)
# informative[f"range{timeperiod}"] = ((informative["close"] - informative[f'min{timeperiod}']) / (informative[f'max{timeperiod}'] - informative[f'min{timeperiod}']))
# informative[f"percent{timeperiod}"] = informative['close'].pct_change(timeperiod)
informative[f"sma{timeperiod}"] = informative['mid'].ewm(span=timeperiod, adjust=False).mean()
self.calculeDerivees(informative, f"sma{timeperiod}", timeframe=self.timeframe, ema_period=timeperiod)
informative[f"sma200"] = informative['mid'].ewm(span=200, adjust=False).mean()
informative['rsi'] = talib.RSI(informative['close'], timeperiod=14)
self.calculeDerivees(informative, f"rsi", timeframe=self.timeframe, ema_period=14)
informative['max_rsi_12'] = talib.MAX(informative['rsi'], timeperiod=12)
informative['max_rsi_24'] = talib.MAX(informative['rsi'], timeperiod=24)
dataframe = merge_informative_pair(dataframe, informative, self.timeframe, "1d", ffill=True)
# ######################################################################################################
range_min = dataframe[f"min12_1d"]
range_max = dataframe[f"max48"]
dataframe[f"range_pos"] = ((dataframe['mid'] - range_min) / (range_max)).rolling(5).mean()
# dataframe['cross_sma60'] = qtpylib.crossed_below(dataframe[self.sell_sma_indicators.value], dataframe[self.sell_crossed_sma_indicators.value])
dataframe[f'stop_buying'] = qtpylib.crossed_below(dataframe[f"sma12"], dataframe['sma48'])
dataframe['atr'] = talib.ATR(dataframe)
dataframe['rsi'] = talib.RSI(dataframe['close'], timeperiod=14)
dataframe['max_rsi_12'] = talib.MAX(dataframe['rsi'], timeperiod=12)
dataframe['max_rsi_24'] = talib.MAX(dataframe['rsi'], timeperiod=24)
# récupérer le dernier trade fermé
trades = Trade.get_trades_proxy(pair=pair,is_open=False)
if trades:
last_trade = trades[-1]
self.pairs[pair]['last_profit'] = last_trade.close_profit # ex: 0.12 = +12%
self.pairs[pair]['last_trade'] = last_trade
return dataframe
def populate_buy_trend(self, dataframe: DataFrame, metadata: dict) -> DataFrame:
# dataframe.loc[
# (
# (dataframe['sma24_score'].shift(2) <= dataframe['zero'])
# & (dataframe['sma5'].shift(1) <= dataframe['sma5'])
# & (dataframe['sma5_inv'] == -1)
# & (dataframe['min24'].shift(3) == dataframe['min24'])
# ),
# 'buy'] = 1
conditions = list()
# # print(dataframe.columns)
#
# buy_indicator = self.buy_indicator0.value
# buy_crossed_indicator = self.buy_crossed_indicator0.value
# buy_operator = self.buy_operator0.value
# buy_real_num = self.buy_real_num0.value
# condition, dataframe = condition_generator(
# dataframe,
# buy_operator,
# buy_indicator,
# buy_crossed_indicator,
# buy_real_num
# )
# conditions.append(condition)
# # backup
# buy_indicator = self.buy_indicator1.value
# buy_crossed_indicator = self.buy_crossed_indicator1.value
# buy_operator = self.buy_operator1.value
# buy_real_num = self.buy_real_num1.value
#
# condition, dataframe = condition_generator(
# dataframe,
# buy_operator,
# buy_indicator,
# buy_crossed_indicator,
# buy_real_num
# )
# conditions.append(condition)
#
# buy_indicator = self.buy_indicator2.value
# buy_crossed_indicator = self.buy_crossed_indicator2.value
# buy_operator = self.buy_operator2.value
# buy_real_num = self.buy_real_num2.value
# condition, dataframe = condition_generator(
# dataframe,
# buy_operator,
# buy_indicator,
# buy_crossed_indicator,
# buy_real_num
# )
# conditions.append(condition)
# conditions.append((dataframe[self.stop_buying_indicator.value] == False) | (dataframe['range_pos'] < 0))
#conditions.append((dataframe[self.stop_buying_indicator.value] == False) | (dataframe['range_pos'] < 0))
conditions.append(dataframe['sma12_deriv1'] > self.buy_deriv1_sma60.value)
conditions.append(dataframe['sma5_deriv1_1d'] > self.buy_deriv1_sma5d.value)
conditions.append(dataframe['sma12_deriv1_1d'] > self.buy_deriv1_sma12d.value)
conditions.append(dataframe['sma12_deriv2'] > self.buy_deriv2_sma60.value)
conditions.append(dataframe['sma5_deriv2_1d'] > self.buy_deriv2_sma5d.value)
conditions.append(dataframe['sma12_deriv2_1d'] > self.buy_deriv2_sma12d.value)
conditions.append(dataframe['hapercent'] > 0)
# conditions.append(dataframe['percent12'] < 0.01)
# conditions.append(dataframe['percent5'] < 0.01)
conditions.append(dataframe['max_rsi_24'] < 80)
conditions.append((dataframe['max_rsi_12_1d'] < 65))\
# | (
# (dataframe['sma5_deriv1'] > 0) & (dataframe['sma12_deriv1'] > 0) & (dataframe['sma24_deriv1'] > 0) & (
# dataframe['sma48_deriv1'] > 0) & (dataframe['sma60_deriv1'] > 0) & (dataframe['sma5_deriv1_1d'] > 0))
# )
conditions.append(dataframe[f"close"] > dataframe['sma60'])
conditions.append(((dataframe[f"range_pos"] < 0.05) ) | ((dataframe['sma12_deriv1'] > 0) & (dataframe['sma12_deriv2'] > 0)))
conditions.append(dataframe['close_1d'] > dataframe['sma200_1d'])
# print(f"BUY indicators tested \n"
# f"{self.buy_indicator0.value} {self.buy_crossed_indicator0.value} {self.buy_operator0.value} {self.buy_real_num0.value} \n"
# f"{self.buy_indicator1.value} {self.buy_crossed_indicator1.value} {self.buy_operator1.value} {self.buy_real_num1.value} \n"
# f"{self.buy_indicator2.value} {self.buy_crossed_indicator2.value} {self.buy_operator2.value} {self.buy_real_num2.value} \n"
# )
if conditions:
dataframe.loc[
reduce(lambda x, y: x & y, conditions),
['enter_long', 'enter_tag']
] = (1, 'god')
return dataframe
def populate_sell_trend(self, dataframe: DataFrame, metadata: dict) -> DataFrame:
conditions = list()
# # TODO: Its not dry code!
# sell_indicator = self.sell_indicator0.value
# sell_crossed_indicator = self.sell_crossed_indicator0.value
# sell_operator = self.sell_operator0.value
# sell_real_num = self.sell_real_num0.value
# condition, dataframe = condition_generator(
# dataframe,
# sell_operator,
# sell_indicator,
# sell_crossed_indicator,
# sell_real_num
# )
# conditions.append(condition)
#
# sell_indicator = self.sell_indicator1.value
# sell_crossed_indicator = self.sell_crossed_indicator1.value
# sell_operator = self.sell_operator1.value
# sell_real_num = self.sell_real_num1.value
# condition, dataframe = condition_generator(
# dataframe,
# sell_operator,
# sell_indicator,
# sell_crossed_indicator,
# sell_real_num
# )
# conditions.append(condition)
#
# sell_indicator = self.sell_indicator2.value
# sell_crossed_indicator = self.sell_crossed_indicator2.value
# sell_operator = self.sell_operator2.value
# sell_real_num = self.sell_real_num2.value
# condition, dataframe = condition_generator(
# dataframe,
# sell_operator,
# sell_indicator,
# sell_crossed_indicator,
# sell_real_num
# )
# conditions.append(condition)
#
#
# print(f"SELL indicators tested \n"
# f"{self.sell_indicator0.value} {self.sell_crossed_indicator0.value} {self.sell_operator0.value} {self.sell_real_num0.value} \n"
# f"{self.sell_indicator1.value} {self.sell_crossed_indicator1.value} {self.sell_operator1.value} {self.sell_real_num1.value} \n"
# f"{self.sell_indicator2.value} {self.sell_crossed_indicator2.value} {self.sell_operator2.value} {self.sell_real_num2.value} \n"
# )
#
#
# conditions.append(qtpylib.crossed_below(dataframe['mid'], dataframe['sma24']))
# conditions.append((dataframe['range_pos'] > 0.04))
#
# if conditions:
# dataframe.loc[reduce(lambda x, y: x & y, conditions), ['exit_long', 'exit_tag']] = (1, 'god')
return dataframe
def calculeDerivees(
self,
dataframe: pd.DataFrame,
name: str,
suffixe: str = '',
window: int = 100,
coef: float = 0.15,
ema_period: int = 10,
verbose: bool = True,
timeframe: str = '5m'
) -> pd.DataFrame:
"""
Calcule deriv1/deriv2 (relative simple), applique EMA, calcule tendency
avec epsilon adaptatif basé sur rolling percentiles.
"""
d1_col = f"{name}{suffixe}_deriv1"
d2_col = f"{name}{suffixe}_deriv2"
tendency_col = f"{name}{suffixe}_state"
series = dataframe[f"{name}{suffixe}"]
d1 = series.diff()
d2 = d1.diff()
pmin = int(ema_period / 3)
cond_bas = (d1.rolling(pmin).mean() > d1.rolling(ema_period).mean())
cond_haut = (d1.rolling(pmin).mean() < d1.rolling(ema_period).mean())
dataframe[d1_col] = (dataframe[name] - dataframe[name].shift(3)) / dataframe[name].shift(3)
dataframe[d2_col] = (dataframe[d1_col] - dataframe[d1_col].shift(1))
dataframe[f"{name}{suffixe}_inv"] = np.where(cond_bas, -1, np.where(cond_haut, 1, 0))
short = d1.rolling(pmin).mean()
long = d1.rolling(ema_period).mean()
spread = short - long
zscore = (spread - spread.rolling(ema_period).mean()) / spread.rolling(ema_period).std()
dataframe[f"{name}{suffixe}_score"] = zscore
# ####################################################################
# Calcul de la pente lissée
d1 = series.diff()
d1_smooth = d1.rolling(5).mean()
# Normalisation
z = (d1_smooth - d1_smooth.rolling(ema_period).mean()) / d1_smooth.rolling(ema_period).std()
dataframe[f"{name}{suffixe}_trend_up"] = (
(d1_smooth.shift(1) < 0) &
(d1_smooth > 0) &
(z > 1.0)
)
dataframe[f"{name}{suffixe}_trend_down"] = (
(d1_smooth.shift(1) > 0) &
(d1_smooth < 0) &
(z < -1.0)
)
momentum_short = d1.rolling(int(ema_period / 2)).mean()
momentum_long = d1.rolling(ema_period * 2).mean()
dataframe[f"{name}{suffixe}_trend_change_up"] = (
(momentum_short.shift(1) < momentum_long.shift(1)) &
(momentum_short > momentum_long)
)
dataframe[f"{name}{suffixe}_trend_change_down"] = (
(momentum_short.shift(1) > momentum_long.shift(1)) &
(momentum_short < momentum_long)
)
return dataframe
@property
def protections(self):
return [
{
"method": "CooldownPeriod",
"stop_duration_candles": 6
},
# {
# "method": "MaxDrawdown",
# "lookback_period_candles": 96,
# "trade_limit": 4,
# "max_allowed_drawdown": 0.1,
# "stop_duration_candles": 24
# },
# {
# "method": "StoplossGuard",
# "lookback_period_candles": 96,
# "trade_limit": 2,
# "stop_duration_candles": 48,
# "only_per_pair": False
# },
# {
# "method": "LowProfitPairs",
# "lookback_period_candles": 6,
# "trade_limit": 2,
# "stop_duration_candles": 60,
# "required_profit": 0.02
# },
# {
# "method": "LowProfitPairs",
# "lookback_period_candles": 24,
# "trade_limit": 4,
# "stop_duration_candles": 2,
# "required_profit": 0.01
# }
]
def log_trade(self, action, pair, date, trade_type=None, rate=None, dispo=None, profit=None, buys=None, stake=None,
last_candle=None):
# Afficher les colonnes une seule fois
if self.config.get('runmode') == 'hyperopt' or self.dp.runmode.value in ('hyperopt'):
return
if self.columns_logged % 10 == 0:
self.printLog(
f"| {'Date':<16} | {'Action':<10} |{'Pair':<5}| {'Trade Type':<18} |{'Rate':>8} | {'Dispo':>6} | {'Profit':>8} "
f"| {'Pct':>6} | {'max_touch':>11} | {'last_lost':>12} | {'last_max':>7}| {'last_max':>7}|{'Buys':>5}| {'Stake':>5} |"
f"{'rsi':>6}|"
)
self.printLineLog()
# df = pd.DataFrame.from_dict(self.pairs, orient='index')
# colonnes_a_exclure = ['last_candle',
# 'trade_info', 'last_date', 'last_count_of_buys', 'base_stake_amount', 'stop_buy']
# df_filtered = df[df['count_of_buys'] > 0].drop(columns=colonnes_a_exclure)
# # df_filtered = df_filtered["first_buy", "last_max", "max_touch", "last_sell","last_buy", 'count_of_buys', 'current_profit']
#
# print(df_filtered)
self.columns_logged += 1
date = str(date)[:16] if date else "-"
limit = None
# if buys is not None:
# limit = round(last_rate * (1 - self.fibo[buys] / 100), 4)
rsi = ''
rsi_pct = ''
# if last_candle is not None:
# if (not np.isnan(last_candle['rsi_1d'])) and (not np.isnan(last_candle['rsi_1h'])):
# rsi = str(int(last_candle['rsi_1d'])) + " " + str(int(last_candle['rsi_1h']))
# if (not np.isnan(last_candle['rsi_pct_1d'])) and (not np.isnan(last_candle['rsi_pct_1h'])):
# rsi_pct = str(int(10000 * last_candle['bb_mid_pct_1d'])) + " " + str(
# int(last_candle['rsi_pct_1d'])) + " " + str(int(last_candle['rsi_pct_1h']))
# first_rate = self.percent_threshold.value
# last_rate = self.threshold.value
# action = self.color_line(action, action)
sma5_1d = ''
sma5_1h = ''
sma5 = str(sma5_1d) + ' ' + str(sma5_1h)
last_lost = self.getLastLost(last_candle, pair)
if buys is None:
buys = ''
max_touch = '' # round(last_candle['max12_1d'], 1) #round(self.pairs[pair]['max_touch'], 1)
pct_max = self.getPctFirstBuy(pair, last_candle)
total_counts = str(buys) + '/' + str(sum(pair_data['count_of_buys'] for pair_data in self.pairs.values()))
dist_max = self.getDistMax(last_candle, pair)
# if trade_type is not None:
# if np.isnan(last_candle['rsi_1d']):
# string = ' '
# else:
# string = (str(int(last_candle['rsi_1d']))) + " " + str(int(last_candle['rsi_deriv1_1d']))
# trade_type = trade_type \
# + " " + string \
# + " " + str(int(last_candle['rsi_1h'])) \
# + " " + str(int(last_candle['rsi_deriv1_1h']))
# val144 = self.getProbaHausse144(last_candle)
# val1h = self.getProbaHausse1h(last_candle)
color = GREEN if profit > 0 else RED
# color_sma24 = GREEN if last_candle['sma24_deriv1_1d'] > 0 else RED
# color_sma24_2 = GREEN if last_candle['sma24_deriv2_1d'] > 0 else RED
# color_sma5 = GREEN if last_candle['mid_smooth_5_deriv1_1d'] > 0 else RED
# color_sma5_2 = GREEN if last_candle['mid_smooth_5_deriv2_1d'] > 0 else RED
# color_sma5_1h = GREEN if last_candle['sma60_deriv1'] > 0 else RED
# color_sma5_2h = GREEN if last_candle['sma60_deriv2'] > 0 else RED
# color_smooth_1h = GREEN if last_candle['mid_smooth_1h_deriv1'] > 0 else RED
# color_smooth2_1h = GREEN if last_candle['mid_smooth_1h_deriv2'] > 0 else RED
last_max = int(self.pairs[pair]['last_max']) if self.pairs[pair]['last_max'] > 1 else round(
self.pairs[pair]['last_max'], 3)
last_min = int(self.pairs[pair]['last_min']) if self.pairs[pair]['last_min'] > 1 else round(
self.pairs[pair]['last_min'], 3)
profit = str(round(profit, 2)) + '/' + str(round(self.pairs[pair]['max_profit'], 2))
# 🟢 Dérivée 1 > 0 et dérivée 2 > 0: tendance haussière qui saccélère.
# 🟡 Dérivée 1 > 0 et dérivée 2 < 0: tendance haussière qui ralentit → essoufflement potentiel.
# 🔴 Dérivée 1 < 0 et dérivée 2 < 0: tendance baissière qui saccélère.
# 🟠 Dérivée 1 < 0 et dérivée 2 > 0: tendance baissière qui ralentit → possible bottom.
self.printLog(
f"| {date:<16} |{action:<10} | {pair[0:3]:<3} | {trade_type or '-':<18} |{rate or '-':>9}| {dispo or '-':>6} "
f"|{color}{profit or '-':>10}{RESET}| {pct_max or '-':>6} | {round(self.pairs[pair]['max_touch'], 2) or '-':>11} | {last_lost or '-':>12} "
f"| {last_max or '-':>7} | {last_min or '-':>7} |{total_counts or '-':>5}|{stake or '-':>7}"
)
def printLineLog(self):
# f"sum1h|sum1d|Tdc|Tdh|Tdd| drv1 |drv_1h|drv_1d|"
self.printLog(
f"+{'-' * 18}+{'-' * 12}+{'-' * 5}+{'-' * 20}+{'-' * 9}+{'-' * 8}+{'-' * 12}+{'-' * 8}+{'-' * 13}+{'-' * 14}+{'-' * 9}{'-' * 9}+{'-' * 5}+{'-' * 7}+"
f"+{'-' * 6}+{'-' * 7}+{'-' * 5}+{'-' * 5}+{'-' * 5}+{'-' * 5}+{'-' * 5}+{'-' * 5}+"
)
def printLog(self, str):
if self.config.get('runmode') == 'hyperopt' or self.dp.runmode.value in ('hyperopt'):
return;
if not self.dp.runmode.value in ('backtest', 'hyperopt', 'lookahead-analysis'):
logger.info(str)
else:
if not self.dp.runmode.value in ('hyperopt'):
print(str)
def getLastLost(self, last_candle, pair):
last_lost = round((last_candle['close'] - self.pairs[pair]['max_touch']) / self.pairs[pair]['max_touch'], 3)
return last_lost
def getDistMax(self, last_candle, pair):
mx = last_candle['max12_1d']
dist_max = round(100 * (mx - last_candle['close']) / mx, 0)
return dist_max
def getPctFirstBuy(self, pair, last_candle):
return round((last_candle['close'] - self.pairs[pair]['first_buy']) / self.pairs[pair]['first_buy'], 3)
def getPctLastBuy(self, pair, last_candle):
return round((last_candle['close'] - self.pairs[pair]['last_buy']) / self.pairs[pair]['last_buy'], 4)
def expectedProfit(self, pair: str, last_candle: DataFrame):
lim = 0.005
pct = 0.001
pct_to_max = lim + pct * self.pairs[pair]['count_of_buys']
expected_profit = lim * self.pairs[pair]['total_amount'] # min(3 * lim, max(lim, pct_to_max)) # 0.004 + 0.002 * self.pairs[pair]['count_of_buys'] #min(0.01, first_max)
self.pairs[pair]['expected_profit'] = expected_profit
return expected_profit