-
Notifications
You must be signed in to change notification settings - Fork 3
Expand file tree
/
Copy pathmarket_maker.py
More file actions
3126 lines (2569 loc) · 128 KB
/
market_maker.py
File metadata and controls
3126 lines (2569 loc) · 128 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
import os
import asyncio
import argparse
import logging
from logging.handlers import RotatingFileHandler
import orjson
import websockets
import json
import signal
import time
import math
from collections import deque
from dataclasses import dataclass
from decimal import Decimal, ROUND_DOWN
from typing import Optional
import numpy as np
import requests
from api_client import ApiClient
from utils import configured_symbol, load_project_env, normalize_symbol_base
# Runs at import time, before the configuration constants below call os.getenv
# (ORDER_REPLACE_MODE, RELEASE_MODE).
# NOTE(review): presumably loads the project's .env values into the process
# environment -- confirm against utils.load_project_env.
load_project_env()
def env_flag(name, default):
    """Read a boolean switch from the environment.

    Returns ``default`` when the variable is unset. Otherwise the value is
    stripped and lower-cased, and anything other than "0", "false", "no" or
    "off" (including the empty string) counts as enabled.
    """
    value = os.getenv(name)
    if value is not None:
        disabled_tokens = {"0", "false", "no", "off"}
        return value.strip().lower() not in disabled_tokens
    return default
# --- Configuration ---
# Module-level tunables. Everything here is evaluated once at import time.
# STRATEGY
DEFAULT_SYMBOL = configured_symbol()
FLIP_MODE = False  # True for short-biased (SELL first), False for long-biased (BUY first)
DEFAULT_BUY_SPREAD = 0.006  # 0.6% below mid-price for buy orders
DEFAULT_SELL_SPREAD = 0.006  # 0.6% above mid-price for sell orders
USE_AVELLANEDA_SPREADS = True  # Toggle to pull spreads from Avellaneda parameter files
DEFAULT_LEVERAGE = 1
DEFAULT_BALANCE_FRACTION = 0.2  # Use a fraction of tracked wallet balance for each order
POSITION_THRESHOLD_USD = 15.0  # USD threshold before a position is treated as significant inventory
# TIMING (in seconds)
ORDER_REFRESH_INTERVAL = 60  # Safety lifetime for a working order before a forced refresh, in seconds.
RETRY_ON_ERROR_INTERVAL = 30  # How long to wait after a major error before retrying.
PRICE_REPORT_INTERVAL = 60  # How often to report current prices and spread to terminal.
BALANCE_REPORT_INTERVAL = 60  # How often to report account balance to terminal.
POSITION_SYNC_TIMEOUT = 2.0  # How long to wait for a position snapshot after a fill.
STARTUP_CLEANUP_TIMEOUT = 20.0  # How long to wait for the initial cancel-all cleanup.
CANCEL_CONFIRM_TIMEOUT = 5.0  # How long to wait for a terminal update after canceling a timed-out order.
WEBSOCKET_MAX_CONNECTION_AGE = 23 * 60 * 60  # Rotate websocket connections before the documented 24h server limit.
# Shutdown budget. NOTE(review): the consumers are outside this chunk; units
# are presumably seconds like the rest of this section -- confirm.
SHUTDOWN_ACTIVE_ORDER_GRACE_TIMEOUT = 8.0
SHUTDOWN_CANCEL_ALL_TIMEOUT = 20.0
SHUTDOWN_CANCEL_ALL_RETRIES = 2
# ORDER REUSE SETTINGS
DEFAULT_PRICE_CHANGE_THRESHOLD_BPS = 5.0  # Minimum price move required before replacing an order
DEFAULT_PRICE_CHANGE_THRESHOLD = DEFAULT_PRICE_CHANGE_THRESHOLD_BPS / 10000.0  # Same threshold as a fraction (bps / 10000).
OPENING_CAPITAL_BUFFER_MULTIPLIER = 1.25  # Safety headroom above the exchange minimum for opening orders.
# NOTE(review): the three values below look like a failure-rate circuit breaker
# for opening orders (limit within a window, then a cooldown) -- confirm at the use site.
ORDER_FAILURE_WINDOW_SECONDS = 60.0
ORDER_FAILURE_LIMIT = 3
OPENING_CIRCUIT_BREAKER_COOLDOWN = 120.0
# SUPERTREND INTEGRATION
USE_SUPERTREND_SIGNAL = True  # Toggle to use Supertrend signal for dynamic flip_mode
SUPERTREND_PARAMS_TEMPLATE = "supertrend_params_{}.json"  # Filled with the normalized symbol base by get_supertrend_params_path.
SUPERTREND_CHECK_INTERVAL = 600  # Seconds between checking the signal file
# BINANCE ORDER BOOK IMBALANCE (OBI) ALPHA
USE_BINANCE_OBI_ALPHA = True  # When False, is_binance_alpha_live() always reports True.
BINANCE_OBI_LOOKING_DEPTH_PCT = 0.025  # Fraction of mid on each side summed into the OBI band totals.
BINANCE_OBI_BOOK_RETAIN_PCT = 0.03  # Fraction of mid on each side kept when trimming the local book.
BINANCE_OBI_ZSCORE_WINDOW_SECONDS = 600
BINANCE_OBI_WARMUP_SECONDS = 300  # Shown as the warm-up target in binance_alpha_status_text.
BINANCE_OBI_BUFFER_CAPACITY = 8192  # Capacity of the RollingZScoreBuffer held in StrategyState.
BINANCE_OBI_MIN_SAMPLES = 100
BINANCE_OBI_STALE_TIMEOUT_SECONDS = 5.0
BINANCE_OBI_BPS_PER_SIGMA = 5.0  # Quote shift in bps per unit of z-score.
BINANCE_OBI_MAX_SHIFT_BPS = 15.0  # Absolute cap applied to the z-score shift.
BINANCE_OBI_SHIFT_LOG_DELTA_BPS = 1.0
BINANCE_OBI_TRIM_INTERVAL_SECONDS = 1.0
BINANCE_OBI_TRIM_INTERVAL_UPDATES = 10
BINANCE_OBI_BAND_REBUILD_BPS = 1.0  # Mid drift (bps) at which the cached band totals are rebuilt.
# ORDER CANCELLATION
CANCEL_SPECIFIC_ORDER = True  # If True, cancel specific order ID. If False, cancel all orders for the symbol.
ORDER_REPLACE_MODE = os.getenv("ORDER_REPLACE_MODE", "fast").strip().lower()
FAST_ORDER_REPLACE = ORDER_REPLACE_MODE == "fast"  # Any other value disables fast replace.
OPEN_ORDER_WATCHDOG_INTERVAL = 15.0
OPEN_ORDER_WATCHDOG_STALE_GRACE = 5.0
OPEN_ORDER_WATCHDOG_CANCEL_ALL = True
QUOTE_REFRESH_PREFILTER_BPS = 2.0
# LOGGING
LOG_FILE = 'market_maker.log'
RELEASE_MODE = env_flag("RELEASE_MODE", True)  # When True, suppress all non-error logs and prints
MIN_ORDER_INTERVAL = 1.0  # Minimum seconds between order placements
POSITION_SIZE_EPSILON = 1e-12  # Absolute size at or below which a position is treated as flat.
# Spread configuration
PARAMS_DIR = os.path.join(os.path.dirname(os.path.abspath(__file__)), "params")  # "params" directory next to this file.
AVELLANEDA_FILE_PREFIX = "avellaneda_parameters_"
DEFAULT_MIN_AVELLANEDA_SPREAD_BPS = 5.0
DEFAULT_MAX_AVELLANEDA_SPREAD_BPS = 200.0
SPREAD_CACHE_TTL_SECONDS = 10
_SPREAD_CACHE = {}  # NOTE(review): cache layout is defined by code outside this chunk.
class BinanceOrderBookSyncError(Exception):
    """Raised when the Binance local book must be resynchronized.

    NOTE(review): the raise and handling sites are outside this chunk;
    presumably raised on a gap in depth-event update ids -- confirm there.
    """
class RollingZScoreBuffer:
    """Fixed-capacity ring buffer of timestamped samples with running moments.

    Samples live in two preallocated numpy arrays. ``head`` is the index of
    the oldest live sample, ``tail`` the next write slot, and ``count`` the
    number of live samples. ``sum`` and ``sum_sq`` are maintained on every
    append and drop so that mean and standard deviation are O(1).
    """

    def __init__(self, capacity: int):
        # A capacity below one would make the modulo arithmetic meaningless.
        self.capacity = max(int(capacity), 1)
        self.timestamps_ms = np.zeros(self.capacity, dtype=np.int64)
        self.values = np.zeros(self.capacity, dtype=np.float64)
        self.head = self.tail = self.count = 0
        self.sum = 0.0
        self.sum_sq = 0.0

    def clear(self):
        """Forget every sample; the backing arrays are reused as-is."""
        self.head = self.tail = self.count = 0
        self.sum = 0.0
        self.sum_sq = 0.0

    def _drop_oldest(self):
        """Remove the sample at ``head`` and back it out of the running sums."""
        if not self.count:
            return
        oldest = float(self.values[self.head])
        self.sum -= oldest
        self.sum_sq -= oldest * oldest
        self.head = (self.head + 1) % self.capacity
        self.count -= 1

    def evict_older_than(self, cutoff_ms: int):
        """Drop samples from the old end whose timestamp is strictly below ``cutoff_ms``."""
        while self.count > 0:
            if int(self.timestamps_ms[self.head]) >= int(cutoff_ms):
                break
            self._drop_oldest()

    def append(self, timestamp_ms: int, value: float):
        """Store a sample, overwriting the oldest one when the buffer is full."""
        if self.count == self.capacity:
            self._drop_oldest()
        slot = self.tail
        self.timestamps_ms[slot] = int(timestamp_ms)
        sample = float(value)
        self.values[slot] = sample
        self.tail = (slot + 1) % self.capacity
        self.count += 1
        self.sum += sample
        self.sum_sq += sample * sample

    def mean(self):
        """Return the arithmetic mean of the live samples, or None when empty."""
        return self.sum / self.count if self.count > 0 else None

    def std(self):
        """Return the population standard deviation, or None with fewer than two samples."""
        if self.count <= 1:
            return None
        average = self.sum / self.count
        # Clamp at zero: rounding in the running sums can push E[x^2] - E[x]^2 slightly negative.
        variance = max((self.sum_sq / self.count) - (average * average), 0.0)
        return math.sqrt(variance)

    def span_seconds(self):
        """Return the time between the oldest and newest live samples, in seconds."""
        if self.count <= 1:
            return 0.0
        newest_ms = int(self.timestamps_ms[(self.tail - 1) % self.capacity])
        oldest_ms = int(self.timestamps_ms[self.head])
        return max(0.0, (newest_ms - oldest_ms) / 1000.0)
@dataclass(frozen=True)
class AsterTopOfBookSnapshot:
    """Immutable top-of-book sample, as built by AsterTopOfBookFeed.publish."""

    bid_price: float  # best bid passed to publish()
    ask_price: float  # best ask passed to publish()
    mid_price: float  # (bid_price + ask_price) / 2
    updated_at: float  # runtime.now() at publication time
@dataclass(frozen=True)
class BinanceAlphaSnapshot:
    """Immutable copy of the mutable Binance alpha fields on StrategyState.

    Built by publish_binance_alpha_snapshot. The defaults describe a
    disconnected feed with no data.
    """

    ready: bool = False  # mirrors state.binance_alpha_ready
    raw_imbalance: Optional[float] = None  # mirrors state.binance_alpha_raw_imbalance
    zscore: Optional[float] = None  # mirrors state.binance_alpha_zscore
    shift_bps: float = 0.0  # quote shift in basis points
    warmup_seconds: float = 0.0  # mirrors state.binance_alpha_warmup_seconds
    sample_count: int = 0  # number of samples in the rolling buffer
    best_bid: Optional[float] = None  # best bid of the local Binance book
    best_ask: Optional[float] = None  # best ask of the local Binance book
    last_updated: Optional[float] = None  # mirrors state.binance_alpha_last_updated
    ws_connected: bool = False  # mirrors state.binance_alpha_ws_connected
@dataclass(frozen=True)
class PreparedQuoteParams:
    """Validated, immutable quote parameters.

    prepare_quote_params_snapshot produces three kinds: "unavailable",
    "default" (fixed buy/sell spreads) and "avellaneda" (model inputs plus
    spread limits). Fields that do not apply to a kind keep their defaults.
    """

    kind: str  # "unavailable", "default" or "avellaneda"
    source: str  # copied from the raw params' "source" entry
    buy_spread: float = 0.0  # set for kind == "default"
    sell_spread: float = 0.0  # set for kind == "default"
    gamma: float = 0.0  # the next five fields are set for kind == "avellaneda"
    sigma: float = 0.0
    k_buy: float = 0.0
    k_sell: float = 0.0
    time_horizon_days: float = 0.0
    spread_limits_min_bps: float = DEFAULT_MIN_AVELLANEDA_SPREAD_BPS
    spread_limits_max_bps: float = DEFAULT_MAX_AVELLANEDA_SPREAD_BPS
@dataclass(frozen=True)
class PendingTerminalOrder:
    """Immutable record of an order awaiting a terminal update.

    NOTE(review): the producers and consumers are outside this chunk;
    presumably these records are the values of
    StrategyState.pending_terminal_orders -- confirm at the use site.
    """

    side: str
    reduce_only: bool
    position_update_seq_before_fill: int  # presumably StrategyState.position_update_seq at capture time -- confirm
    order_label: str
    cancel_requested_at: float
@dataclass(frozen=True)
class OrderCommand:
    """The latest desired action for the order manager.

    Handed over through StrategyState.order_commands, a queue of size one.
    NOTE(review): the accepted ``kind`` and ``trigger`` values are defined by
    code outside this chunk.
    """

    kind: str
    side: str = ""
    reduce_only: bool = False
    price: float = 0.0
    quantity: float = 0.0
    formatted_price: str = ""  # presumably the exchange-formatted form of price -- confirm
    formatted_quantity: str = ""  # presumably the exchange-formatted form of quantity -- confirm
    order_notional: float = 0.0
    trigger: str = ""
def get_unavailable_quote_params():
    """Build the raw-params sentinel that tells the bot not to quote yet.

    A fresh dict is returned on every call, so callers may mutate it.
    """
    sentinel = dict(source="unavailable")
    return sentinel
def get_unavailable_prepared_quote_params():
    """Build the immutable "unavailable" quote snapshot used by the hot path."""
    marker = "unavailable"
    return PreparedQuoteParams(kind=marker, source=marker)
def prepare_quote_params_snapshot(params):
    """Turn a raw quote-params dict into a validated PreparedQuoteParams.

    Validation happens once here so the quoting hot path only reads memory.
    The unavailable snapshot is returned when ``params`` is not a dict, when
    its source is missing or "unavailable", or when any required number is
    missing or not strictly positive.
    """
    if not isinstance(params, dict):
        return get_unavailable_prepared_quote_params()

    source = str(params.get("source") or "unavailable")
    if source == "unavailable":
        return get_unavailable_prepared_quote_params()

    if source == "default":
        # Fixed-spread mode only needs the two spreads.
        buy_spread = _safe_float(params.get("buy_spread"))
        sell_spread = _safe_float(params.get("sell_spread"))
        if any(spread is None or spread <= 0.0 for spread in (buy_spread, sell_spread)):
            return get_unavailable_prepared_quote_params()
        return PreparedQuoteParams(
            kind="default",
            source=source,
            buy_spread=buy_spread,
            sell_spread=sell_spread,
        )

    # Any other source is treated as an Avellaneda parameter payload.
    spread_limits = resolve_avellaneda_spread_limits_bps(params)
    model_fields = ("gamma", "sigma", "k_buy", "k_sell", "time_horizon_days")
    model_values = {field: _safe_float(params.get(field)) for field in model_fields}
    for value in model_values.values():
        if value is None or value <= 0.0:
            return get_unavailable_prepared_quote_params()
    return PreparedQuoteParams(
        kind="avellaneda",
        source=source,
        spread_limits_min_bps=spread_limits["min"],
        spread_limits_max_bps=spread_limits["max"],
        **model_values,
    )
def publish_binance_alpha_snapshot(state):
    """Freeze the mutable Binance alpha fields of ``state`` into one snapshot.

    The result replaces ``state.binance_alpha_snapshot``. Falsy shift and
    warm-up values are normalised to 0.0.
    """
    snapshot_fields = dict(
        ready=bool(state.binance_alpha_ready),
        raw_imbalance=state.binance_alpha_raw_imbalance,
        zscore=state.binance_alpha_zscore,
        shift_bps=float(state.binance_alpha_shift_bps or 0.0),
        warmup_seconds=float(state.binance_alpha_warmup_seconds or 0.0),
        sample_count=int(state.binance_alpha_buffer.count),
        best_bid=state.binance_best_bid,
        best_ask=state.binance_best_ask,
        last_updated=state.binance_alpha_last_updated,
        ws_connected=bool(state.binance_alpha_ws_connected),
    )
    state.binance_alpha_snapshot = BinanceAlphaSnapshot(**snapshot_fields)
def resolve_symbol(cli_symbol=None):
    """Return the upper-cased active symbol.

    A truthy ``cli_symbol`` wins; otherwise the value comes from
    ``configured_symbol(DEFAULT_SYMBOL)``.
    """
    if cli_symbol:
        return cli_symbol.upper()
    return configured_symbol(DEFAULT_SYMBOL).upper()
class RuntimeContext:
    """Runtime-only state: resolved symbol, timing marks and the shutdown flag."""

    def __init__(self, symbol, clock=None):
        self._clock = clock
        self.symbol = resolve_symbol(symbol)
        self.shutdown_requested = False
        self.price_last_updated = None
        self.last_order_time = 0.0

    def now(self):
        """Return the current time from the best available clock.

        Order of preference: the injected ``clock`` callable, the running
        event loop's clock, then ``time.monotonic()`` when no loop is running.
        """
        clock = self._clock
        if clock is not None:
            return clock()
        try:
            loop = asyncio.get_running_loop()
            return loop.time()
        except RuntimeError:
            return time.monotonic()

    def request_shutdown(self):
        """Set the shutdown flag."""
        self.shutdown_requested = True
def setup_logging(file_log_level):
    """Configure the root logger with a rotating file handler and a console handler.

    Args:
        file_log_level: Level name for the log file (case-insensitive, for
            example "debug"). Unknown names fall back to DEBUG.

    Outside release mode the file logs at ``file_log_level`` and the console
    at INFO. With RELEASE_MODE set, the root logger and both handlers are
    limited to ERROR.

    Calling this again replaces the previous handlers. They are closed as well
    as detached so the old log file handle is not leaked.
    """
    log_level = getattr(logging, file_log_level.upper(), logging.DEBUG)
    if not isinstance(log_level, int):
        # Names such as "handlers" resolve to non-level attributes of the logging module.
        log_level = logging.DEBUG
    effective_level = logging.ERROR if RELEASE_MODE else log_level

    logger = logging.getLogger()  # root logger
    logger.setLevel(effective_level)

    for stale_handler in list(logger.handlers):
        logger.removeHandler(stale_handler)
        stale_handler.close()

    formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')

    # Bounded rotation so multi-week runs don't consume the disk.
    file_handler = RotatingFileHandler(LOG_FILE, maxBytes=50_000_000, backupCount=5)
    file_handler.setLevel(effective_level)
    file_handler.setFormatter(formatter)
    logger.addHandler(file_handler)

    # The console always gets a handler; release mode only shows errors there.
    console_handler = logging.StreamHandler()
    console_handler.setLevel(logging.ERROR if RELEASE_MODE else logging.INFO)
    console_handler.setFormatter(formatter)
    logger.addHandler(console_handler)
class StrategyState:
    """Mutable state shared between the strategy's feeds, quote engine and order manager."""

    def __init__(self, flip_mode=False):
        # Latest Aster top of book (plain fields plus the immutable snapshot).
        self.bid_price = self.ask_price = self.mid_price = None
        self.aster_top_of_book_snapshot = None

        # Working order and the details kept for the order-reuse logic.
        self.active_order_id = None
        self.active_order_started_at = None
        self.last_order_price = self.last_order_side = self.last_order_quantity = None

        # Inventory and bias. ``mode`` is 'BUY' or 'SELL' and starts on the opening side.
        self.position_size = 0.0
        self.flip_mode = flip_mode
        self.mode, _ = get_strategy_modes(flip_mode)
        # Position snapshots are the source of truth for inventory state.
        self.position_update_seq = 0

        # Account balance tracking; account_balance is the total USDF + USDT + USDC balance.
        self.account_balance = None
        self.balance_last_updated = None
        self.balance_listen_key = None
        self.usdf_balance = self.usdt_balance = self.usdc_balance = 0.0

        # Order updates from the WebSocket, and the latest-wins handoff from
        # the quote engine to the order manager (hence maxsize=1).
        self.order_updates = asyncio.Queue()
        self.order_commands = asyncio.Queue(maxsize=1)
        self.quote_refresh_event = asyncio.Event()

        # Quote parameters: fixed defaults, or "unavailable" until Avellaneda params load.
        if USE_AVELLANEDA_SPREADS:
            self.quote_params = get_unavailable_quote_params()
            self.prepared_quote_params = get_unavailable_prepared_quote_params()
        else:
            self.quote_params = {
                "buy_spread": DEFAULT_BUY_SPREAD,
                "sell_spread": DEFAULT_SELL_SPREAD,
                "source": "default",
            }
            self.prepared_quote_params = PreparedQuoteParams(
                kind="default",
                source="default",
                buy_spread=DEFAULT_BUY_SPREAD,
                sell_spread=DEFAULT_SELL_SPREAD,
            )

        # WebSocket connection health flags.
        self.price_ws_connected = False
        self.user_data_ws_connected = False
        self.symbol_filters = None

        # Supertrend signal: 1 (up), -1 (down) or None.
        self.supertrend_signal = None

        # Order failure tracking for the opening circuit breaker.
        self.order_failure_timestamps = deque()
        self.opening_circuit_breaker_until = 0.0

        # Binance order book imbalance alpha: local book.
        self.binance_bid_book = {}
        self.binance_ask_book = {}
        self.binance_last_update_id = None
        self.binance_best_bid = self.binance_best_ask = None
        self.binance_book_last_trim_at = 0.0
        self.binance_book_updates_since_trim = 0

        # Binance alpha: rolling statistics and derived signal.
        self.binance_alpha_buffer = RollingZScoreBuffer(BINANCE_OBI_BUFFER_CAPACITY)
        self.binance_alpha_ws_connected = False
        self.binance_alpha_ready = False
        self.binance_alpha_last_updated = None
        self.binance_alpha_raw_imbalance = None
        self.binance_alpha_zscore = None
        self.binance_alpha_shift_bps = 0.0
        self.binance_alpha_warmup_seconds = 0.0
        self.binance_last_logged_shift_bps = None
        self.binance_alpha_last_status = None
        self.binance_alpha_snapshot = BinanceAlphaSnapshot()

        # Binance alpha: cached depth totals for the current band.
        self.binance_band_mid_price = None
        self.binance_band_lower_bound = self.binance_band_upper_bound = None
        self.binance_band_bid_qty = self.binance_band_ask_qty = 0.0

        self.pending_terminal_orders = {}
def get_strategy_modes(flip_mode):
    """Return ``(opening_side, closing_side)`` for the given bias.

    A truthy ``flip_mode`` opens with SELL and closes with BUY; otherwise the
    strategy opens with BUY and closes with SELL.
    """
    if flip_mode:
        return 'SELL', 'BUY'
    return 'BUY', 'SELL'
def get_position_close_side(position_size):
    """Return the order side that reduces the position, or None when flat.

    A long position (size > 0) closes with SELL, a short one with BUY.
    """
    if position_size > 0:
        closing_side = 'SELL'
    elif position_size < 0:
        closing_side = 'BUY'
    else:
        closing_side = None
    return closing_side
def has_open_position_size(position_size):
    """Tell whether ``position_size`` exceeds POSITION_SIZE_EPSILON in magnitude."""
    magnitude = abs(position_size)
    return magnitude > POSITION_SIZE_EPSILON
def has_open_position(state):
    """Tell whether the strategy currently carries any non-zero inventory."""
    tracked_size = state.position_size
    return has_open_position_size(tracked_size)
def has_significant_position(state, position_notional=None):
    """Tell whether inventory is non-zero and large enough to need an explicit close.

    When ``position_notional`` is None it is estimated from the tracked
    position size and mid price.
    """
    if not has_open_position(state):
        return False
    if position_notional is None:
        position_notional = get_position_notional_usd(state.position_size, state.mid_price)
    return is_position_significant(position_notional)
def get_target_mode(state, position_notional=None):
    """Pick the side the strategy should trade next.

    A position whose notional is significant forces the side that closes it.
    Otherwise the opening side for the current bias is used. When
    ``position_notional`` is None it is estimated from the tracked position
    size and mid price.
    """
    if position_notional is None:
        position_notional = get_position_notional_usd(state.position_size, state.mid_price)

    closing_side = get_position_close_side(state.position_size)
    if closing_side is not None and is_position_significant(position_notional):
        return closing_side

    opening_side, _closing_for_bias = get_strategy_modes(state.flip_mode)
    return opening_side
def clear_order_tracking(state):
    """Reset the working-order id, its start time and the reuse-tracking fields to None."""
    tracked_fields = (
        "active_order_id",
        "last_order_price",
        "last_order_side",
        "last_order_quantity",
        "active_order_started_at",
    )
    for field_name in tracked_fields:
        setattr(state, field_name, None)
def get_position_notional_usd(position_size, reference_price):
    """Estimate the absolute USD notional of a position.

    Returns 0.0 when ``reference_price`` is None, zero or negative.
    """
    if reference_price is not None and not reference_price <= 0:
        return abs(position_size * reference_price)
    return 0.0
def is_position_significant(position_notional):
    """Tell whether the notional reaches POSITION_THRESHOLD_USD, which forces closing mode."""
    threshold_usd = POSITION_THRESHOLD_USD
    return position_notional >= threshold_usd
def apply_position_snapshot(state, position_size, position_notional=None):
    """Record a new position size, bump the update sequence and realign the mode.

    Residual inventory is stored as-is, never zeroed. Returns the mode chosen
    by ``sync_mode_with_position``.
    """
    state.position_size = position_size
    state.position_update_seq = state.position_update_seq + 1
    aligned_mode = sync_mode_with_position(state, position_notional=position_notional)
    return aligned_mode
def extract_position_snapshot(position_data, reference_price=None):
    """Normalise an exchange position payload into ``(size, notional)``.

    Size comes from ``positionAmt``, falling back to ``pa``. An explicit
    ``notional`` entry is used as its absolute value. Otherwise the notional
    is estimated from a positive ``reference_price``, or failing that from
    the entry price (``ep``, falling back to ``entryPrice``).
    """
    size_field = position_data.get('positionAmt')
    if size_field is None:
        size_field = position_data.get('pa', 0.0)
    position_size = float(size_field or 0.0)

    notional_field = position_data.get('notional')
    if notional_field is not None:
        return position_size, abs(float(notional_field))

    entry_field = position_data.get('ep', position_data.get('entryPrice', 0.0))
    entry_price = float(entry_field or 0.0)
    if reference_price and reference_price > 0:
        pricing_reference = reference_price
    else:
        pricing_reference = entry_price
    return position_size, get_position_notional_usd(position_size, pricing_reference)
def sync_state_from_position_data(state, position_data, reference_price=None):
    """Apply an exchange position payload to local state.

    Returns ``(position_size, notional_value, previous_mode, new_mode)`` so
    callers can detect a mode change.
    """
    mode_before = state.mode
    snapshot = extract_position_snapshot(position_data, reference_price=reference_price)
    position_size, notional_value = snapshot
    apply_position_snapshot(state, position_size, position_notional=notional_value)
    mode_after = state.mode
    return position_size, notional_value, mode_before, mode_after
def sync_mode_with_position(state, position_notional=None):
    """Set ``state.mode`` from the current bias and inventory threshold, and return it."""
    target_mode = get_target_mode(state, position_notional=position_notional)
    state.mode = target_mode
    return state.mode
def request_quote_refresh(state):
    """Signal the quote engine to recompute the desired working order."""
    refresh_event = state.quote_refresh_event
    refresh_event.set()
def publish_latest_order_command(state, command):
    """Hand ``command`` to the order manager, discarding any unconsumed older one.

    The queue is emptied first so the put cannot fail on the size-one queue.
    """
    pending = state.order_commands
    while not pending.empty():
        pending.get_nowait()
    pending.put_nowait(command)
def drain_latest_order_command(state, initial_command):
    """Empty the command queue and return the newest command seen.

    ``initial_command`` is returned unchanged when nothing newer is queued.
    """
    newest = initial_command
    pending = state.order_commands
    while not pending.empty():
        newest = pending.get_nowait()
    return newest
def get_supertrend_params_path(symbol):
    """Build the path of the Supertrend params file for ``symbol`` inside PARAMS_DIR."""
    base_symbol = normalize_symbol_base(symbol)
    filename = SUPERTREND_PARAMS_TEMPLATE.format(base_symbol)
    return os.path.join(PARAMS_DIR, filename)
def load_supertrend_signal(symbol):
    """Load the latest Supertrend signal from disk.

    Args:
        symbol: Trading symbol used to locate the params file.

    Returns:
        ``(trend, params_file)`` where ``trend`` is 1 (up) or -1 (down).

    Raises:
        FileNotFoundError: The params file does not exist.
        ValueError: The file is not valid JSON (``json.JSONDecodeError`` is a
            ``ValueError`` subclass), or ``current_signal.trend`` is missing,
            malformed, or not 1 or -1.
    """
    params_file = get_supertrend_params_path(symbol)
    if not os.path.exists(params_file):
        raise FileNotFoundError(params_file)
    with open(params_file, 'r', encoding='utf-8') as f:
        data = json.load(f)

    # A half-written or hand-edited file may hold null or a non-object at
    # either level. Treat that as an invalid signal rather than letting an
    # AttributeError escape. The local is named ``trend`` so it does not
    # shadow the imported ``signal`` module.
    current_signal = data.get('current_signal') if isinstance(data, dict) else None
    trend = current_signal.get('trend') if isinstance(current_signal, dict) else None
    if trend not in (1, -1):
        raise ValueError(f"Invalid signal '{trend}' in {params_file}")
    return trend, params_file
def round_price_to_tick(price, tick_size, side):
    """Round ``price`` to a passive multiple of ``tick_size`` for ``side``.

    BUY prices round down and SELL prices round up, so the result never
    crosses toward the market.

    The arithmetic is done in ``Decimal`` on the shortest repr of the inputs,
    matching ``round_quantity_to_step``. Binary-float division can land just
    below a tick (``0.3 / 0.1 == 2.9999999999999996``), and multiplying back
    by the tick yields values such as ``0.30000000000000004``. A fixed
    epsilon cannot cover that error once price/tick grows large.

    Args:
        price: Price to round.
        tick_size: Exchange tick size; must be positive.
        side: 'BUY' or 'SELL'.

    Returns:
        The rounded price as a float.

    Raises:
        ValueError: ``tick_size`` is not positive or ``side`` is unsupported.
    """
    if tick_size <= 0:
        raise ValueError("tick_size must be positive")
    if side not in ('BUY', 'SELL'):
        raise ValueError(f"Unsupported side for price rounding: {side}")

    tick_dec = Decimal(str(tick_size))
    ticks = Decimal(str(price)) / tick_dec
    # math.floor and math.ceil delegate to Decimal.__floor__ and __ceil__ and return exact ints.
    whole_ticks = math.floor(ticks) if side == 'BUY' else math.ceil(ticks)
    return float(whole_ticks * tick_dec)
def _binance_ws_symbol(symbol):
"""Return the lowercase Binance websocket symbol."""
return (symbol or "").lower()
def _binance_depth_stream_url(symbol):
    """Build the public Binance diff-depth websocket URL (100ms stream) for ``symbol``."""
    stream_symbol = _binance_ws_symbol(symbol)
    return f"wss://fstream.binance.com/ws/{stream_symbol}@depth@100ms"
def _binance_depth_snapshot_url(symbol):
"""Return the Binance REST depth snapshot URL for the symbol."""
return f"https://fapi.binance.com/fapi/v1/depth?symbol={(symbol or '').upper()}&limit=1000"
def clear_binance_alpha_state(state):
    """Reset the in-memory Binance alpha state and publish the empty snapshot.

    This prevents stale reuse and memory growth. The websocket-connected flag
    is deliberately left untouched.
    """
    # Containers are emptied in place so existing references stay valid.
    state.binance_bid_book.clear()
    state.binance_ask_book.clear()
    state.binance_alpha_buffer.clear()

    state.binance_alpha_ready = False
    state.binance_book_updates_since_trim = 0

    fields_reset_to_none = (
        "binance_last_update_id",
        "binance_alpha_last_updated",
        "binance_alpha_raw_imbalance",
        "binance_alpha_zscore",
        "binance_best_bid",
        "binance_best_ask",
        "binance_last_logged_shift_bps",
        "binance_alpha_last_status",
        "binance_band_mid_price",
        "binance_band_lower_bound",
        "binance_band_upper_bound",
    )
    for field_name in fields_reset_to_none:
        setattr(state, field_name, None)

    fields_reset_to_zero = (
        "binance_alpha_shift_bps",
        "binance_alpha_warmup_seconds",
        "binance_book_last_trim_at",
        "binance_band_bid_qty",
        "binance_band_ask_qty",
    )
    for field_name in fields_reset_to_zero:
        setattr(state, field_name, 0.0)

    publish_binance_alpha_snapshot(state)
def _refresh_binance_best_prices(state):
"""Refresh cached best prices from the current local Binance book."""
state.binance_best_bid = max(state.binance_bid_book) if state.binance_bid_book else None
state.binance_best_ask = min(state.binance_ask_book) if state.binance_ask_book else None
def _price_is_inside_binance_band(price, lower_bound, upper_bound, is_bid):
"""Return True when a price contributes to the active OBI band for its side."""
if lower_bound is None or upper_bound is None:
return False
if is_bid:
return price >= lower_bound
return price <= upper_bound
def _apply_book_updates(book, updates, current_best_price, is_bid, lower_bound=None, upper_bound=None):
    """Apply absolute-quantity depth updates to one side of the local book.

    Each update is a ``(price, quantity)`` pair. The quantity replaces the
    level outright, and a non-positive quantity removes the level.

    Returns ``(best_price, band_delta)``: the side's best price after the
    batch, and the net change in quantity inside the active OBI band.
    """
    best_price = current_best_price
    best_was_removed = False
    band_delta = 0.0

    for raw_price, raw_qty in updates:
        level_price = float(raw_price)
        level_qty = max(float(raw_qty), 0.0)
        resting_qty = float(book.get(level_price, 0.0) or 0.0)

        if _price_is_inside_binance_band(level_price, lower_bound, upper_bound, is_bid):
            band_delta += level_qty - resting_qty

        if level_qty <= 0.0:
            # Removing the level that was the best price forces a rescan after the batch.
            removed_qty = book.pop(level_price, None)
            if removed_qty is not None and best_price is not None and level_price == best_price:
                best_was_removed = True
            continue

        book[level_price] = level_qty
        if best_price is None:
            best_price = level_price
        elif is_bid and level_price > best_price:
            best_price = level_price
        elif not is_bid and level_price < best_price:
            best_price = level_price

    if best_was_removed:
        if not book:
            best_price = None
        elif is_bid:
            best_price = max(book)
        else:
            best_price = min(book)
    return best_price, band_delta
def _trim_binance_books(state):
    """Drop local-book levels farther than BINANCE_OBI_BOOK_RETAIN_PCT from mid.

    Does nothing when either side is empty or no best price can be determined.
    """
    if not (state.binance_bid_book and state.binance_ask_book):
        return
    if state.binance_best_bid is None or state.binance_best_ask is None:
        _refresh_binance_best_prices(state)

    top_bid = state.binance_best_bid
    top_ask = state.binance_best_ask
    if top_bid is None or top_ask is None:
        return

    mid_price = (top_bid + top_ask) / 2.0
    retain_floor = mid_price * (1.0 - BINANCE_OBI_BOOK_RETAIN_PCT)
    retain_ceiling = mid_price * (1.0 + BINANCE_OBI_BOOK_RETAIN_PCT)

    # Collect first, then delete: a dict cannot change size while being iterated.
    bids_to_drop = [level for level in state.binance_bid_book if level < retain_floor]
    asks_to_drop = [level for level in state.binance_ask_book if level > retain_ceiling]
    for level in bids_to_drop:
        state.binance_bid_book.pop(level, None)
    for level in asks_to_drop:
        state.binance_ask_book.pop(level, None)
def _rebuild_binance_band_totals(state):
    """Recompute the cached OBI band bounds and per-side totals from the local book.

    The band spans BINANCE_OBI_LOOKING_DEPTH_PCT on each side of mid. When
    the best prices are missing, non-positive or crossed, the cached band is
    cleared instead.
    """
    if state.binance_best_bid is None or state.binance_best_ask is None:
        _refresh_binance_best_prices(state)

    top_bid = state.binance_best_bid
    top_ask = state.binance_best_ask
    book_is_unusable = (
        top_bid is None
        or top_ask is None
        or top_bid <= 0.0
        or top_ask <= 0.0
        or top_bid >= top_ask
    )
    if book_is_unusable:
        state.binance_band_mid_price = None
        state.binance_band_lower_bound = None
        state.binance_band_upper_bound = None
        state.binance_band_bid_qty = 0.0
        state.binance_band_ask_qty = 0.0
        return

    mid_price = (top_bid + top_ask) / 2.0
    band_floor = mid_price * (1.0 - BINANCE_OBI_LOOKING_DEPTH_PCT)
    band_ceiling = mid_price * (1.0 + BINANCE_OBI_LOOKING_DEPTH_PCT)

    # Plain accumulation keeps the float summation order fixed.
    band_bid_qty = 0.0
    for level_price, level_qty in state.binance_bid_book.items():
        if level_price >= band_floor:
            band_bid_qty += level_qty

    band_ask_qty = 0.0
    for level_price, level_qty in state.binance_ask_book.items():
        if level_price <= band_ceiling:
            band_ask_qty += level_qty

    state.binance_band_mid_price = mid_price
    state.binance_band_lower_bound = band_floor
    state.binance_band_upper_bound = band_ceiling
    state.binance_band_bid_qty = band_bid_qty
    state.binance_band_ask_qty = band_ask_qty
def _binance_band_requires_rebuild(state):
    """Tell whether the cached OBI totals must be rebuilt from the local book.

    A rebuild is needed when a best price or the cached band mid is missing,
    when the current mid is non-positive, or when mid has drifted from the
    cached band mid by at least BINANCE_OBI_BAND_REBUILD_BPS.
    """
    top_bid = state.binance_best_bid
    top_ask = state.binance_best_ask
    if top_bid is None or top_ask is None:
        return True

    cached_mid = state.binance_band_mid_price
    if cached_mid is None:
        return True

    live_mid = (top_bid + top_ask) / 2.0
    if live_mid <= 0.0:
        return True

    drift_bps = abs(live_mid - cached_mid) / live_mid * 10000.0
    return drift_bps >= BINANCE_OBI_BAND_REBUILD_BPS
def calculate_binance_orderbook_imbalance(state):
    """Compute the normalised bid/ask imbalance inside the OBI band around mid.

    Returns ``(bid_qty - ask_qty) / (bid_qty + ask_qty)``. Returns None when
    a side of the book is empty, the band cannot be established, or the band
    holds no quantity.
    """
    if not (state.binance_bid_book and state.binance_ask_book):
        return None

    if _binance_band_requires_rebuild(state):
        _rebuild_binance_band_totals(state)
    if state.binance_band_mid_price is None:
        return None

    bid_qty = state.binance_band_bid_qty
    ask_qty = state.binance_band_ask_qty
    total_qty = bid_qty + ask_qty
    if total_qty <= 0.0:
        return None
    return (bid_qty - ask_qty) / total_qty
def calculate_binance_alpha_shift_bps(zscore):
    """Convert a z-score into a quote shift in basis points.

    The shift is ``zscore * BINANCE_OBI_BPS_PER_SIGMA``, clamped to
    +/- BINANCE_OBI_MAX_SHIFT_BPS. A missing or non-finite z-score gives 0.0.
    """
    if zscore is None or not np.isfinite(zscore):
        return 0.0
    uncapped_bps = float(zscore) * BINANCE_OBI_BPS_PER_SIGMA
    capped_above = min(BINANCE_OBI_MAX_SHIFT_BPS, uncapped_bps)
    return max(-BINANCE_OBI_MAX_SHIFT_BPS, capped_above)
def is_binance_alpha_live(state):
    """Tell whether Binance alpha is warmed up and ready for opening quotes.

    Always True when the alpha is disabled. Otherwise either the published
    snapshot or the mutable flag may mark it ready.
    """
    if USE_BINANCE_OBI_ALPHA:
        return bool(state.binance_alpha_snapshot.ready or state.binance_alpha_ready)
    return True
def binance_alpha_status_text(state):
    """Return a compact Binance OBI status string for logs and reporters.

    Possible results, in priority order:
        * "Binance OBI disabled" when the alpha is switched off
        * "Binance OBI z=... shift=...bps" once the alpha is ready
        * "Binance OBI warming ..." while connected but still warming up
        * "Binance OBI unavailable" otherwise
    """
    snapshot = state.binance_alpha_snapshot
    if not USE_BINANCE_OBI_ALPHA:
        return "Binance OBI disabled"
    if snapshot.ready is False and state.binance_alpha_ready:
        # The mutable flag flipped before a snapshot was published, so report
        # from the live fields instead of the stale snapshot.
        snapshot = BinanceAlphaSnapshot(
            ready=True,
            zscore=state.binance_alpha_zscore,
            shift_bps=state.binance_alpha_shift_bps,
            warmup_seconds=state.binance_alpha_warmup_seconds,
            sample_count=state.binance_alpha_buffer.count,
            best_bid=state.binance_best_bid,
            best_ask=state.binance_best_ask,
            last_updated=state.binance_alpha_last_updated,
            ws_connected=state.binance_alpha_ws_connected,
        )
    if snapshot.ready:
        zscore = snapshot.zscore if snapshot.zscore is not None else 0.0
        return f"Binance OBI z={zscore:+.2f} shift={snapshot.shift_bps:+.1f}bps"
    if snapshot.ws_connected:
        return (
            f"Binance OBI warming {snapshot.warmup_seconds:.0f}s/"
            f"{BINANCE_OBI_WARMUP_SECONDS}s samples={snapshot.sample_count}"
        )
    return "Binance OBI unavailable"
def round_quantity_to_step(quantity, step_size):
    """Round ``quantity`` down to the nearest valid multiple of ``step_size``.

    Non-positive quantities give 0.0. The arithmetic uses ``Decimal`` on the
    string form of the inputs so binary-float error cannot drop a step.

    Raises:
        ValueError: ``step_size`` is not positive.
    """
    if step_size <= 0:
        raise ValueError("step_size must be positive")
    if quantity <= 0:
        return 0.0

    step = Decimal(str(step_size))
    whole_steps = (Decimal(str(quantity)) / step).to_integral_value(rounding=ROUND_DOWN)
    return float(whole_steps * step)
class AsterTopOfBookFeed:
    """Publish immutable Aster top-of-book snapshots and prefilter quote refreshes."""

    def publish(self, state, runtime, quote_engine, best_bid, best_ask):
        """Store a new best bid/ask and ask the quote engine whether to refresh.

        A fresh ``AsterTopOfBookSnapshot`` (bid, ask, arithmetic mid, and the
        ``runtime.now()`` timestamp) replaces the one on ``state``. The flat
        ``state.bid_price`` / ``ask_price`` / ``mid_price`` fields and
        ``runtime.price_last_updated`` are updated to match.

        Returns:
            The result of ``quote_engine.should_refresh_from_top_of_book``
            called with the snapshot that was replaced and the new one.
        """
        midpoint = (best_bid + best_ask) / 2.0
        stamped_at = runtime.now()
        fresh = AsterTopOfBookSnapshot(
            bid_price=best_bid,
            ask_price=best_ask,
            mid_price=midpoint,
            updated_at=stamped_at,
        )

        # Capture the outgoing snapshot before overwriting it so the quote
        # engine can compare old against new.
        replaced = state.aster_top_of_book_snapshot
        state.aster_top_of_book_snapshot = fresh
        state.bid_price, state.ask_price, state.mid_price = best_bid, best_ask, midpoint
        runtime.price_last_updated = fresh.updated_at

        return quote_engine.should_refresh_from_top_of_book(state, replaced, fresh)
class BinanceAlphaEngine:
    """Maintain Binance order book state and publish immutable alpha snapshots."""

    def clear(self, state):
        """Reset the Binance alpha fields on ``state`` via ``clear_binance_alpha_state``."""
        clear_binance_alpha_state(state)

    def initialize_local_book(self, state, snapshot):
        """Initialise the local book from ``snapshot``, then publish an alpha snapshot."""
        _initialize_binance_local_book(state, snapshot)
        publish_binance_alpha_snapshot(state)

    def apply_depth_event(self, state, event, require_prev_match=True):
        """Apply one depth ``event`` and return the helper's result unchanged.

        ``require_prev_match`` is forwarded as-is to
        ``_apply_binance_depth_event``.
        """
        outcome = _apply_binance_depth_event(
            state,
            event,
            require_prev_match=require_prev_match,
        )
        return outcome

    def update_metrics(self, state, runtime):
        """Refresh the alpha metrics by delegating to ``_update_binance_alpha_metrics``."""
        _update_binance_alpha_metrics(state, runtime)
class QuoteEngine:
"""Own all quote decisions off immutable feed snapshots and prepared params."""
def prepare_quote_params(self, params):
    """Return the prepared form of ``params`` from ``prepare_quote_params_snapshot``."""
    prepared = prepare_quote_params_snapshot(params)
    return prepared
def get_prepared_params(self, state):
    """Return the prepared quote params stored on ``state``.

    Falls back to ``get_unavailable_prepared_quote_params()`` when
    ``state.prepared_quote_params`` is falsy (for example ``None``).
    """
    prepared = state.prepared_quote_params
    if prepared:
        return prepared
    return get_unavailable_prepared_quote_params()
def estimate_quote_center(self, state, book_snapshot=None, prepared_params=None):
    """Estimate the price around which quotes would be centred.

    Args:
        state: Supplies the fallback book snapshot, the Binance alpha
            snapshot and ``position_size``.
        book_snapshot: Top-of-book snapshot to use; when falsy,
            ``state.aster_top_of_book_snapshot`` is used.
        prepared_params: Prepared quote params; when falsy they are fetched
            with ``self.get_prepared_params(state)``.

    Returns:
        ``None`` when no book snapshot is available. Otherwise the snapshot
        mid price plus the alpha shift (``mid * shift_bps / 10000``, applied
        only when the alpha snapshot is ready and the mid is positive). For
        ``kind == "avellaneda"`` params the term
        ``position_size * gamma * (sigma * mid) ** 2 * time_horizon_days``
        is additionally subtracted.
    """
    book = book_snapshot if book_snapshot else state.aster_top_of_book_snapshot
    if book is None:
        return None

    quote_params = prepared_params if prepared_params else self.get_prepared_params(state)
    mid = book.mid_price
    alpha = state.binance_alpha_snapshot

    if alpha.ready and mid and mid > 0.0:
        alpha_offset = mid * (alpha.shift_bps / 10000.0)
    else:
        alpha_offset = 0.0

    if quote_params.kind != "avellaneda":
        return mid + alpha_offset

    # Inventory term: scales with the held position, pulling the centre away
    # from the mid in the direction that reduces inventory.
    inventory_risk = quote_params.gamma * ((quote_params.sigma * mid) ** 2) * quote_params.time_horizon_days
    return mid - state.position_size * inventory_risk + alpha_offset
def should_refresh_from_top_of_book(self, state, previous_snapshot, new_snapshot):
    """Decide whether a top-of-book change is large enough to re-quote.

    Returns ``True`` when either snapshot is missing or either quote centre
    cannot be estimated. Otherwise returns whether the centre moved by at
    least the threshold: ``QUOTE_REFRESH_PREFILTER_BPS`` basis points of the
    new centre, raised to one ``tick_size`` (from ``state.symbol_filters``)
    when a positive tick size is known.
    """
    if previous_snapshot is None or new_snapshot is None:
        return True

    old_center = self.estimate_quote_center(state, book_snapshot=previous_snapshot)
    fresh_center = self.estimate_quote_center(state, book_snapshot=new_snapshot)
    if old_center is None or fresh_center is None:
        return True

    filters = state.symbol_filters or {}
    tick = float(filters.get("tick_size", 0.0) or 0.0)

    min_move = fresh_center * (QUOTE_REFRESH_PREFILTER_BPS / 10000.0)
    if tick > 0.0:
        # A move smaller than one tick is not treated as a meaningful change.
        min_move = max(min_move, tick)

    moved_by = abs(fresh_center - old_center)
    return moved_by >= min_move
def build_quote_command(self, state, symbol_filters):
    """Delegate to the module-level ``build_quote_command`` and return its result.

    Inside a method body the bare name resolves to the module-level function,
    not to this method, so the call does not recurse.
    """
    built = build_quote_command(state, symbol_filters)
    return built
async def run(self, state, client, symbol, runtime):
log = logging.getLogger('MarketMakerLoop')
log.info(f"Fetching trading rules for {symbol}...")
symbol_filters = await client.get_symbol_filters(symbol)
state.symbol_filters = symbol_filters
log.info(f"Filters loaded: {symbol_filters}")
request_quote_refresh(state)
while not runtime.shutdown_requested:
try:
await state.quote_refresh_event.wait()
state.quote_refresh_event.clear()
while True:
if runtime.shutdown_requested:
break
if symbol_filters.get("status", "TRADING") != "TRADING":
if state.active_order_id:
publish_latest_order_command(
state,
OrderCommand(kind="cancel", trigger=f"Symbol status {symbol_filters.get('status', 'UNKNOWN')}"),
)
break
if not state.price_ws_connected or not state.user_data_ws_connected:
if state.active_order_id:
publish_latest_order_command(
state,
OrderCommand(kind="cancel", trigger="WebSocket disconnection"),
)
break
if not is_price_data_valid(state, runtime) or not is_balance_data_valid(state):
break
if is_opening_circuit_breaker_active(state, runtime) and not has_open_position(state):
break
bias_changed, current_notional = apply_supertrend_bias(state)
if bias_changed:
trend_name = "DOWNTREND" if state.flip_mode else "UPTREND"
log.info(f"Supertrend switched strategy bias to {trend_name} while inventory was below ${current_notional:.2f}.")
quote_command, order_candidate = self.build_quote_command(state, symbol_filters)
if quote_command is None:
reason = order_candidate["reason"]
if reason in {"missing_quote_params", "invalid_quote_params", "binance_alpha_unavailable"}:
if state.active_order_id:
publish_latest_order_command(
state,
OrderCommand(kind="cancel", trigger=f"Quotes unavailable: {reason}"),
)
break
if reason in {"non_positive_quantity", "min_qty", "min_notional"} and state.active_order_id:
publish_latest_order_command(
state,
OrderCommand(kind="cancel", trigger=f"Quote invalid: {reason}"),
)