diff --git a/communication/src/allocator/zero_copy/allocator.rs b/communication/src/allocator/zero_copy/allocator.rs index 64225ff29..e981b6cc1 100644 --- a/communication/src/allocator/zero_copy/allocator.rs +++ b/communication/src/allocator/zero_copy/allocator.rs @@ -105,7 +105,7 @@ impl TcpBuilder { index: self.index, peers: self.peers, canaries: Rc::new(RefCell::new(Vec::new())), - channel_id_bound: None, + channel_id_upper: 0, staged: Vec::new(), sends, recvs, @@ -125,7 +125,7 @@ pub struct TcpAllocator { staged: Vec, // staging area for incoming Bytes canaries: Rc>>, - channel_id_bound: Option, + channel_id_upper: usize, // sending, receiving, and responding to binary buffers. sends: Vec>>>, // sends[x] -> goes to process x. @@ -139,10 +139,8 @@ impl Allocate for TcpAllocator { fn allocate(&mut self, identifier: usize) -> (Vec>>>, Box>>) { // Assume and enforce in-order identifier allocation. - if let Some(bound) = self.channel_id_bound { - assert!(bound < identifier); - } - self.channel_id_bound = Some(identifier); + assert!(self.channel_id_upper <= identifier); + self.channel_id_upper = identifier + 1; // Result list of boxed pushers. let mut pushes = Vec::>>>::new(); @@ -235,7 +233,7 @@ impl Allocate for TcpAllocator { match self.to_local.entry(header.channel) { Entry::Vacant(entry) => { // We may receive data before allocating, and shouldn't block. - if self.channel_id_bound.map(|b| b < header.channel).unwrap_or(true) { + if self.channel_id_upper <= header.channel { entry.insert(Rc::new(RefCell::new(VecDeque::new()))) .borrow_mut() .push_back(peel); @@ -275,4 +273,4 @@ impl Allocate for TcpAllocator { fn await_events(&self, duration: Option) { self.inner.await_events(duration); } -} \ No newline at end of file +} diff --git a/communication/src/allocator/zero_copy/allocator_process.rs b/communication/src/allocator/zero_copy/allocator_process.rs index dd2815a50..91c0a7287 100644 --- a/communication/src/allocator/zero_copy/allocator_process.rs +++ b/communication/src/allocator/zero_copy/allocator_process.rs @@ -79,7 +79,7 @@ impl ProcessBuilder { peers: self.peers, events: Rc::new(RefCell::new(VecDeque::new())), canaries: Rc::new(RefCell::new(Vec::new())), - channel_id_bound: None, + channel_id_upper: 0, staged: Vec::new(), sends, recvs, @@ -107,7 +107,7 @@ pub struct ProcessAllocator { canaries: Rc>>, - channel_id_bound: Option, + channel_id_upper: usize, // sending, receiving, and responding to binary buffers. staged: Vec, @@ -122,10 +122,8 @@ impl Allocate for ProcessAllocator { fn allocate(&mut self, identifier: usize) -> (Vec>>>, Box>>) { // Assume and enforce in-order identifier allocation. - if let Some(bound) = self.channel_id_bound { - assert!(bound < identifier); - } - self.channel_id_bound = Some(identifier); + assert!(self.channel_id_upper <= identifier); + self.channel_id_upper = identifier + 1; let mut pushes = Vec::>>>::with_capacity(self.peers()); @@ -202,7 +200,7 @@ impl Allocate for ProcessAllocator { match self.to_local.entry(header.channel) { Entry::Vacant(entry) => { // We may receive data before allocating, and shouldn't block. - if self.channel_id_bound.map(|b| b < header.channel).unwrap_or(true) { + if self.channel_id_upper <= header.channel { entry.insert(Rc::new(RefCell::new(VecDeque::new()))) .borrow_mut() .push_back(peel); @@ -250,4 +248,4 @@ impl Allocate for ProcessAllocator { } } } -} \ No newline at end of file +}