Skip to content

Commit

Permalink
refactor(rust): refactor Iceoryx builder.
Browse files Browse the repository at this point in the history
  • Loading branch information
nkaz001 committed Oct 1, 2024
1 parent e4eeef4 commit 2e51999
Show file tree
Hide file tree
Showing 2 changed files with 97 additions and 55 deletions.
6 changes: 3 additions & 3 deletions connector/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use std::{

use clap::Parser;
use hftbacktest::{
live::ipc::{IceoryxReceiver, IceoryxSender, PubSubError, TO_ALL},
live::ipc::{IceoryxBuilder, IceoryxReceiver, IceoryxSender, PubSubError, TO_ALL},
prelude::*,
types::Request,
};
Expand Down Expand Up @@ -48,7 +48,7 @@ fn run_receive_task(
let node = NodeBuilder::new()
.create::<ipc::Service>()
.map_err(|error| PubSubError::BuildError(error.to_string()))?;
let bot_rx = IceoryxReceiver::<Request>::build(&format!("{name}/FromBot"))?;
let bot_rx = IceoryxBuilder::new(name).bot(false).receiver()?;
loop {
let cycle_time = Duration::from_nanos(1000);
match node.wait(cycle_time) {
Expand Down Expand Up @@ -100,7 +100,7 @@ async fn run_publish_task(
) -> Result<(), PubSubError> {
let mut depth = HashMap::new();
let mut position = HashMap::new();
let bot_tx = IceoryxSender::<LiveEvent>::build(&format!("{name}/ToBot"))?;
let bot_tx = IceoryxBuilder::new(name).bot(false).sender()?;

while let Some(msg) = rx.recv().await {
match msg {
Expand Down
146 changes: 94 additions & 52 deletions hftbacktest/src/live/ipc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ use thiserror::Error;

use crate::{
live::{BotError, Channel},
prelude::{LiveEvent, Request},
prelude::{BuildError, LiveEvent, Request},
};

pub const TO_ALL: u64 = 0;
Expand Down Expand Up @@ -53,46 +53,116 @@ pub enum PubSubError {
FromUtf8(#[from] FromUtf8Error),
}

pub struct IceoryxSender<T> {
// Unfortunately, the publisher's lifetime seems to be tied to the factory.
_pub_factory: PortFactory<ipc::Service, [u8], CustomHeader>,
publisher: Publisher<ipc::Service, [u8], CustomHeader>,
_t_marker: PhantomData<T>,
pub struct IceoryxBuilder {
name: String,
bot: bool,
}

impl<T> IceoryxSender<T>
where
T: Encode,
{
pub fn build(name: &str) -> Result<Self, PubSubError> {
impl IceoryxBuilder {
pub fn new(name: &str) -> Self {
Self {
name: name.to_string(),
bot: true,
}
}

pub fn bot(self, bot: bool) -> Self {
Self { bot, ..self }
}

pub fn receiver<T>(self) -> Result<IceoryxReceiver<T>, PubSubError> {
let node = NodeBuilder::new()
.create::<ipc::Service>()
.map_err(|error| PubSubError::BuildError(error.to_string()))?;
let from_bot =
ServiceName::new(name).map_err(|error| PubSubError::BuildError(error.to_string()))?;
let pub_factory = node
.service_builder(&from_bot)
.publish_subscribe::<[u8]>()
.subscriber_max_buffer_size(100000)
.max_publishers(500)
.max_subscribers(500)
.user_header::<CustomHeader>()
.open_or_create()
let sub_factory = if self.bot {
let service_name = ServiceName::new(&format!("{}/ToBot", self.name))
.map_err(|error| PubSubError::BuildError(error.to_string()))?;
node.service_builder(&service_name)
.publish_subscribe::<[u8]>()
.subscriber_max_buffer_size(100000)
.max_publishers(1)
.max_subscribers(500)
.user_header::<CustomHeader>()
.open_or_create()
.map_err(|error| PubSubError::BuildError(error.to_string()))?
} else {
let service_name = ServiceName::new(&format!("{}/FromBot", self.name))
.map_err(|error| PubSubError::BuildError(error.to_string()))?;
node.service_builder(&service_name)
.publish_subscribe::<[u8]>()
.subscriber_max_buffer_size(100000)
.max_publishers(500)
.max_subscribers(1)
.user_header::<CustomHeader>()
.open_or_create()
.map_err(|error| PubSubError::BuildError(error.to_string()))?
};

let subscriber = sub_factory
.subscriber_builder()
.create()
.map_err(|error| PubSubError::BuildError(error.to_string()))?;

Ok(IceoryxReceiver {
_sub_factory: sub_factory,
subscriber,
_t_marker: Default::default(),
})
}

pub fn sender<T>(self) -> Result<IceoryxSender<T>, PubSubError> {
let node = NodeBuilder::new()
.create::<ipc::Service>()
.map_err(|error| PubSubError::BuildError(error.to_string()))?;
let pub_factory = if self.bot {
let service_name = ServiceName::new(&format!("{}/FromBot", self.name))
.map_err(|error| PubSubError::BuildError(error.to_string()))?;
node.service_builder(&service_name)
.publish_subscribe::<[u8]>()
.subscriber_max_buffer_size(100000)
.max_publishers(500)
.max_subscribers(1)
.user_header::<CustomHeader>()
.open_or_create()
.map_err(|error| PubSubError::BuildError(error.to_string()))?
} else {
let service_name = ServiceName::new(&format!("{}/ToBot", self.name))
.map_err(|error| PubSubError::BuildError(error.to_string()))?;
node.service_builder(&service_name)
.publish_subscribe::<[u8]>()
.subscriber_max_buffer_size(100000)
.max_publishers(1)
.max_subscribers(500)
.user_header::<CustomHeader>()
.open_or_create()
.map_err(|error| PubSubError::BuildError(error.to_string()))?
};

let publisher = pub_factory
.publisher_builder()
.max_slice_len(128)
.create()
.map_err(|error| PubSubError::BuildError(error.to_string()))?;

Ok(Self {
Ok(IceoryxSender {
_pub_factory: pub_factory,
publisher,
_t_marker: Default::default(),
})
}
}

pub struct IceoryxSender<T> {
// Unfortunately, the publisher's lifetime seems to be tied to the factory.
_pub_factory: PortFactory<ipc::Service, [u8], CustomHeader>,
publisher: Publisher<ipc::Service, [u8], CustomHeader>,
_t_marker: PhantomData<T>,
}

impl<T> IceoryxSender<T>
where
T: Encode,
{
pub fn send(&self, id: u64, data: &T) -> Result<(), PubSubError> {
let sample = self.publisher.loan_slice_uninit(128)?;
let mut sample = unsafe { sample.assume_init() };
Expand Down Expand Up @@ -120,34 +190,6 @@ impl<T> IceoryxReceiver<T>
where
T: Decode,
{
pub fn build(name: &str) -> Result<Self, PubSubError> {
let node = NodeBuilder::new()
.create::<ipc::Service>()
.map_err(|error| PubSubError::BuildError(error.to_string()))?;
let to_bot =
ServiceName::new(name).map_err(|error| PubSubError::BuildError(error.to_string()))?;
let sub_factory = node
.service_builder(&to_bot)
.publish_subscribe::<[u8]>()
.subscriber_max_buffer_size(100000)
.max_publishers(500)
.max_subscribers(500)
.user_header::<CustomHeader>()
.open_or_create()
.map_err(|error| PubSubError::BuildError(error.to_string()))?;

let subscriber = sub_factory
.subscriber_builder()
.create()
.map_err(|error| PubSubError::BuildError(error.to_string()))?;

Ok(Self {
_sub_factory: sub_factory,
subscriber,
_t_marker: Default::default(),
})
}

pub fn receive(&self) -> Result<Option<(u64, T)>, PubSubError> {
match self.subscriber.receive()? {
None => Ok(None),
Expand Down Expand Up @@ -175,8 +217,8 @@ where
R: Decode,
{
pub fn new(name: &str) -> Result<Self, anyhow::Error> {
let publisher = IceoryxSender::build(&format!("{name}/FromBot"))?;
let subscriber = IceoryxReceiver::build(&format!("{name}/ToBot"))?;
let publisher = IceoryxBuilder::new(name).sender()?;
let subscriber = IceoryxBuilder::new(name).receiver()?;

Ok(Self {
publisher,
Expand Down

0 comments on commit 2e51999

Please sign in to comment.