Files
Freqtrade/Frictrade.py
Jérôme Delacotte a7b09858f4 Frictrade
2025-11-26 18:26:54 +01:00

885 lines
39 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
# Zeus Strategy: First Generation of GodStra Strategy with maximum
# AVG/MID profit in USDT
# Author: @Mablue (Masoud Azizi)
# github: https://github.com/mablue/
# IMPORTANT: INSTALL TA BEFORE RUNNING (pip install ta)
# freqtrade hyperopt --hyperopt-loss SharpeHyperOptLoss --spaces buy sell roi --strategy Zeus
# --- Do not remove these libs ---
from datetime import timedelta, datetime
from freqtrade.persistence import Trade
from freqtrade.strategy import (BooleanParameter, CategoricalParameter, DecimalParameter, stoploss_from_open,
IntParameter, IStrategy, merge_informative_pair, informative, stoploss_from_absolute)
import pandas as pd
import numpy as np
import os
import json
import csv
from pandas import DataFrame
from typing import Optional, Union, Tuple
import math
import logging
from pathlib import Path
# --------------------------------
# Add your lib to import here test git
import ta
import talib.abstract as talib
import freqtrade.vendor.qtpylib.indicators as qtpylib
from datetime import timezone, timedelta
logger = logging.getLogger(__name__)
# Basic ANSI colour escape sequences for terminal output.
# GREEN / RED / RESET are used by Frictrade.log_trade to colour the profit column.
RED = "\033[31m"
GREEN = "\033[32m"
YELLOW = "\033[33m"
BLUE = "\033[34m"
MAGENTA = "\033[35m"
CYAN = "\033[36m"
RESET = "\033[0m"
# Strategy class: class-level configuration plus shared per-pair bookkeeping.
class Frictrade(IStrategy):
# Warm-up length: 60 * 24 = 1440 candles (one day on the '1m' timeframe set below).
startup_candle_count = 60 * 24
# ROI table:
minimal_roi = {
"0": 0.564,
"567": 0.273,
"2814": 0.12,
"7675": 0
}
# Stoploss:
stoploss = -1 # 0.256
# Custom stoploss
use_custom_stoploss = False
trailing_stop = False
# The three trailing_* values below are read by custom_exit, which implements its own trailing logic.
trailing_stop_positive = 0.15
trailing_stop_positive_offset = 0.5
trailing_only_offset_is_reached = True
# Buy hypers
timeframe = '1m'
max_open_trades = 5
max_amount = 40
parameters = {}
# DCA config
position_adjustment_enable = True
# Used as a call counter by log_trade (False behaves as 0 in `% 10` and `+= 1`).
columns_logged = False
# Per-pair runtime state, keyed by pair name. Only the pairs listed at the end of this
# comprehension are pre-registered; the methods index this dict directly with the traded pair.
# NOTE(review): this dict is a class attribute, so it is shared by all instances — confirm intended.
pairs = {
pair: {
"first_buy": 0,
"last_buy": 0.0,
"last_min": 999999999999999.5,
"last_max": 0,
"trade_info": {},
"max_touch": 0.0,
"last_sell": 0.0,
'count_of_buys': 0,
'current_profit': 0,
'expected_profit': 0,
'previous_profit': 0,
"last_candle": {},
"last_count_of_buys": 0,
'base_stake_amount': 0,
'stop_buy': False,
'last_date': 0,
'stop': False,
'max_profit': 0,
'first_amount': 0,
'total_amount': 0,
'has_gain': 0,
'force_sell': False,
'force_buy': False
}
for pair in ["BTC/USDC", "ETH/USDC", "DOGE/USDC", "XRP/USDC", "SOL/USDC",
"BTC/USDT", "ETH/USDT", "DOGE/USDT", "XRP/USDT", "SOL/USDT"]
}
# Cache of open trades; refreshed by getOpenTrades and cleared on confirmed entries/exits.
trades = list()
max_profit_pairs = {}
def confirm_trade_entry(self, pair: str, order_type: str, amount: float, rate: float, time_in_force: str,
                        current_time: datetime, entry_tag: Optional[str], **kwargs) -> bool:
    """Initialise the per-pair bookkeeping for a new trade and log the entry.

    Only the latest analysed candle is read, so the callback no longer fails
    with IndexError when fewer than three candles are available.

    :param pair: Pair being entered; must be a key of ``self.pairs``.
    :param rate: Entry rate, stored as both first and last buy price.
    :param current_time: Time of the entry, used for the log line and for the
        "minutes since the previous exit" figure.
    :param entry_tag: Tag of the entry signal, logged as the trade type.
    :return: Always ``True`` (entry filters are currently disabled).
    """
    state = self.pairs[pair]

    # Minutes elapsed since the last confirmed exit on this pair (0 if none yet).
    minutes = 0
    if state['last_date'] != 0:
        minutes = int((current_time - state['last_date']).total_seconds() / 60)

    dataframe, _ = self.dp.get_analyzed_dataframe(pair, self.timeframe)
    last_candle = dataframe.iloc[-1].squeeze()

    # Entry filters are disabled for now: every entry is accepted.
    allow_to_buy = True

    if allow_to_buy:
        self.trades = list()
        close = last_candle['close']
        state['first_buy'] = rate
        state['last_buy'] = rate
        state['max_touch'] = close
        state['last_candle'] = last_candle
        state['count_of_buys'] = 1
        state['current_profit'] = 0
        state['last_max'] = max(close, state['last_max'])
        state['last_min'] = min(close, state['last_min'])

    dispo = round(self.wallets.get_available_stake_amount())
    self.printLineLog()

    stake_amount = self.adjust_stake_amount(pair, last_candle)
    state['total_amount'] = stake_amount

    self.log_trade(
        last_candle=last_candle,
        date=current_time,
        action=("🟩Buy" if allow_to_buy else "Canceled") + " " + str(minutes),
        pair=pair,
        rate=rate,
        dispo=dispo,
        profit=0,
        trade_type=entry_tag,
        buys=1,
        stake=round(stake_amount, 2)
    )
    return allow_to_buy
def confirm_trade_exit(self, pair: str, trade: "Trade", order_type: str, amount: float, rate: float,
                       time_in_force: str,
                       exit_reason: str, current_time, **kwargs, ) -> bool:
    """Decide whether an exit may proceed and, if so, reset the pair's bookkeeping.

    An exit is allowed when the trade is older than 30 minutes and the last
    Heikin-Ashi candle is negative, or when it is forced (``force_sell`` flag
    of the pair, ``force_exit`` or ``stop_loss`` exit reason).

    The trade age uses ``total_seconds()``; ``timedelta.seconds`` ignores whole
    days, which made trades older than a day look only minutes old.

    :param pair: Pair being exited; must be a key of ``self.pairs``.
    :param trade: Trade being closed.
    :param rate: Exit rate, used for the profit figure and stored as ``last_sell``.
    :param exit_reason: Reason reported for the exit, logged as the trade type.
    :param current_time: Time of the exit; stored as the pair's ``last_date``.
    :return: ``True`` if the exit is confirmed, ``False`` to reject it.
    """
    dataframe, _ = self.dp.get_analyzed_dataframe(pair, self.timeframe)
    last_candle = dataframe.iloc[-1].squeeze()
    state = self.pairs[pair]

    minutes = int(round((current_time - trade.open_date_utc).total_seconds() / 60, 0))
    profit = trade.calc_profit(rate)

    forced = state['force_sell'] or exit_reason in ('force_exit', 'stop_loss')
    matured = minutes > 30 and last_candle['hapercent'] < 0
    allow_to_sell = bool(matured or forced)

    if allow_to_sell:
        self.trades = list()
        state['last_count_of_buys'] = trade.nr_of_successful_entries
        state['last_sell'] = rate
        state['last_candle'] = last_candle
        state['previous_profit'] = 0

        dispo = round(self.wallets.get_available_stake_amount())
        self.log_trade(
            last_candle=last_candle,
            date=current_time,
            action="🟥Sell " + str(minutes),
            pair=pair,
            trade_type=exit_reason,
            rate=last_candle['close'],
            dispo=dispo,
            profit=round(profit, 2)
        )

        # Reset the per-trade state only after logging: log_trade still reads it.
        state['max_profit'] = 0
        state['force_sell'] = False
        state['has_gain'] = 0
        state['current_profit'] = 0
        state['total_amount'] = 0
        state['count_of_buys'] = 0
        state['max_touch'] = 0
        state['last_buy'] = 0
        state['last_date'] = current_time
        state['current_trade'] = None

    return allow_to_sell
# def custom_exit(self, pair: str, trade: Trade, current_time, current_rate, current_profit, **kwargs):
#
# dataframe, _ = self.dp.get_analyzed_dataframe(pair, self.timeframe)
# last_candle = dataframe.iloc[-1].squeeze()
# last_candle_1h = dataframe.iloc[-13].squeeze()
# before_last_candle = dataframe.iloc[-2].squeeze()
# before_last_candle_2 = dataframe.iloc[-3].squeeze()
# before_last_candle_12 = dataframe.iloc[-13].squeeze()
#
# expected_profit = self.expectedProfit(pair, last_candle)
# # print(f"current_time={current_time} current_profit={current_profit} expected_profit={expected_profit}")
#
# max_touch_before = self.pairs[pair]['max_touch']
# self.pairs[pair]['last_max'] = max(last_candle['close'], self.pairs[pair]['last_max'])
# self.pairs[pair]['last_min'] = min(last_candle['close'], self.pairs[pair]['last_min'])
# self.pairs[pair]['current_trade'] = trade
#
# count_of_buys = trade.nr_of_successful_entries
#
# profit = trade.calc_profit(current_rate) #round(current_profit * trade.stake_amount, 1)
# self.pairs[pair]['max_profit'] = max(self.pairs[pair]['max_profit'], profit)
# max_profit = last_candle['max5'] #self.pairs[pair]['max_profit']
# baisse = 0
# if profit > 0:
# baisse = 1 - (profit / max_profit)
# mx = max_profit / 5
# self.pairs[pair]['count_of_buys'] = count_of_buys
# self.pairs[pair]['current_profit'] = profit
#
# dispo = round(self.wallets.get_available_stake_amount())
# hours_since_first_buy = (current_time - trade.open_date_utc).seconds / 3600.0
# days_since_first_buy = (current_time - trade.open_date_utc).days
# hours = (current_time - trade.date_last_filled_utc).total_seconds() / 3600.0
# minutes = (current_time - trade.date_last_filled_utc).total_seconds() / 60.0
#
# if minutes % 4 == 0:
# self.log_trade(
# last_candle=last_candle,
# date=current_time,
# action="🟢 CURRENT", #🔴 CURRENT" if self.pairs[pair]['stop'] or last_candle['stop_buying'] else "
# dispo=dispo,
# pair=pair,
# rate=last_candle['close'],
# trade_type='',
# profit=round(profit, 2),
# buys=count_of_buys,
# stake=0
# )
#
# if (last_candle['close'] > last_candle['mid']) or (last_candle['sma5_deriv1'] > 0):
# return None
#
# pair_name = self.getShortName(pair)
#
# if profit > 0.003 * count_of_buys and baisse > 0.30:
# self.pairs[pair]['force_sell'] = False
# self.pairs[pair]['force_buy'] = (self.pairs[pair]['count_of_buys'] - self.pairs[pair]['has_gain'] > 3)
# return str(count_of_buys) + '_' + 'B30_' + pair_name + '_' + str(self.pairs[pair]['has_gain'])
#
# self.pairs[pair]['max_touch'] = max(last_candle['close'], self.pairs[pair]['max_touch'])
def getShortName(self, pair):
    """Return *pair* with the USDT/USDC quote suffixes (``/`` or ``_`` separated) removed."""
    base = pair
    for suffix in ("/USDT", "/USDC", "_USDC", "_USDT"):
        base = base.replace(suffix, '')
    return base
def getLastLost(self, last_candle, pair):
    """Return the relative distance of the last close from the pair's ``max_touch``.

    :param last_candle: Mapping-like candle providing ``'close'``.
    :param pair: Key into ``self.pairs``.
    :return: ``(close - max_touch) / max_touch`` rounded to 3 decimals, or ``0``
        when ``max_touch`` is unset (0), which is its value after an exit.
    """
    max_touch = self.pairs[pair]['max_touch']
    if not max_touch:
        # No reference price yet: avoid dividing by zero.
        return 0
    return round((last_candle['close'] - max_touch) / max_touch, 3)
def getPctFirstBuy(self, pair, last_candle):
    """Return the relative move of the last close versus the pair's first buy price.

    :param pair: Key into ``self.pairs``.
    :param last_candle: Mapping-like candle providing ``'close'``.
    :return: ``(close - first_buy) / first_buy`` rounded to 3 decimals, or ``0``
        when no first buy price is recorded (``first_buy`` starts at 0).
    """
    first_buy = self.pairs[pair]['first_buy']
    if not first_buy:
        # No entry recorded yet: avoid dividing by zero.
        return 0
    return round((last_candle['close'] - first_buy) / first_buy, 3)
def getPctLastBuy(self, pair, last_candle):
    """Return the relative move of the last close versus the pair's last buy price.

    :param pair: Key into ``self.pairs``.
    :param last_candle: Mapping-like candle providing ``'close'``.
    :return: ``(close - last_buy) / last_buy`` rounded to 4 decimals, or ``0``
        when no last buy price is recorded (``last_buy`` is reset to 0 on exit).
    """
    last_buy = self.pairs[pair]['last_buy']
    if not last_buy:
        # No buy recorded: avoid dividing by zero.
        return 0
    return round((last_candle['close'] - last_buy) / last_buy, 4)
def expectedProfit(self, pair: str, last_candle: DataFrame):
    """Compute, store and return the expected profit for *pair*.

    The target is a fixed fraction of the pair's ``total_amount``: 0.5 % for
    BTC and 1 % for every other pair. The result is also written to
    ``self.pairs[pair]['expected_profit']``.

    :param pair: Key into ``self.pairs``.
    :param last_candle: Unused; kept for call compatibility.
    :return: The expected profit, in the same unit as ``total_amount``.
    """
    lim = 0.005 if self.getShortName(pair) == 'BTC' else 0.01
    expected_profit = lim * self.pairs[pair]['total_amount']
    self.pairs[pair]['expected_profit'] = expected_profit
    return expected_profit
def log_trade(self, action, pair, date, trade_type=None, rate=None, dispo=None, profit=None, buys=None, stake=None,
              last_candle=None):
    """Print one row of the trade-log table, with a periodic header and state dump.

    Nothing is printed in hyperopt mode. On every 10th call the column header,
    a separator line and a table of all pairs currently holding buys are
    printed before the row.

    :param action: Short label of the event (e.g. buy / sell marker).
    :param pair: Key into ``self.pairs``; its first three characters are shown.
    :param date: Event time; rendered truncated to 16 characters, or ``-``.
    :param trade_type: Free-form tag shown in the "Trade Type" column.
    :param rate: Price shown in the "Rate" column.
    :param dispo: Available stake shown in the "Dispo" column.
    :param profit: Current profit; ``None`` is rendered as ``-`` and coloured red.
    :param buys: Number of entries of the trade; shown as ``buys/total``.
    :param stake: Stake of the order being logged.
    :param last_candle: Candle providing ``close``, ``max_rsi_24``, ``rsi_1h``,
        ``rsi_1d``, ``rtp_1h`` and ``rtp_1d``; required despite the default.
    """
    # Logging is disabled entirely while hyperopting.
    if self.config.get('runmode') == 'hyperopt' or self.dp.runmode.value == 'hyperopt':
        return

    # Print the column header and the per-pair state only every 10th call.
    if self.columns_logged % 10 == 0:
        self.printLog(
            f"| {'Date':<16} | {'Action':<10} |{'Pair':<5}| {'Trade Type':<18} |{'Rate':>8} | {'Dispo':>6} | {'Profit':>8} "
            f"| {'Pct':>6} | {'max_touch':>11} | {'last_lost':>12} | {'last_max':>7}| {'last_min':>7}|{'Buys':>5}| {'Stake':>5} |"
            f"{'rsi':>6}"
        )
        self.printLineLog()
        df = pd.DataFrame.from_dict(self.pairs, orient='index')
        colonnes_a_exclure = ['last_candle',
                              'trade_info', 'last_date', 'last_count_of_buys', 'base_stake_amount', 'stop_buy']
        df_filtered = df[df['count_of_buys'] > 0].drop(columns=colonnes_a_exclure)
        self.printLog(df_filtered)
    self.columns_logged += 1

    state = self.pairs[pair]
    date = str(date)[:16] if date else "-"
    last_lost = self.getLastLost(last_candle, pair)
    if buys is None:
        buys = ''
    pct_max = self.getPctFirstBuy(pair, last_candle)
    total_counts = str(buys) + '/' + str(sum(pair_data['count_of_buys'] for pair_data in self.pairs.values()))

    # Large prices are shown as integers, sub-unit prices with 3 decimals.
    last_max = int(state['last_max']) if state['last_max'] > 1 else round(state['last_max'], 3)
    last_min = int(state['last_min']) if state['last_min'] > 1 else round(state['last_min'], 3)

    # A missing profit must not crash the comparison; it is shown in red as '-'.
    color = GREEN if profit is not None and profit > 0 else RED
    profit_text = ('-' if profit is None else str(profit)) + '/' + str(round(state['max_profit'], 2))

    self.printLog(
        f"| {date:<16} |{action:<10} | {pair[0:3]:<3} | {trade_type or '-':<18} |{rate or '-':>9}| {dispo or '-':>6} "
        f"|{color}{profit_text:>10}{RESET}| {pct_max or '-':>6} | {round(state['max_touch'], 2) or '-':>11} | {last_lost or '-':>12} "
        f"| {last_max or '-':>7} | {last_min or '-':>7} |{total_counts or '-':>5}|{stake or '-':>7}"
        f"{round(last_candle['max_rsi_24'], 1) or '-' :>6}|{round(last_candle['rsi_1h'], 1) or '-' :>6}|{round(last_candle['rsi_1d'], 1) or '-' :>6}|"
        f"{round(last_candle['rtp_1h'] * 100, 0) or '-' :>6}|{round(last_candle['rtp_1d'] * 100, 0) or '-' :>6}|"
    )
def printLineLog(self):
    """Emit the horizontal separator line of the trade-log table through ``printLog``."""
    # Dash-run widths of the two '+'-delimited sections of the ruler.
    main_widths = (18, 12, 5, 20, 9, 8, 12, 8, 13, 14, 18, 5, 7)
    extra_widths = (6, 7, 5, 5, 5, 5, 5, 5)
    ruler = "".join(
        "+" + "+".join("-" * width for width in widths) + "+"
        for widths in (main_widths, extra_widths)
    )
    self.printLog(ruler)
def printLog(self, str):
    """Output a message: silent in hyperopt, ``print`` in backtest-like modes, logger otherwise.

    :param str: Object to output. The name shadows the builtin but is kept so
        existing keyword callers keep working.
    """
    if self.config.get('runmode') == 'hyperopt':
        return
    mode = self.dp.runmode.value
    # Exact comparison: `mode in ('hyperopt')` was a substring test on a plain string.
    if mode == 'hyperopt':
        return
    if mode in ('backtest', 'lookahead-analysis'):
        print(str)
    else:
        logger.info(str)
# Build all indicator columns on the 1m dataframe: Heikin-Ashi values, smoothed mid price and its
# derivative, reversal flag, percent changes, RSI and rolling extremes, merged 1h / 1d informative
# columns, buy prices of open trades (live / dry-run only) and price levels above the rolling minimum.
def populate_indicators(self, dataframe: DataFrame, metadata: dict) -> DataFrame:
# Add all ta features
pair = metadata['pair']
short_pair = self.getShortName(pair)
# NOTE(review): ('backtest') is a plain string, so `in` is a substring test here, not tuple membership.
self.path = f"user_data/plots/{short_pair}/" + ("valide/" if not self.dp.runmode.value in ('backtest') else '')
heikinashi = qtpylib.heikinashi(dataframe)
dataframe['haopen'] = heikinashi['open']
dataframe['haclose'] = heikinashi['close']
dataframe['hapercent'] = (dataframe['haclose'] - dataframe['haopen']) / dataframe['haclose']
# 'mid' is the midpoint between open and close of each candle.
dataframe['mid'] = dataframe['open'] + (dataframe['close'] - dataframe['open']) / 2
# Despite the 'sma' names, these are exponential moving averages (ewm) of 'mid'.
dataframe['sma5'] = dataframe['mid'].ewm(span=5, adjust=False).mean() #dataframe["mid"].rolling(window=5).mean()
# Relative one-candle change of the average, scaled by 1000.
dataframe['sma5_deriv1'] = 1000 * (dataframe['sma5'] - dataframe['sma5'].shift(1)) / dataframe['sma5'].shift(1)
dataframe['sma60'] = dataframe['mid'].ewm(span=60, adjust=False).mean()
dataframe['sma60_deriv1'] = 1000 * (dataframe['sma60'] - dataframe['sma60'].shift(1)) / dataframe['sma60'].shift(1)
# dataframe[f"sma5_inv"] = (dataframe[f"sma5"].shift(2) >= dataframe[f"sma5"].shift(1)) \
# & (dataframe[f"sma5"].shift(1) <= dataframe[f"sma5"])
# Sum of the square roots of the absolute moves into and out of the previous candle's sma5 value.
dataframe["sma5_sqrt"] = (
np.sqrt(np.abs(dataframe["sma5"] - dataframe["sma5"].shift(1)))
+ np.sqrt(np.abs(dataframe["sma5"].shift(2) - dataframe["sma5"].shift(1)))
)
# True where sma5 formed a local minimum one candle ago and the combined move exceeds 5.
# NOTE(review): the threshold 5 is in absolute price units (square-rooted), so it depends on the
# pair's price scale — confirm intended.
dataframe["sma5_inv"] = (
(dataframe["sma5"].shift(2) >= dataframe["sma5"].shift(1))
& (dataframe["sma5"].shift(1) <= dataframe["sma5"])
& (dataframe["sma5_sqrt"] > 5)
)
dataframe["percent"] = dataframe['mid'].pct_change()
dataframe["percent3"] = dataframe['mid'].pct_change(3).rolling(3).mean()
dataframe["percent12"] = dataframe['mid'].pct_change(12).rolling(12).mean()
dataframe["percent24"] = dataframe['mid'].pct_change(24).rolling(24).mean()
dataframe['rsi'] = talib.RSI(dataframe['mid'], timeperiod=14)
dataframe['max_rsi_12'] = talib.MAX(dataframe['rsi'], timeperiod=12)
dataframe['max_rsi_24'] = talib.MAX(dataframe['rsi'], timeperiod=24)
dataframe['max5'] = talib.MAX(dataframe['mid'], timeperiod=5)
# 180-candle extremes of 'mid'; read by adjust_stake_amount and custom_exit.
dataframe['min180'] = talib.MIN(dataframe['mid'], timeperiod=180)
dataframe['max180'] = talib.MAX(dataframe['mid'], timeperiod=180)
# ################### INFORMATIVE 1h
informative = self.dp.get_pair_dataframe(pair=metadata['pair'], timeframe='1h')
informative['mid'] = informative['open'] + (informative['close'] - informative['open']) / 2
# informative = self.populate1hIndicators(df=informative, metadata=metadata)
informative['rsi'] = talib.RSI(informative['mid'], timeperiod=14)
informative = self.rsi_trend_probability(informative)
# informative = self.calculateRegression(informative, 'mid', lookback=15)
# log_trade reads 'rsi_1h' and 'rtp_1h', presumably the suffixed columns produced by this merge.
dataframe = merge_informative_pair(dataframe, informative, '1m', '1h', ffill=True)
# ################### INFORMATIVE 1d
informative = self.dp.get_pair_dataframe(pair=metadata['pair'], timeframe='1d')
informative['mid'] = informative['open'] + (informative['close'] - informative['open']) / 2
informative['rsi'] = talib.RSI(informative['mid'], timeperiod=5)
informative = self.rsi_trend_probability(informative)
# informative = self.calculateRegression(informative, 'mid', lookback=15)
dataframe = merge_informative_pair(dataframe, informative, '1m', '1d', ffill=True)
# Defaults; overwritten below with real fill prices when an open trade exists (live / dry-run).
dataframe['last_price'] = dataframe['close']
dataframe['first_price'] = dataframe['close']
# NOTE(review): self.dp was already dereferenced above, so this guard cannot protect those calls.
if self.dp:
if self.dp.runmode.value in ('live', 'dry_run'):
self.getOpenTrades()
for trade in self.trades:
if trade.pair != pair:
continue
filled_buys = trade.select_filled_orders('buy')
count = 0
amount = 0
for buy in filled_buys:
# The first filled buy defines the first price / first amount of the position.
if count == 0:
dataframe['first_price'] = buy.price
self.pairs[pair]['first_buy'] = buy.price
self.pairs[pair]['first_amount'] = buy.price * buy.filled
# dataframe['close01'] = buy.price * 1.01
# Order(id=2396, trade=1019, order_id=29870026652, side=buy, filled=0.00078, price=63921.01,
# status=closed, date=2024-08-26 02:20:11)
# Overwritten on every iteration, so the last filled buy wins.
dataframe['last_price'] = buy.price
self.pairs[pair]['last_buy'] = buy.price
count = count + 1
amount += buy.price * buy.filled
count_buys = count
self.pairs[pair]['total_amount'] = amount
# Rolling extremes of 'mid' over the last 1440 candles (min_periods=1, so defined from the first row).
dataframe['absolute_min'] = dataframe['mid'].rolling(1440, min_periods=1).min()
dataframe['absolute_max'] = dataframe['mid'].rolling(1440, min_periods=1).max()
# steps = (dataframe['absolute_max'] - dataframe['absolute_min']) / (dataframe['absolute_min'] * 0.01)
# levels = [dataframe['absolute_min'] * (1 + i / 100) for i in range(1, steps + 1)]
#
# print(levels)
# print(f"min={dataframe['absolute_min'].min()} max={dataframe['absolute_max'].max()}")
# Price levels at +0 %, +1 %, +2 % and +3 % above the rolling minimum.
for i in [0, 1, 2, 3]:
dataframe[f"lvl_{i}_pct"] = dataframe['absolute_min'] * (1 + 0.01 * i)
#
# absolute_min = dataframe['absolute_min'].min()
# absolute_max = dataframe['absolute_max'].max()
#
# # Total spread
# diff = absolute_max - absolute_min
#
# # Number of intermediate lines (1% steps)
# steps = int((absolute_max - absolute_min) / (absolute_min * 0.01))
#
# # Price levels at 1%, 2%, ..., steps%
# levels = [absolute_min * (1 + i / 100) for i in range(1, steps + 1)]
# levels = [lvl for lvl in levels if lvl < absolute_max] # skip the exact top level
#
# # add to the DataFrame
# for i, lvl in enumerate(levels, start=1):
# dataframe[f"lvl_{i}_pct"] = lvl
# # Matching indices
# indices = [(dataframe['mid'] - lvl).abs().idxmin() for lvl in levels]
return dataframe
def getOpenTrades(self):
    """Refresh ``self.trades`` from ``Trade.get_open_trades()`` and return the refreshed value."""
    open_trades = Trade.get_open_trades()
    self.trades = open_trades
    return open_trades
def populate_buy_trend(self, dataframe: DataFrame, metadata: dict) -> DataFrame:
    """Flag long entries on ``sma5_inv`` rows and add a ``test`` marker column.

    Rows where ``sma5_inv`` equals 1 get ``enter_long = 1`` and
    ``enter_tag = "future"``. ``test`` holds ``close * 1.003`` on entry rows
    and NaN elsewhere.
    """
    # Additional RSI / percent-drop filters are currently disabled.
    reversal = dataframe['sma5_inv'] == 1
    dataframe.loc[reversal, ['enter_long', 'enter_tag']] = (1, "future")

    entered = dataframe['enter_long'] == 1
    marker_price = dataframe['close'] * 1.003
    dataframe['test'] = np.where(entered, marker_price, np.nan)
    return dataframe
def populate_sell_trend(self, dataframe: DataFrame, metadata: dict) -> DataFrame:
    """Return *dataframe* unchanged: this method adds no exit signal columns."""
    unchanged = dataframe
    return unchanged
# def adjust_stake_amount(self, pair: str, last_candle: DataFrame):
# # Calculer le minimum des 14 derniers jours
# nb_pairs = len(self.dp.current_whitelist())
#
# base_stake_amount = self.config.get('stake_amount')
#
# if True : #self.pairs[pair]['count_of_buys'] == 0:
# factor = 1 #65 / min(65, last_candle['rsi_1d'])
# # if last_candle['min_max_60'] > 0.04:
# # factor = 2
#
# adjusted_stake_amount = base_stake_amount #max(base_stake_amount / 5, base_stake_amount * factor)
# else:
# adjusted_stake_amount = self.pairs[pair]['first_amount']
#
# if self.pairs[pair]['count_of_buys'] == 0:
# self.pairs[pair]['first_amount'] = adjusted_stake_amount
#
# return adjusted_stake_amount
def adjust_stake_amount(self, pair: str, last_candle: DataFrame):
    """Scale the stake by where the price sits inside its 180-candle range.

    The multiplier is 1 at the range low and 0 at the range high, applied to
    twice the configured ``stake_amount``. When the range is flat or not yet
    available (NaN during warm-up) the neutral multiplier 0.5 is used, which
    yields exactly the configured ``stake_amount`` instead of a division by
    zero or a NaN stake.

    :param pair: Pair the stake is computed for (not used in the computation).
    :param last_candle: Candle providing ``min180``, ``max180`` and ``mid``.
    :return: The stake amount to use.
    """
    low180 = last_candle["min180"]
    high180 = last_candle["max180"]
    span = high180 - low180

    mult = 0.5
    # `span > 0` is False for both a flat range and a NaN range.
    if span > 0:
        candidate = 1 - ((last_candle["mid"] - low180) / span)
        if not math.isnan(candidate):
            mult = candidate

    # Routed through printLog so it is silenced in hyperopt like every other log line.
    self.printLog(f"low={low180} mid={last_candle['mid']} high={high180} mult={mult}")

    base_size = 2 * self.config.get('stake_amount')
    return base_size * mult
def adjust_trade_position(self, trade: "Trade", current_time: datetime,
                          current_rate: float, current_profit: float, min_stake: float,
                          max_stake: float, **kwargs):
    """Decide whether to add to an open position (DCA on a drop, pyramiding on a rise).

    The reference price is the pair's recorded ``last_buy`` and the threshold
    is ``0.25 % * number of entries``:

    * Price fell by at least the threshold and the last candle is recovering
      (positive ``hapercent`` and ``sma60_deriv1``): buy more, capped at
      2.5x the configured stake. If the trade already has "gain" entries and
      is in profit, the pair is flagged ``force_sell`` instead.
    * Price rose by at least the threshold, the trade is in profit and
      ``max_rsi_24`` is below 60: buy more and count it as a gain entry.

    :param trade: Open trade being evaluated.
    :param current_time: Current time; converted to UTC for logging.
    :param current_rate: Current price of the pair.
    :param current_profit: Current profit ratio of the trade.
    :param min_stake: Unused.
    :param max_stake: Unused.
    :return: Stake amount to add, or ``None`` to leave the position unchanged.
    """
    # Never stack a new order on top of a pending one.
    if trade.has_open_orders:
        return None

    available = self.wallets.get_available_stake_amount()
    if available < 10:
        return None

    pair = trade.pair
    dataframe, _ = self.dp.get_analyzed_dataframe(pair, self.timeframe)
    # Checked before any row access so an empty frame cannot raise.
    if len(dataframe) < 1:
        return None
    last_candle = dataframe.iloc[-1].squeeze()

    state = self.pairs[pair]
    # Price of the last real buy (not the average price).
    last_fill_price = state['last_buy']
    if not last_fill_price:
        # No reference price recorded: the relative move is undefined.
        return None

    current_time = current_time.astimezone(timezone.utc)
    dispo = round(available)
    count_of_buys = trade.nr_of_successful_entries
    profit = trade.calc_profit(current_rate)

    dca_threshold = 0.0025 * count_of_buys
    decline = (last_fill_price - current_rate) / last_fill_price
    increase = -decline
    recovering = last_candle['hapercent'] > 0 and last_candle['sma60_deriv1'] > 0

    if decline >= dca_threshold and recovering:
        try:
            if state['has_gain'] and profit > 0:
                # Gains were already stacked and the trade is in profit: request an exit instead.
                state['force_sell'] = True
                state['previous_profit'] = profit
                return None

            max_amount = self.config.get('stake_amount') * 2.5
            stake_amount = min(min(max_amount, available),
                               self.adjust_stake_amount(pair, last_candle))
            if stake_amount > 0:
                state['previous_profit'] = profit
                trade_type = "Loss " + (last_candle['enter_tag'] if last_candle['enter_long'] == 1 else '')
                state['count_of_buys'] += 1
                state['total_amount'] += stake_amount
                self.log_trade(
                    last_candle=last_candle,
                    date=current_time,
                    action="🟧 Loss -",
                    dispo=dispo,
                    pair=pair,
                    rate=current_rate,
                    trade_type=trade_type,
                    profit=round(profit, 1),
                    buys=trade.nr_of_successful_entries + 1,
                    stake=round(stake_amount, 2)
                )
                state['last_buy'] = current_rate
                state['max_touch'] = last_candle['close']
                state['last_candle'] = last_candle
                return stake_amount
            return None
        except Exception as exception:
            # Best effort: a failure here must not break the bot loop.
            self.printLog(exception)
            return None

    if current_profit > 0 and increase >= dca_threshold and last_candle['max_rsi_24'] < 60:
        try:
            state['previous_profit'] = profit
            stake_amount = min(available, self.adjust_stake_amount(pair, last_candle))
            if stake_amount > 0:
                state['has_gain'] += 1
                state['count_of_buys'] += 1
                state['total_amount'] += stake_amount
                self.log_trade(
                    last_candle=last_candle,
                    date=current_time,
                    action="🟡 Gain +",
                    dispo=dispo,
                    pair=pair,
                    rate=current_rate,
                    trade_type='Gain',
                    profit=round(profit, 1),
                    buys=trade.nr_of_successful_entries + 1,
                    stake=round(stake_amount, 2)
                )
                state['last_buy'] = current_rate
                state['max_touch'] = last_candle['close']
                state['last_candle'] = last_candle
                return stake_amount
            return None
        except Exception as exception:
            # Best effort: a failure here must not break the bot loop.
            self.printLog(exception)
            return None

    return None
def custom_exit(self, pair, trade, current_time, current_rate, current_profit, **kwargs):
    """Profit-based trailing exit, with bookkeeping updates for the pair.

    The highest absolute profit seen while the trade was in profit is tracked
    in ``self.pairs[pair]['max_profit']``. The exit triggers when the current
    profit is positive but has fallen to ``max_profit * (1 - trailing_stop_positive)``
    or below, unless the offset condition below postpones it.

    Only the latest candle is read, and the 180-candle range is checked before
    dividing by it, so short dataframes and flat or NaN ranges no longer raise.

    :param pair: Key into ``self.pairs``.
    :param trade: Open trade being evaluated.
    :param current_time: Current time.
    :param current_rate: Current price, used for the absolute profit.
    :param current_profit: Current profit ratio; gates the ``max_profit`` update.
    :return: ``"stop_<number of entries>"`` to exit, ``None`` to keep the trade.
    """
    dataframe, _ = self.dp.get_analyzed_dataframe(pair, self.timeframe)
    last_candle = dataframe.iloc[-1].squeeze()
    state = self.pairs[pair]

    # Called for its side effect: refreshes state['expected_profit'].
    self.expectedProfit(pair, last_candle)

    # ----- 1) Update the per-pair tracking values -----
    close = last_candle['close']
    state['last_max'] = max(close, state['last_max'])
    state['last_min'] = min(close, state['last_min'])
    state['current_trade'] = trade

    count_of_buys = trade.nr_of_successful_entries
    profit = trade.calc_profit(current_rate)
    if current_profit > 0:
        state['max_profit'] = max(state['max_profit'], profit)
    max_profit = state['max_profit']

    state['count_of_buys'] = count_of_buys
    state['current_profit'] = profit

    dispo = round(self.wallets.get_available_stake_amount())
    minutes = (current_time - trade.date_last_filled_utc).total_seconds() / 60.0

    # ----- 2) Update the highest close seen -----
    state['max_touch'] = max(close, state['max_touch'])

    # ----- 3) Trailing parameters, adjusted by the price zone -----
    stop_positive = self.trailing_stop_positive
    offset = self.trailing_stop_positive_offset

    # Position of the price inside its 180-candle range, split into thirds.
    # The zone stays undetermined (None) when the range is flat or NaN.
    zone = None
    span = last_candle['max180'] - last_candle['min180']
    if span > 0:
        position = (last_candle['mid'] - last_candle['min180']) / span
        if position == position:  # False only for NaN
            zone = int(position * 3)

    if zone == 0:
        # NOTE(review): True acts as 1.0 below, which makes the trailing stop 0 and
        # therefore disables the exit in the bottom zone — confirm this is intended.
        stop_positive = True
        offset = self.trailing_stop_positive_offset * 2

    # ----- 4) Dynamic trailing stop, expressed as an absolute profit -----
    trailing_stop = max_profit * (1.0 - stop_positive)

    # Relative drop from the best profit seen.
    baisse = 0
    if max_profit:
        baisse = (max_profit - profit) / max_profit

    if minutes % 15 == 0:
        self.log_trade(
            last_candle=last_candle,
            date=current_time,
            action="🟢 CURRENT",
            dispo=dispo,
            pair=pair,
            rate=last_candle['close'],
            trade_type=f"{round(profit, 2)} {round(max_profit, 2)} {round(trailing_stop,2)}",
            profit=round(profit, 2),
            buys=count_of_buys,
            stake=0
        )

    # ----- 5) Offset: optionally wait before the trailing stop becomes active -----
    if self.trailing_only_offset_is_reached:
        offset_target = min(2, offset * (count_of_buys - state['has_gain']))
        if max_profit < offset_target and (baisse < 0.5 and max_profit > offset):
            return None  # do not activate the trailing stop yet

    # ----- 6) Exit condition -----
    if profit > 0 and profit <= trailing_stop:
        return f"stop_{count_of_buys}"
    return None
def informative_pairs(self):
    """Return ``(pair, timeframe)`` tuples for every whitelisted pair: all '1h' entries first, then all '1d'."""
    whitelist = self.dp.current_whitelist()
    return [(name, timeframe) for timeframe in ('1h', '1d') for name in whitelist]
def populate1hIndicators(self, df: pd.DataFrame, metadata: dict) -> pd.DataFrame:
    """Add previous-week and previous-month low/high levels and demand zones.

    The frame is modified in place and returned. Added columns: ``week``,
    ``year``, ``month``, ``weekly_low``, ``weekly_high``, ``monthly_low``,
    ``monthly_high`` and the four ``*_demand_zone_*`` columns.

    Each level is the extreme of the previous period present in the data, so
    the current week or month is never looked at. Shifting the per-row
    ``transform`` result by one row only moved the current period's own value
    down a line. Weeks are keyed by ISO year and ISO week so the days around
    New Year stay in the correct week.

    :param df: OHLC frame with ``low`` and ``high`` columns; its index must be
        a ``DatetimeIndex`` (``isocalendar``, ``year`` and ``month`` are read from it).
    :param metadata: Unused.
    :return: The same ``df`` with the extra columns.
    """
    def previous_period(column, key, how):
        # One aggregate per period (sorted by key, i.e. chronologically), moved
        # forward by one period and broadcast back onto the rows.
        per_period = df[column].groupby(key.to_numpy()).agg(how)
        return key.map(per_period.shift(1)).to_numpy()

    # --- WEEKLY LEVELS ---
    iso = df.index.isocalendar()
    df["week"] = iso.week
    df["year"] = df.index.year
    week_key = pd.Series(
        iso.year.to_numpy(dtype="int64") * 100 + iso.week.to_numpy(dtype="int64"),
        index=df.index,
    )
    df["weekly_low"] = previous_period("low", week_key, "min")
    df["weekly_high"] = previous_period("high", week_key, "max")

    # Simple weekly demand zone: from the previous weekly low up to 2.5 % above it (tunable).
    df["weekly_demand_zone_low"] = df["weekly_low"]
    df["weekly_demand_zone_high"] = df["weekly_low"] * 1.025

    # --- MONTHLY LEVELS ---
    df["month"] = df.index.month
    month_key = pd.Series(
        np.asarray(df.index.year, dtype="int64") * 100 + np.asarray(df.index.month, dtype="int64"),
        index=df.index,
    )
    df["monthly_low"] = previous_period("low", month_key, "min")
    df["monthly_high"] = previous_period("high", month_key, "max")

    # Monthly demand zone: from the previous monthly low up to 3 % above it.
    df["monthly_demand_zone_low"] = df["monthly_low"]
    df["monthly_demand_zone_high"] = df["monthly_low"] * 1.03
    return df
# ----- SIGNALS SIMPLES POUR EXEMPLE -----
# def populate_buy_trend(self, df: pd.DataFrame, metadata: dict) -> pd.DataFrame:
# df["buy"] = 0
#
# # Exemple : acheter si le prix tape la zone de demande hebdomadaire
# df.loc[
# (df["close"] <= df["weekly_demand_zone_high"]) &
# (df["close"] >= df["weekly_demand_zone_low"]),
# "buy"
# ] = 1
#
# return df
#
# def populate_sell_trend(self, df: pd.DataFrame, metadata: dict) -> pd.DataFrame:
# df["sell"] = 0
#
# # Exemple : vendre sur retour au weekly_high précédent
# df.loc[df["close"] >= df["weekly_high"], "sell"] = 1
#
# return df
def rsi_trend_probability(self, dataframe):
    """Return a copy of *dataframe* with RSI-based trend columns added.

    Added columns: ``rsi14`` and ``rsi60`` (RSI of ``mid``), ``cross_soft``
    (tanh of their difference divided by 7), ``gap`` (their difference / 100),
    ``trend`` (``rsi60`` re-centred on 50 and divided by 50) and ``rtp``, the
    weighted sum 0.6 / 0.25 / 0.15 of the last three, clipped to [-1, 1].
    The input frame itself is not modified.
    """
    out = dataframe.copy()
    for period in (14, 60):
        out[f'rsi{period}'] = talib.RSI(out['mid'], period)

    out['cross_soft'] = np.tanh((out['rsi14'] - out['rsi60']) / 7)
    out['gap'] = (out['rsi14'] - out['rsi60']) / 100
    out['trend'] = (out['rsi60'] - 50) / 50

    blended = 0.6 * out['cross_soft'] + 0.25 * out['gap'] + 0.15 * out['trend']
    out['rtp'] = blended.clip(-1, 1)
    return out