diff --git a/Empty.json b/Empty.json
new file mode 100644
index 0000000..74cdd5d
--- /dev/null
+++ b/Empty.json
@@ -0,0 +1,50 @@
+{
+ "strategy_name": "Empty",
+ "params": {
+ "roi": {
+ "0": 10
+ },
+ "max_open_trades": {
+ "max_open_trades": 20
+ },
+ "buy": {
+ "buy_crossed_indicator0": "sma24_deriv2",
+ "buy_crossed_indicator1": "sma48_deriv2",
+ "buy_crossed_indicator2": "sma60_deriv1",
+ "buy_indicator0": "sma12_deriv1",
+ "buy_indicator1": "sma3_score",
+ "buy_indicator2": "sma5_deriv2",
+ "buy_operator0": ">",
+ "buy_operator1": "D",
+ "buy_operator2": "D",
+ "buy_real_num0": -0.7,
+ "buy_real_num1": 1.0,
+ "buy_real_num2": -0.4
+ },
+ "sell": {
+ "sell_crossed_indicator0": "sma24_deriv2",
+ "sell_crossed_indicator1": "sma48_score",
+ "sell_crossed_indicator2": "sma24_score",
+ "sell_indicator0": "sma5_deriv2",
+ "sell_indicator1": "sma60_score",
+ "sell_indicator2": "sma60_score",
+ "sell_operator0": "<",
+ "sell_operator1": ">",
+ "sell_operator2": "CB",
+ "sell_real_num0": 0.7,
+ "sell_real_num1": 0.5,
+ "sell_real_num2": 0.9
+ },
+ "stoploss": {
+ "stoploss": -1
+ },
+ "trailing": {
+ "trailing_stop": false,
+ "trailing_stop_positive": 0.055,
+ "trailing_stop_positive_offset": 0.106,
+ "trailing_only_offset_is_reached": false
+ }
+ },
+ "ft_stratparam_v": 1,
+ "export_time": "2026-02-19 19:14:20.539321+00:00"
+}
\ No newline at end of file
diff --git a/Empty.py b/Empty.py
index 7153a92..5e8c263 100644
--- a/Empty.py
+++ b/Empty.py
@@ -2,18 +2,238 @@
# isort: skip_file
# --- Do not remove these libs ---
from datetime import datetime
-
+from freqtrade.persistence import Trade
import numpy as np # noqa
import pandas as pd # noqa
from pandas import DataFrame
-
+from datetime import timezone, timedelta
+from datetime import timedelta, datetime
from freqtrade.strategy.interface import IStrategy
+from freqtrade.persistence import Trade
+from typing import Optional, Union, Tuple
+from freqtrade.strategy import (BooleanParameter, CategoricalParameter, DecimalParameter, stoploss_from_open,
+ IntParameter, IStrategy, merge_informative_pair, informative, stoploss_from_absolute)
# --------------------------------
# Add your lib to import here
-import talib.abstract as ta
+import ta
+import talib.abstract as talib
import freqtrade.vendor.qtpylib.indicators as qtpylib
+from functools import reduce
+from random import shuffle
+
+timeperiods = [3, 5, 12, 24, 48, 60]
+
+god_genes_with_timeperiod = list()
+for timeperiod in timeperiods:
+ # god_genes_with_timeperiod.append(f'max{timeperiod}')
+ # god_genes_with_timeperiod.append(f'min{timeperiod}')
+ # god_genes_with_timeperiod.append(f"percent{timeperiod}")
+ # god_genes_with_timeperiod.append(f"sma{timeperiod}")
+ god_genes_with_timeperiod.append(f"sma{timeperiod}_deriv1")
+ god_genes_with_timeperiod.append(f"sma{timeperiod}_deriv2")
+ god_genes_with_timeperiod.append(f"sma{timeperiod}_score")
+ # god_genes_with_timeperiod.append(f"sma{timeperiod}_trend_up")
+ # god_genes_with_timeperiod.append(f"sma{timeperiod}_trend_down")
+ # god_genes_with_timeperiod.append(f"sma{timeperiod}_trend_change_up")
+ # god_genes_with_timeperiod.append(f"sma{timeperiod}_trend_change_down")
+
+operators = [
+ "D", # Disabled gene
+ ">", # Indicator, bigger than cross indicator
+ "<", # Indicator, smaller than cross indicator
+ "=", # Indicator, equal with cross indicator
+ "C", # Indicator, crossed the cross indicator
+ "CA", # Indicator, crossed above the cross indicator
+ "CB", # Indicator, crossed below the cross indicator
+ # ">R", # Normalized indicator, bigger than real number
+ # "=R", # Normalized indicator, equal with real number
+ # "R", # Normalized indicator devided to cross indicator, bigger than real number
+ # "/=R", # Normalized indicator devided to cross indicator, equal with real number
+ # "/ 10)
+
+ # TODO : it ill callculated in populate indicators.
+ pd.set_option('display.max_rows', None)
+ pd.set_option('display.max_columns', None)
+ pd.set_option("display.width", 200)
+
+ # print(f"{indicator} {crossed_indicator} {real_num}")
+
+ dataframe[indicator] = gene_calculator(dataframe, indicator)
+ dataframe[crossed_indicator] = gene_calculator(dataframe, crossed_indicator)
+
+ indicator_trend_sma = f"{indicator}-SMA-{TREND_CHECK_CANDLES}"
+ if operator in ["UT", "DT", "OT", "CUT", "CDT", "COT"]:
+ dataframe[indicator_trend_sma] = gene_calculator(dataframe, indicator_trend_sma)
+
+ if operator == ">":
+ condition = (dataframe[indicator] > dataframe[crossed_indicator])
+ elif operator == "=":
+ condition = (np.isclose(dataframe[indicator], dataframe[crossed_indicator]))
+ elif operator == "<":
+ condition = (dataframe[indicator] < dataframe[crossed_indicator])
+ elif operator == "C":
+ condition = (
+ (qtpylib.crossed_below(dataframe[indicator], dataframe[crossed_indicator])) |
+ (qtpylib.crossed_above(
+ dataframe[indicator], dataframe[crossed_indicator]))
+ )
+ elif operator == "CA":
+ condition = (qtpylib.crossed_above(dataframe[indicator], dataframe[crossed_indicator]))
+ elif operator == "CB":
+ condition = (qtpylib.crossed_below(dataframe[indicator], dataframe[crossed_indicator]))
+ elif operator == ">R":
+ condition = (dataframe[indicator] > real_num)
+ elif operator == "=R":
+ condition = (np.isclose(dataframe[indicator], real_num))
+ elif operator == "R":
+ condition = (dataframe[indicator].div(dataframe[crossed_indicator]) > real_num)
+ elif operator == "/=R":
+ condition = (np.isclose(dataframe[indicator].div(dataframe[crossed_indicator]), real_num))
+ elif operator == "/ dataframe[indicator_trend_sma])
+ elif operator == "DT":
+ condition = (dataframe[indicator] < dataframe[indicator_trend_sma])
+ elif operator == "OT":
+ condition = (np.isclose(dataframe[indicator], dataframe[indicator_trend_sma]))
+ elif operator == "CUT":
+ condition = (
+ (
+ qtpylib.crossed_above(dataframe[indicator],dataframe[indicator_trend_sma])
+ ) & (
+ dataframe[indicator] > dataframe[indicator_trend_sma]
+ )
+ )
+ elif operator == "CDT":
+ condition = (
+ (
+ qtpylib.crossed_below(dataframe[indicator], dataframe[indicator_trend_sma])
+ ) &
+ (
+ dataframe[indicator] < dataframe[indicator_trend_sma]
+ )
+ )
+ elif operator == "COT":
+ condition = (
+ (
+ (
+ qtpylib.crossed_below(dataframe[indicator], dataframe[indicator_trend_sma])
+ ) |
+ (
+ qtpylib.crossed_above(dataframe[indicator], dataframe[indicator_trend_sma])
+ )
+ ) &
+ (
+ np.isclose(dataframe[indicator], dataframe[indicator_trend_sma])
+ )
+ )
+
+ return condition, dataframe
+# #########################################################################################################################
+
+
+
+
# This class is a sample. Feel free to customize it.
class Empty(IStrategy):
@@ -30,14 +250,18 @@ class Empty(IStrategy):
# Stoploss:
stoploss = -1
trailing_stop = True
- trailing_stop_positive = 0.001
- trailing_stop_positive_offset = 0.0175 #0.015
+ trailing_stop_positive = 0.15
+ trailing_stop_positive_offset = 0.20
trailing_only_offset_is_reached = True
+ position_adjustment_enable = True
+ use_custom_stoploss = True
+
+
#max_open_trades = 3
# Optimal ticker interval for the strategy.
- timeframe = '5m'
+ timeframe = '1h'
# Run "populate_indicators()" only for new candle.
process_only_new_candles = False
@@ -50,74 +274,546 @@ class Empty(IStrategy):
# Number of candles the strategy requires before producing valid signals
startup_candle_count: int = 30
- plot_config = {
- # Main plot indicators (Moving averages, ...)
- 'main_plot': {
- 'bb_lowerband': {'color': 'white'},
- 'bb_upperband': {'color': 'white'},
- },
- 'subplots': {
- # Subplots - each dict defines one additional plot
- "BB": {
- 'bb_width': {'color': 'white'},
- },
- "Aaron": {
- 'aroonup': {'color': 'blue'},
- 'aroondown': {'color': 'red'}
- }
+ pairs = {
+ pair: {
+ 'first_amount': 0,
+ "first_buy": 0,
+ "last_buy": 0.0,
+ "last_min": 999999999999999.5,
+ "last_max": 0,
+ "trade_info": {},
+ "max_touch": 0.0,
+ "last_sell": 0.0,
+ 'count_of_buys': 0,
+ 'current_profit': 0,
+ 'expected_profit': 0,
+ 'previous_profit': 0,
+ "last_candle": {},
+ "last_count_of_buys": 0,
+ 'base_stake_amount': 0,
+ 'stop_buy': False,
+ 'last_date': 0,
+ 'stop': False,
+ 'max_profit': 0,
+ 'total_amount': 0,
+ 'has_gain': 0,
+ 'force_sell': False,
+ 'force_buy': False,
+ 'current_trade': None,
+ 'last_trade': None
}
+ for pair in ["BTC/USDC", "BTC/USDT"]
}
+ plot_config = {
+ "main_plot": {
+ "sma5": {
+ "color": "#7aa90b"
+ },
+ "min24": {
+ "color": "#121acd"
+ }
+ },
+ "subplots": {
+ "Inv": {
+ "sma5_inv": {
+ "color": "#aef878",
+ "type": "line"
+ },
+ "zero": {
+ "color": "#fdba52"
+ },
+ "sma24_score": {
+ "color": "#f1f5b0"
+ }
+ },
+ "drv": {
+ "sma5_deriv1": {
+ "color": "#96eebb"
+ }
+ }
+ },
+ "options": {
+ "markAreaZIndex": 1
+ }
+ }
+
+ # Buy Hyperoptable Parameters/Spaces.
+ buy_crossed_indicator0 = CategoricalParameter(god_genes_with_timeperiod, default="ADD-20", space='buy')
+ buy_crossed_indicator1 = CategoricalParameter(god_genes_with_timeperiod, default="ASIN-6", space='buy')
+ buy_crossed_indicator2 = CategoricalParameter(god_genes_with_timeperiod, default="CDLEVENINGSTAR-50", space='buy')
+
+ buy_indicator0 = CategoricalParameter(god_genes_with_timeperiod, default="SMA-100", space='buy')
+ buy_indicator1 = CategoricalParameter(god_genes_with_timeperiod, default="WILLR-50", space='buy')
+ buy_indicator2 = CategoricalParameter(god_genes_with_timeperiod, default="CDLHANGINGMAN-20", space='buy')
+
+ buy_operator0 = CategoricalParameter(operators, default="/ float:
+ dataframe, _ = self.dp.get_analyzed_dataframe(pair, self.timeframe)
+ last_candle = dataframe.iloc[-1].squeeze()
+ return self.adjust_stake_amount(pair, last_candle)
+
+ def adjust_stake_amount(self, pair: str, last_candle: DataFrame):
+ if (self.pairs[pair]['first_amount'] > 0):
+ amount = min(self.wallets.get_available_stake_amount(), self.pairs[pair]['first_amount'])
+ else:
+ amount = self.wallets.get_available_stake_amount() / 4
+ return min(amount, self.wallets.get_available_stake_amount())
+
+ def adjust_trade_position(self, trade: Trade, current_time: datetime,
+ current_rate: float, current_profit: float, min_stake: float,
+ max_stake: float, **kwargs):
+ # ne rien faire si ordre deja en cours
+ if trade.has_open_orders:
+ # print("skip open orders")
+ return None
+ if (self.wallets.get_available_stake_amount() < 10): # or trade.stake_amount >= max_stake:
+ return 0
+ #
+ dataframe, _ = self.dp.get_analyzed_dataframe(trade.pair, self.timeframe)
+ last_candle = dataframe.iloc[-1].squeeze()
+ before_last_candle = dataframe.iloc[-2].squeeze()
+ before_last_candle_12 = dataframe.iloc[-13].squeeze()
+ before_last_candle_24 = dataframe.iloc[-25].squeeze()
+ last_candle_3 = dataframe.iloc[-4].squeeze()
+ last_candle_previous_1h = dataframe.iloc[-13].squeeze()
+ # prépare les données
+ current_time = current_time.astimezone(timezone.utc)
+ open_date = trade.open_date.astimezone(timezone.utc)
+ dispo = round(self.wallets.get_available_stake_amount())
+ hours_since_first_buy = (current_time - trade.open_date_utc).seconds / 3600.0
+ days_since_first_buy = (current_time - trade.open_date_utc).days
+ hours = (current_time - trade.date_last_filled_utc).total_seconds() / 3600.0
+ count_of_buys = trade.nr_of_successful_entries
+ current_time_utc = current_time.astimezone(timezone.utc)
+ open_date = trade.open_date.astimezone(timezone.utc)
+ days_since_open = (current_time_utc - open_date).days
+ pair = trade.pair
+ profit = trade.calc_profit(current_rate) #round(current_profit * trade.stake_amount, 1)
+ # last_lost = self.getLastLost(last_candle, pair)
+ # pct_first = 0
+ stake_amount = self.adjust_stake_amount(pair, last_candle)
+ # if (last_candle['enter_long'] == 1 and current_profit < -0.05 and stake_amount > 10) :
+ #
+ # print(f"adjust {current_time} {stake_amount}")
+ # print(f"adjust {pair} {current_time} dispo={dispo} amount={stake_amount} rate={current_rate}")
+ # return stake_amount
+
+ if last_candle['enter_long'] != 1:
+ return None
+
+ filled_buys = [
+ o for o in trade.orders
+ if o.status == "closed" and o.ft_order_side == "buy"
+ ]
+
+ if not filled_buys:
+ return None
+
+ last_buy = max(filled_buys, key=lambda o: o.order_date)
+ last_entry_price = last_buy.price
+
+ drop_from_last_entry = (current_rate - last_entry_price) / last_entry_price
+
+ if drop_from_last_entry <= -0.05:
+ # stake_amount = trade.stake_amount
+ print(f"adjust {current_time} {stake_amount}")
+ print(f"adjust {pair} {current_time} dispo={dispo} amount={stake_amount} rate={current_rate}")
+ return stake_amount
+
+ return None
+
+ def confirm_trade_entry(self, pair: str, order_type: str, amount: float, rate: float, time_in_force: str,
+ current_time: datetime, entry_tag: Optional[str], **kwargs) -> bool:
+
+ minutes = 0
+ if self.pairs[pair]['last_date'] != 0:
+ minutes = round(int((current_time - self.pairs[pair]['last_date']).total_seconds() / 60))
+
+ dataframe, _ = self.dp.get_analyzed_dataframe(pair, self.timeframe)
+ last_candle = dataframe.iloc[-1].squeeze()
+ last_candle_2 = dataframe.iloc[-2].squeeze()
+ last_candle_3 = dataframe.iloc[-3].squeeze()
+
+ condition = True #(last_candle[f"{indic_5m}_deriv1"] >= indic_deriv1_5m) and (last_candle[f"{indic_5m}_deriv2"] >= indic_deriv2_5m)
+
+ # self.should_enter_trade(pair, last_candle, current_time)
+ allow_to_buy = (condition and not self.pairs[pair]['stop']) | (entry_tag == 'force_entry')
+
+ # force = self.pairs[pair]['force_buy']
+ # if self.pairs[pair]['force_buy']:
+ # self.pairs[pair]['force_buy'] = False
+ # allow_to_buy = True
+ # else:
+ # if not self.should_enter_trade(pair, last_candle, current_time):
+ # allow_to_buy = False
+
+ if allow_to_buy:
+ self.pairs[pair]['first_buy'] = rate
+ self.pairs[pair]['last_buy'] = rate
+ self.pairs[pair]['max_touch'] = last_candle['close']
+ self.pairs[pair]['last_candle'] = last_candle
+ self.pairs[pair]['count_of_buys'] = 1
+ self.pairs[pair]['current_profit'] = 0
+ self.pairs[pair]['last_max'] = max(last_candle['close'], self.pairs[pair]['last_max'])
+ self.pairs[pair]['last_min'] = min(last_candle['close'], self.pairs[pair]['last_min'])
+ self.pairs[pair]['last_date'] = current_time
+
+ dispo = round(self.wallets.get_available_stake_amount())
+ # self.printLineLog()
+
+ stake_amount = self.adjust_stake_amount(pair, last_candle)
+ self.pairs[pair]['first_amount'] = stake_amount
+ self.pairs[pair]['total_amount'] = stake_amount
+ print(f"Buy {pair} {current_time} {entry_tag} dispo={dispo} amount={amount} rate={rate} rate={rate}")
+
+ # self.log_trade(
+ # last_candle=last_candle,
+ # date=current_time,
+ # action=("🟩Buy" if allow_to_buy else "Canceled") + " " + str(minutes),
+ # pair=pair,
+ # rate=rate,
+ # dispo=dispo,
+ # profit=0,
+ # trade_type=entry_tag,
+ # buys=1,
+ # stake=round(stake_amount, 2)
+ # )
+
+ return allow_to_buy
+
+ def confirm_trade_exit(self, pair: str, trade: Trade, order_type: str, amount: float, rate: float,
+ time_in_force: str,
+ exit_reason: str, current_time, **kwargs, ) -> bool:
+
+ # allow_to_sell = (minutes > 30)
+ dataframe, _ = self.dp.get_analyzed_dataframe(pair, self.timeframe)
+ last_candle = dataframe.iloc[-1].squeeze()
+
+ profit = trade.calc_profit(rate)
+ force = self.pairs[pair]['force_sell']
+ allow_to_sell = (last_candle['hapercent'] < 0 and profit > 0) or force or (exit_reason == 'force_exit') or (exit_reason == 'stop_loss')
+
+ minutes = int(round((current_time - trade.date_last_filled_utc).total_seconds() / 60, 0))
+
+ if allow_to_sell:
+ self.pairs[pair]['last_sell'] = rate
+ self.pairs[pair]['last_candle'] = last_candle
+ self.pairs[pair]['max_profit'] = 0
+ profit = trade.calc_profit(rate)
+ self.pairs[pair]['previous_profit'] = profit
+ dispo = round(self.wallets.get_available_stake_amount())
+ print(f"Sell {pair} {current_time} {exit_reason} dispo={dispo} amount={amount} rate={rate} open_rate={trade.open_rate} profit={profit}")
+ # self.log_trade(
+ # last_candle=last_candle,
+ # date=current_time,
+ # action="🟥Sell " + str(minutes),
+ # pair=pair,
+ # trade_type=exit_reason,
+ # rate=last_candle['close'],
+ # dispo=dispo,
+ # profit=round(profit, 2)
+ # )
+ self.pairs[pair]['force_sell'] = False
+ self.pairs[pair]['has_gain'] = 0
+ self.pairs[pair]['current_profit'] = 0
+ self.pairs[pair]['total_amount'] = 0
+ self.pairs[pair]['count_of_buys'] = 0
+ self.pairs[pair]['max_touch'] = 0
+ self.pairs[pair]['last_buy'] = 0
+ self.pairs[pair]['last_date'] = current_time
+ self.pairs[pair]['last_trade'] = self.pairs[pair]['current_trade']
+ self.pairs[pair]['current_trade'] = None
+ # else:
+ # print(f"STOP triggered for {pair} ({exit_reason}) but condition blocked", "warning")
+ return (allow_to_sell) | (exit_reason == 'force_exit') | (exit_reason == 'stop_loss') | force
+
+ def custom_exit(self, pair, trade, current_time,
+ current_rate, current_profit, **kwargs):
+
+ dataframe, _ = self.dp.get_analyzed_dataframe(pair, self.timeframe)
+ last_row = dataframe.iloc[-1]
+ self.pairs[pair]['current_trade'] = trade
+ momentum = last_row["sma24_score"]
+
+ # Si momentum fort → on laisse courir
+ if momentum > 1:
+ return None
+
+ # Si momentum faiblit → on prend profit plus tôt
+ if momentum < 0 and current_profit > 0.02:
+ return "momentum_drop"
+
+ return None
+
+ def custom_stoploss(self, pair, trade, current_time,
+ current_rate, current_profit, **kwargs):
+
+ if current_profit < - 0.1 and self.wallets.get_available_stake_amount():
+ return -0.1
+
+ return -1
+
def informative_pairs(self):
return []
def populate_indicators(self, dataframe: DataFrame, metadata: dict) -> DataFrame:
- # MACD
- #macd = ta.MACD(dataframe)
- #dataframe['macd'] = macd['macd']
- #dataframe['macdsignal'] = macd['macdsignal']
- #dataframe['macdhist'] = macd['macdhist']
+ dataframe = dataframe.copy()
+ heikinashi = qtpylib.heikinashi(dataframe)
+ dataframe['haopen'] = heikinashi['open']
+ dataframe['haclose'] = heikinashi['close']
+ dataframe['hapercent'] = (dataframe['haclose'] - dataframe['haopen']) / dataframe['haclose']
+ dataframe['mid'] = dataframe['haopen'] + (dataframe['haclose'] - dataframe['haopen']) / 2
+ dataframe['zero'] = 0;
- # RSI
- #dataframe['rsi'] = ta.RSI(dataframe)
+ for timeperiod in timeperiods:
+ dataframe[f'max{timeperiod}'] = talib.MAX(dataframe['close'], timeperiod=timeperiod)
+ dataframe[f'min{timeperiod}'] = talib.MIN(dataframe['close'], timeperiod=timeperiod)
+ dataframe[f"percent{timeperiod}"] = dataframe['close'].pct_change(timeperiod)
+ dataframe[f"sma{timeperiod}"] = dataframe['mid'].ewm(span=timeperiod, adjust=False).mean()
+ self.calculeDerivees(dataframe, f"sma{timeperiod}", timeframe=self.timeframe, ema_period=timeperiod)
- # Bollinger Bands
- bollinger = qtpylib.bollinger_bands(qtpylib.typical_price(dataframe), window=20, stds=2)
- dataframe['bb_lowerband'] = bollinger['lower']
- dataframe['bb_middleband'] = bollinger['mid']
- dataframe['bb_upperband'] = bollinger['upper']
- dataframe["bb_percent"] = (
- (dataframe["close"] - dataframe["bb_lowerband"]) /
- (dataframe["bb_upperband"] - dataframe["bb_lowerband"])
- )
- dataframe["bb_width"] = (
- (dataframe["bb_upperband"] - dataframe["bb_lowerband"]) / dataframe["bb_middleband"]
- )
+ # ######################################################################################################
+ dataframe['stop_buying_deb'] = (dataframe['sma60_trend_change_down'] == 1)
+ dataframe['stop_buying_end'] = (dataframe['sma60_trend_change_up'] == 1)
+ latched = np.zeros(len(dataframe), dtype=bool)
+
+ for i in range(1, len(dataframe)):
+ if dataframe['stop_buying_deb'].iloc[i]:
+ latched[i] = True
+ elif dataframe['stop_buying_end'].iloc[i]:
+ latched[i] = False
+ else:
+ latched[i] = latched[i - 1]
+ dataframe['stop_buying'] = latched
return dataframe
def populate_buy_trend(self, dataframe: DataFrame, metadata: dict) -> DataFrame:
- dataframe.loc[
- (
- # (
- # (dataframe['close'] < dataframe['bb_lowerband'])
- # & (dataframe['bb_width'] >= 0.065)
- # #& (dataframe['rsi'] < 45)
- # & (dataframe['volume'] > 0)
- # )
- ),
- 'buy'] = 1
+ # dataframe.loc[
+ # (
+ # (dataframe['sma24_score'].shift(2) <= dataframe['zero'])
+ # & (dataframe['sma5'].shift(1) <= dataframe['sma5'])
+ # & (dataframe['sma5_inv'] == -1)
+ # & (dataframe['min24'].shift(3) == dataframe['min24'])
+ # ),
+ # 'buy'] = 1
+
+ conditions = list()
+
+ # print(dataframe.columns)
+
+ buy_indicator = self.buy_indicator0.value
+ buy_crossed_indicator = self.buy_crossed_indicator0.value
+ buy_operator = self.buy_operator0.value
+ buy_real_num = self.buy_real_num0.value
+ condition, dataframe = condition_generator(
+ dataframe,
+ buy_operator,
+ buy_indicator,
+ buy_crossed_indicator,
+ buy_real_num
+ )
+ conditions.append(condition)
+ # backup
+ buy_indicator = self.buy_indicator1.value
+ buy_crossed_indicator = self.buy_crossed_indicator1.value
+ buy_operator = self.buy_operator1.value
+ buy_real_num = self.buy_real_num1.value
+
+ condition, dataframe = condition_generator(
+ dataframe,
+ buy_operator,
+ buy_indicator,
+ buy_crossed_indicator,
+ buy_real_num
+ )
+ conditions.append(condition)
+
+ buy_indicator = self.buy_indicator2.value
+ buy_crossed_indicator = self.buy_crossed_indicator2.value
+ buy_operator = self.buy_operator2.value
+ buy_real_num = self.buy_real_num2.value
+ condition, dataframe = condition_generator(
+ dataframe,
+ buy_operator,
+ buy_indicator,
+ buy_crossed_indicator,
+ buy_real_num
+ )
+ conditions.append(condition)
+ conditions.append((dataframe['stop_buying'] == False))
+
+ # conditions.append((dataframe['min60'].shift(5) == dataframe['min60']))
+ # conditions.append((dataframe['low'] < dataframe['min60']))
+
+ print(f"BUY indicators tested \n"
+ f"{self.buy_indicator0.value} {self.buy_crossed_indicator0.value} {self.buy_operator0.value} {self.buy_real_num0.value} \n"
+ f"{self.buy_indicator1.value} {self.buy_crossed_indicator1.value} {self.buy_operator1.value} {self.buy_real_num1.value} \n"
+ f"{self.buy_indicator2.value} {self.buy_crossed_indicator2.value} {self.buy_operator2.value} {self.buy_real_num2.value} \n"
+ )
+
+ if conditions:
+ dataframe.loc[
+ reduce(lambda x, y: x & y, conditions),
+ ['enter_long', 'enter_tag']
+ ] = (1, 'god')
+
return dataframe
def populate_sell_trend(self, dataframe: DataFrame, metadata: dict) -> DataFrame:
+ conditions = list()
+ # TODO: Its not dry code!
+ sell_indicator = self.sell_indicator0.value
+ sell_crossed_indicator = self.sell_crossed_indicator0.value
+ sell_operator = self.sell_operator0.value
+ sell_real_num = self.sell_real_num0.value
+ condition, dataframe = condition_generator(
+ dataframe,
+ sell_operator,
+ sell_indicator,
+ sell_crossed_indicator,
+ sell_real_num
+ )
+ conditions.append(condition)
- dataframe.loc[
- (
+ sell_indicator = self.sell_indicator1.value
+ sell_crossed_indicator = self.sell_crossed_indicator1.value
+ sell_operator = self.sell_operator1.value
+ sell_real_num = self.sell_real_num1.value
+ condition, dataframe = condition_generator(
+ dataframe,
+ sell_operator,
+ sell_indicator,
+ sell_crossed_indicator,
+ sell_real_num
+ )
+ conditions.append(condition)
- ),
- 'sell'] = 1
+ sell_indicator = self.sell_indicator2.value
+ sell_crossed_indicator = self.sell_crossed_indicator2.value
+ sell_operator = self.sell_operator2.value
+ sell_real_num = self.sell_real_num2.value
+ condition, dataframe = condition_generator(
+ dataframe,
+ sell_operator,
+ sell_indicator,
+ sell_crossed_indicator,
+ sell_real_num
+ )
+ conditions.append(condition)
+
+
+ print(f"SELL indicators tested \n"
+ f"{self.sell_indicator0.value} {self.sell_crossed_indicator0.value} {self.sell_operator0.value} {self.sell_real_num0.value} \n"
+ f"{self.sell_indicator1.value} {self.sell_crossed_indicator1.value} {self.sell_operator1.value} {self.sell_real_num1.value} \n"
+ f"{self.sell_indicator2.value} {self.sell_crossed_indicator2.value} {self.sell_operator2.value} {self.sell_real_num2.value} \n"
+ )
+
+
+ if conditions:
+ dataframe.loc[reduce(lambda x, y: x & y, conditions), ['exit_long', 'exit_tag']] = (1, 'god')
return dataframe
+ def calculeDerivees(
+ self,
+ dataframe: pd.DataFrame,
+ name: str,
+ suffixe: str = '',
+ window: int = 100,
+ coef: float = 0.15,
+ ema_period: int = 10,
+ verbose: bool = True,
+ timeframe: str = '5m'
+ ) -> pd.DataFrame:
+ """
+ Calcule deriv1/deriv2 (relative simple), applique EMA, calcule tendency
+ avec epsilon adaptatif basé sur rolling percentiles.
+ """
+ d1_col = f"{name}{suffixe}_deriv1"
+ d2_col = f"{name}{suffixe}_deriv2"
+ tendency_col = f"{name}{suffixe}_state"
+
+ series = dataframe[f"{name}{suffixe}"]
+ d1 = series.diff()
+ d2 = d1.diff()
+ cond_bas = (d1.rolling(3).mean() > d1.rolling(10).mean())
+ cond_haut = (d1.rolling(3).mean() < d1.rolling(10).mean())
+
+
+ dataframe[d1_col] = (dataframe[name] - dataframe[name].shift(3)) / dataframe[name].shift(3)
+ dataframe[d2_col] = (dataframe[d1_col] - dataframe[d1_col].shift(1))
+
+ dataframe[f"{name}{suffixe}_inv"] = np.where(cond_bas, -1, np.where(cond_haut, 1, 0))
+
+ short = d1.rolling(3).mean()
+ long = d1.rolling(10).mean()
+
+ spread = short - long
+ zscore = (spread - spread.rolling(ema_period).mean()) / spread.rolling(ema_period).std()
+
+ dataframe[f"{name}{suffixe}_score"] = zscore
+
+ # ####################################################################
+ # Calcul de la pente lissée
+ d1 = series.diff()
+ d1_smooth = d1.rolling(5).mean()
+ # Normalisation
+ z = (d1_smooth - d1_smooth.rolling(ema_period).mean()) / d1_smooth.rolling(ema_period).std()
+
+ dataframe[f"{name}{suffixe}_trend_up"] = (
+ (d1_smooth.shift(1) < 0) &
+ (d1_smooth > 0) &
+ (z > 1.0)
+ )
+
+ dataframe[f"{name}{suffixe}_trend_down"] = (
+ (d1_smooth.shift(1) > 0) &
+ (d1_smooth < 0) &
+ (z < -1.0)
+ )
+
+ momentum_short = d1.rolling(int(ema_period / 2)).mean()
+ momentum_long = d1.rolling(ema_period * 2).mean()
+
+ dataframe[f"{name}{suffixe}_trend_change_up"] = (
+ (momentum_short.shift(1) < momentum_long.shift(1)) &
+ (momentum_short > momentum_long)
+ )
+
+ dataframe[f"{name}{suffixe}_trend_change_down"] = (
+ (momentum_short.shift(1) > momentum_long.shift(1)) &
+ (momentum_short < momentum_long)
+ )
+
+ return dataframe
\ No newline at end of file