Skip to content

Commit

Permalink
Moving queue size and making node flume queue bigger (#724)
Browse files Browse the repository at this point in the history
Moving queue size to node fixes long latency issues for python node and
make it possible to set the right queue_size.

I also changed flume queue length within nodes to makes this possible. 

This seems to fix some of the graceful stop issue in Python but further
investigation is required.
  • Loading branch information
haixuanTao authored Dec 6, 2024
2 parents 4c317e1 + c8a8b05 commit 8ad81eb
Show file tree
Hide file tree
Showing 26 changed files with 345 additions and 78 deletions.
17 changes: 13 additions & 4 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -322,28 +322,37 @@ jobs:
sleep 10
dora stop --name ci-python-test --grace-duration 5s
cd ..
# Run Python Node Example
dora build ../examples/python-dataflow/dataflow.yml
dora start ../examples/python-dataflow/dataflow.yml --name ci-python --detach
dora build examples/python-dataflow/dataflow.yml
dora start examples/python-dataflow/dataflow.yml --name ci-python --detach
sleep 80
dora stop --name ci-python --grace-duration 5s
# Run Python Dynamic Node Example
dora start ../examples/python-dataflow/dataflow_dynamic.yml --name ci-python-dynamic --detach
dora start examples/python-dataflow/dataflow_dynamic.yml --name ci-python-dynamic --detach
opencv-plot --name plot
sleep 80
dora stop --name ci-python-dynamic --grace-duration 5s
# Run Python Operator Example
dora start ../examples/python-operator-dataflow/dataflow.yml --name ci-python-operator --detach
dora start examples/python-operator-dataflow/dataflow.yml --name ci-python-operator --detach
sleep 80
dora stop --name ci-python-operator --grace-duration 5s
dora destroy
# Run Python queue latency test
dora run tests/queue_size_latest_data_python/dataflow.yaml
# Run Rust queue latency test
dora build tests/queue_size_latest_data_rust/dataflow.yaml
dora run tests/queue_size_latest_data_rust/dataflow.yaml
- name: "Test CLI (C)"
timeout-minutes: 30
# fail-fast by using bash shell explictly
Expand Down
13 changes: 11 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ members = [
"libraries/extensions/ros2-bridge",
"libraries/extensions/ros2-bridge/msg-gen",
"libraries/extensions/ros2-bridge/python",
"tests/queue_size_latest_data_rust/receive_data",
]

[workspace.package]
Expand Down
70 changes: 65 additions & 5 deletions apis/rust/node/src/event_stream/mod.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,12 @@
use std::{sync::Arc, time::Duration};
use std::{
collections::{BTreeMap, HashMap, VecDeque},
sync::Arc,
time::Duration,
};

use dora_message::{
daemon_to_node::{DaemonCommunication, DaemonReply, DataMessage, NodeEvent},
id::DataId,
node_to_daemon::{DaemonRequest, Timestamped},
DataflowId,
};
Expand All @@ -11,17 +16,22 @@ use futures::{
Stream, StreamExt,
};
use futures_timer::Delay;
use scheduler::{Scheduler, NON_INPUT_EVENT};

use self::{
event::SharedMemoryData,
thread::{EventItem, EventStreamThreadHandle},
};
use crate::daemon_connection::DaemonChannel;
use dora_core::{config::NodeId, uhlc};
use dora_core::{
config::{Input, NodeId},
uhlc,
};
use eyre::{eyre, Context};

mod event;
pub mod merged;
mod scheduler;
mod thread;

pub struct EventStream {
Expand All @@ -30,6 +40,7 @@ pub struct EventStream {
_thread_handle: EventStreamThreadHandle,
close_channel: DaemonChannel,
clock: Arc<uhlc::HLC>,
scheduler: Scheduler,
}

impl EventStream {
Expand All @@ -38,6 +49,7 @@ impl EventStream {
dataflow_id: DataflowId,
node_id: &NodeId,
daemon_communication: &DaemonCommunication,
input_config: BTreeMap<DataId, Input>,
clock: Arc<uhlc::HLC>,
) -> eyre::Result<Self> {
let channel = match daemon_communication {
Expand Down Expand Up @@ -76,7 +88,31 @@ impl EventStream {
}
};

Self::init_on_channel(dataflow_id, node_id, channel, close_channel, clock)
let mut queue_size_limit: HashMap<DataId, (usize, VecDeque<EventItem>)> = input_config
.iter()
.map(|(input, config)| {
(
input.clone(),
(config.queue_size.unwrap_or(1), VecDeque::new()),
)
})
.collect();

queue_size_limit.insert(
DataId::from(NON_INPUT_EVENT.to_string()),
(1_000, VecDeque::new()),
);

let scheduler = Scheduler::new(queue_size_limit);

Self::init_on_channel(
dataflow_id,
node_id,
channel,
close_channel,
clock,
scheduler,
)
}

pub(crate) fn init_on_channel(
Expand All @@ -85,6 +121,7 @@ impl EventStream {
mut channel: DaemonChannel,
mut close_channel: DaemonChannel,
clock: Arc<uhlc::HLC>,
scheduler: Scheduler,
) -> eyre::Result<Self> {
channel.register(dataflow_id, node_id.clone(), clock.new_timestamp())?;
let reply = channel
Expand All @@ -105,7 +142,8 @@ impl EventStream {

close_channel.register(dataflow_id, node_id.clone(), clock.new_timestamp())?;

let (tx, rx) = flume::bounded(0);
let (tx, rx) = flume::bounded(100_000_000);

let thread_handle = thread::init(node_id.clone(), tx, channel, clock.clone())?;

Ok(EventStream {
Expand All @@ -114,6 +152,7 @@ impl EventStream {
_thread_handle: thread_handle,
close_channel,
clock,
scheduler,
})
}

Expand All @@ -128,7 +167,28 @@ impl EventStream {
}

pub async fn recv_async(&mut self) -> Option<Event> {
self.receiver.next().await.map(Self::convert_event_item)
loop {
if self.scheduler.is_empty() {
if let Some(event) = self.receiver.next().await {
self.scheduler.add_event(event);
} else {
break;
}
} else {
match select(
Delay::new(Duration::from_nanos(10_000)),
self.receiver.next(),
)
.await
{
Either::Left((_elapsed, _)) => break,
Either::Right((Some(event), _)) => self.scheduler.add_event(event),
Either::Right((None, _)) => break,
};
}
}
let event = self.scheduler.next();
event.map(Self::convert_event_item)
}

pub async fn recv_async_timeout(&mut self, dur: Duration) -> Option<Event> {
Expand Down
97 changes: 97 additions & 0 deletions apis/rust/node/src/event_stream/scheduler.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
use std::collections::{HashMap, VecDeque};

use dora_message::{daemon_to_node::NodeEvent, id::DataId};

use super::thread::EventItem;
pub const NON_INPUT_EVENT: &str = "dora/non_input_event";

// This scheduler will make sure that there is fairness between
// inputs.
//
// It is going to always make sure that the input that has not been used for the longest period is the first one to be used next.
//
// Ex:
// In the case that one input has a very high frequency and another one with a very slow frequency.\
//
// The Node will always alternate between the two inputs when each input is available
// Avoiding one input to be overwhelmingly present.
//
#[derive(Debug)]
pub struct Scheduler {
last_used: VecDeque<DataId>, // Tracks the last-used event ID
event_queues: HashMap<DataId, (usize, VecDeque<EventItem>)>, // Tracks events per ID
}

impl Scheduler {
pub fn new(event_queues: HashMap<DataId, (usize, VecDeque<EventItem>)>) -> Self {
let topic = VecDeque::from_iter(
event_queues

Check warning on line 28 in apis/rust/node/src/event_stream/scheduler.rs

View workflow job for this annotation

GitHub Actions / Clippy

useless conversion to the same type: `std::collections::hash_map::Keys<'_, dora_core::config::DataId, (usize, std::collections::VecDeque<event_stream::thread::EventItem>)>`
.keys()
.into_iter()
.filter(|t| **t != DataId::from(NON_INPUT_EVENT.to_string()))
.cloned(),
);
Self {
last_used: topic,
event_queues,
}
}

pub fn add_event(&mut self, event: EventItem) {
let event_id = match &event {
EventItem::NodeEvent {
event:
NodeEvent::Input {
id,
metadata: _,
data: _,
},
ack_channel: _,
} => id,
_ => &DataId::from(NON_INPUT_EVENT.to_string()),
};

// Enforce queue size limit
if let Some((size, queue)) = self.event_queues.get_mut(event_id) {
// Remove the oldest event if at limit
if &queue.len() >= size {
queue.pop_front();
}
queue.push_back(event);
} else {
unimplemented!("Received an event that was not in the definition event id description.")
}
}

pub fn next(&mut self) -> Option<EventItem> {
// Retrieve message from the non input event first that have priority over input message.
if let Some((_size, queue)) = self
.event_queues
.get_mut(&DataId::from(NON_INPUT_EVENT.to_string()))
{
if let Some(event) = queue.pop_front() {
return Some(event);
}
}

// Process the ID with the oldest timestamp using BTreeMap Ordering
for (index, id) in self.last_used.clone().iter().enumerate() {
if let Some((_size, queue)) = self.event_queues.get_mut(id) {
if let Some(event) = queue.pop_front() {
// Put last used at last
self.last_used.remove(index);
self.last_used.push_back(id.clone());
return Some(event);
}
}
}

None
}

pub fn is_empty(&self) -> bool {
self.event_queues
.iter()
.all(|(_id, (_size, queue))| queue.is_empty())
}
}
11 changes: 9 additions & 2 deletions apis/rust/node/src/event_stream/thread.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,14 @@ impl Drop for EventStreamThreadHandle {
if self.handle.is_empty() {
tracing::trace!("waiting for event stream thread");
}
match self.handle.recv_timeout(Duration::from_secs(20)) {

// TODO: The event stream duration has been shorten due to
// Python Reference Counting not working properly and deleting the node
// before deleting event creating a race condition.
//
// In the future, we hope to fix this issue so that
// the event stream can be properly waited for every time.
match self.handle.recv_timeout(Duration::from_secs(1)) {
Ok(Ok(())) => {
tracing::trace!("event stream thread finished");
}
Expand Down Expand Up @@ -230,7 +237,7 @@ fn report_remaining_drop_tokens(
drop_tokens.push(token);
}
Err(flume::RecvTimeoutError::Timeout) => {
let duration = Duration::from_secs(30);
let duration = Duration::from_secs(1);
if since.elapsed() > duration {
tracing::warn!(
"timeout: node finished, but token {token:?} was still not \
Expand Down
18 changes: 12 additions & 6 deletions apis/rust/node/src/node/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -130,10 +130,16 @@ impl DoraNode {
dynamic: _,
} = node_config;
let clock = Arc::new(uhlc::HLC::default());
let input_config = run_config.inputs.clone();

let event_stream =
EventStream::init(dataflow_id, &node_id, &daemon_communication, clock.clone())
.wrap_err("failed to init event stream")?;
let event_stream = EventStream::init(
dataflow_id,
&node_id,
&daemon_communication,
input_config,
clock.clone(),
)
.wrap_err("failed to init event stream")?;
let drop_stream =
DropStream::init(dataflow_id, &node_id, &daemon_communication, clock.clone())
.wrap_err("failed to init drop stream")?;
Expand Down Expand Up @@ -403,22 +409,22 @@ impl Drop for DoraNode {
);
}

match self.drop_stream.recv_timeout(Duration::from_secs(10)) {
match self.drop_stream.recv_timeout(Duration::from_secs(2)) {
Ok(token) => {
self.sent_out_shared_memory.remove(&token);
}
Err(flume::RecvTimeoutError::Disconnected) => {
tracing::warn!(
"finished_drop_tokens channel closed while still waiting for drop tokens; \
closing {} shared memory regions that might still be used",
closing {} shared memory regions that might not yet been mapped.",
self.sent_out_shared_memory.len()
);
break;
}
Err(flume::RecvTimeoutError::Timeout) => {
tracing::warn!(
"timeout while waiting for drop tokens; \
closing {} shared memory regions that might still be used",
closing {} shared memory regions that might not yet been mapped.",
self.sent_out_shared_memory.len()
);
break;
Expand Down
2 changes: 1 addition & 1 deletion binaries/daemon/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -841,7 +841,7 @@ impl Daemon {
let _ = reply_sender.send(DaemonReply::Result(Err(err)));
}
Ok(dataflow) => {
tracing::debug!("node `{node_id}` is ready");
tracing::info!("node `{node_id}` is ready");
Self::subscribe(dataflow, node_id.clone(), event_sender, &self.clock).await;

let status = dataflow
Expand Down
Loading

0 comments on commit 8ad81eb

Please sign in to comment.