Skip to content

Commit

Permalink
fix(rust): fix the order and position initial snapshot as a batch fee…
Browse files Browse the repository at this point in the history
…d to ensure it is not processed separately.
  • Loading branch information
nkaz001 committed Oct 3, 2024
1 parent 77e752a commit fdcaf2c
Show file tree
Hide file tree
Showing 9 changed files with 192 additions and 186 deletions.
1 change: 1 addition & 0 deletions connector/src/binancefutures/market_data_stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,7 @@ impl MarketDataStream {

match parse_depth(data.bids, data.asks) {
Ok((bids, asks)) => {
// todo: It should be handled as a batch feed.
for (px, qty) in bids {
self.ev_tx
.send(PublishMessage::LiveEvent(LiveEvent::Feed {
Expand Down
36 changes: 21 additions & 15 deletions connector/src/binancefutures/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -220,6 +220,14 @@ impl Connector for BinanceFutures {
})
.unwrap();
} else {
// Sends the empty LiveEventsWithId to notify the end of batch.
ev_tx
.send(PublishMessage::LiveEventsWithId {
id,
events: Vec::with_capacity(0),
})
.unwrap();

symbols.insert(symbol.clone());
self.symbol_tx.send(symbol).unwrap();
}
Expand Down Expand Up @@ -262,7 +270,7 @@ impl Connector for BinanceFutures {
if let Some(order) = order_manager
.lock()
.unwrap()
.update_submit_success(symbol.clone(), order, resp)
.update_from_rest(&client_order_id, &resp)
{
tx.send(PublishMessage::LiveEvent(LiveEvent::Order {
symbol,
Expand All @@ -272,12 +280,11 @@ impl Connector for BinanceFutures {
}
}
Err(error) => {
if let Some(order) = order_manager.lock().unwrap().update_submit_fail(
symbol.clone(),
order,
&error,
client_order_id,
) {
if let Some(order) = order_manager
.lock()
.unwrap()
.update_submit_fail(&client_order_id, &error)
{
tx.send(PublishMessage::LiveEvent(LiveEvent::Order {
symbol,
order,
Expand Down Expand Up @@ -328,7 +335,7 @@ impl Connector for BinanceFutures {
if let Some(order) = order_manager
.lock()
.unwrap()
.update_cancel_success(symbol.clone(), order, resp)
.update_from_rest(&client_order_id, &resp)
{
tx.send(PublishMessage::LiveEvent(LiveEvent::Order {
symbol,
Expand All @@ -338,12 +345,11 @@ impl Connector for BinanceFutures {
}
}
Err(error) => {
if let Some(order) = order_manager.lock().unwrap().update_cancel_fail(
symbol.clone(),
order,
&error,
client_order_id,
) {
if let Some(order) = order_manager
.lock()
.unwrap()
.update_cancel_fail(&client_order_id, &error)
{
tx.send(PublishMessage::LiveEvent(LiveEvent::Order {
symbol,
order,
Expand All @@ -359,7 +365,7 @@ impl Connector for BinanceFutures {
}
}
None => {
debug!(
warn!(
order_id = order.order_id,
"client_order_id corresponding to order_id is not found; \
this may be due to the order already being canceled or filled."
Expand Down
Loading

0 comments on commit fdcaf2c

Please sign in to comment.