diff --git a/protocols/gossipsub/src/behaviour.rs b/protocols/gossipsub/src/behaviour.rs index 7b7e6a80395..ef14e19e663 100644 --- a/protocols/gossipsub/src/behaviour.rs +++ b/protocols/gossipsub/src/behaviour.rs @@ -580,202 +580,315 @@ where true } - /// Publishes a message with multiple topics to the network. + /// Publishes a message to a single topic on the network. pub fn publish( &mut self, topic: impl Into, data: impl Into>, ) -> Result { - let data = data.into(); - let topic = topic.into(); - - // Transform the data before building a raw_message. - let transformed_data = self - .data_transform - .outbound_transform(&topic, data.clone())?; - - // check that the size doesn't exceed the max transmission size. - if transformed_data.len() > self.config.max_transmit_size() { - return Err(PublishError::MessageTooLarge); - } - - let raw_message = self.build_raw_message(topic, transformed_data)?; - - // calculate the message id from the un-transformed data - let msg_id = self.config.message_id(&Message { - source: raw_message.source, - data, // the uncompressed form - sequence_number: raw_message.sequence_number, - topic: raw_message.topic.clone(), - }); - - // Check the if the message has been published before - if self.duplicate_cache.contains(&msg_id) { - // This message has already been seen. We don't re-publish messages that have already - // been published on the network. - tracing::warn!( - message=%msg_id, - "Not publishing a message that has already been published" - ); - return Err(PublishError::Duplicate); - } + let mut result = self.batch_publish(topic, vec![data.into()])?; - tracing::trace!(message=%msg_id, "Publishing message"); + Ok(result + .pop() + .expect("This should never be empty, otherwise a fault with batch publishing")) + } - let topic_hash = raw_message.topic.clone(); + /// Publishes a collection of messages with a single topic to the network. + /// + /// NOTE: If a single message errors, the entire batch will fail. + pub fn batch_publish( + &mut self, + topic: impl Into, + data_list: Vec>, + ) -> Result, PublishError> { + let topic_hash = topic.into(); - let mut peers_on_topic = self - .connected_peers - .iter() - .filter(|(_, p)| p.topics.contains(&topic_hash)) - .map(|(peer_id, _)| peer_id) - .peekable(); + // Get a set of peers for this topic that we can publish to + let peers_to_publish_to = self.get_peers_on_topic_for_publishing(&topic_hash); - if peers_on_topic.peek().is_none() { + // If there are not enough peers to send this message, error + if peers_to_publish_to.is_empty() { return Err(PublishError::InsufficientPeers); } - let mut recipient_peers = HashSet::new(); - if self.config.flood_publish() { - // Forward to all peers above score and all explicit peers - recipient_peers.extend(peers_on_topic.filter(|p| { - self.explicit_peers.contains(*p) - || !self.score_below_threshold(p, |ts| ts.publish_threshold).0 - })); - } else { - match self.mesh.get(&topic_hash) { - // Mesh peers - Some(mesh_peers) => { - // We have a mesh set. We want to make sure to publish to at least `mesh_n` - // peers (if possible). - let needed_extra_peers = self.config.mesh_n().saturating_sub(mesh_peers.len()); - - if needed_extra_peers > 0 { - // We don't have `mesh_n` peers in our mesh, we will randomly select extras - // and publish to them. - - // Get a random set of peers that are appropriate to send messages too. - let peer_list = get_random_peers( - &self.connected_peers, - &topic_hash, - needed_extra_peers, - |peer| { - !mesh_peers.contains(peer) - && !self.explicit_peers.contains(peer) - && !self - .score_below_threshold(peer, |pst| pst.publish_threshold) - .0 - }, - ); - recipient_peers.extend(peer_list); - } + // This is the collection of validated raw messages that we want to send in this batch, with + // their associated message-ids. + let mut validated_raw_messages: Vec<(MessageId, RawMessage)> = + Vec::with_capacity(data_list.len()); - recipient_peers.extend(mesh_peers); - } - // Gossipsub peers - None => { - tracing::debug!(topic=%topic_hash, "Topic not in the mesh"); - // `fanout_peers` is always non-empty if it's `Some`. - let fanout_peers = self - .fanout - .get(&topic_hash) - .filter(|peers| !peers.is_empty()); - // If we have fanout peers add them to the map. - if let Some(peers) = fanout_peers { - for peer in peers { - recipient_peers.insert(*peer); - } - } else { - // We have no fanout peers, select mesh_n of them and add them to the fanout - let mesh_n = self.config.mesh_n(); - let new_peers = - get_random_peers(&self.connected_peers, &topic_hash, mesh_n, { - |p| { - !self.explicit_peers.contains(p) - && !self - .score_below_threshold(p, |pst| pst.publish_threshold) - .0 - } - }); - // Add the new peers to the fanout and recipient peers - self.fanout.insert(topic_hash.clone(), new_peers.clone()); - for peer in new_peers { - tracing::debug!(%peer, "Peer added to fanout"); - recipient_peers.insert(peer); - } - } - // We are publishing to fanout peers - update the time we published - self.fanout_last_pub - .insert(topic_hash.clone(), Instant::now()); - } + // Keeps track of which messages have failed to be published or not to report back to the + // user. + let mut failed_publish = HashMap::with_capacity(data_list.len()); + + // Perform message validation and checks before sending + // This will error all messages, if a single message fails validation. + for data in data_list.into_iter() { + // Transform the data before building a raw_message. + let transformed_data = self + .data_transform + .outbound_transform(&topic_hash, data.clone())?; + + // Check that the size doesn't exceed the max transmission size. + if transformed_data.len() > self.config.max_transmit_size() { + return Err(PublishError::MessageTooLarge); } - // Explicit peers that are part of the topic - recipient_peers - .extend(peers_on_topic.filter(|peer_id| self.explicit_peers.contains(peer_id))); + let raw_message = self.build_raw_message(topic_hash.clone(), transformed_data)?; - // Floodsub peers - for (peer, connections) in &self.connected_peers { - if connections.kind == PeerKind::Floodsub - && !self - .score_below_threshold(peer, |ts| ts.publish_threshold) - .0 - { - recipient_peers.insert(*peer); - } + // Calculate the message id from the un-transformed data + let msg_id = self.config.message_id(&Message { + source: raw_message.source, + data, // the uncompressed form + sequence_number: raw_message.sequence_number, + topic: raw_message.topic.clone(), + }); + + // Check the if the message has been published before + if self.duplicate_cache.contains(&msg_id) { + // This message has already been seen. We don't re-publish messages that have already + // been published on the network. + tracing::warn!( + message=%msg_id, + "Not publishing a message that has already been published" + ); + return Err(PublishError::Duplicate); } - } - // If the message isn't a duplicate and we have sent it to some peers add it to the - // duplicate cache and memcache. - self.duplicate_cache.insert(msg_id.clone()); - self.mcache.put(&msg_id, raw_message.clone()); + // If the message isn't a duplicate and we have sent it to some peers add it to the + // duplicate cache and memcache. + self.duplicate_cache.insert(msg_id.clone()); + self.mcache.put(&msg_id, raw_message.clone()); + + // If the message is anonymous or has a random author add it to the published message ids + // cache. + if let PublishConfig::RandomAuthor | PublishConfig::Anonymous = self.publish_config { + if !self.config.allow_self_origin() { + self.published_message_ids.insert(msg_id.clone()); + } + } - // If the message is anonymous or has a random author add it to the published message ids - // cache. - if let PublishConfig::RandomAuthor | PublishConfig::Anonymous = self.publish_config { - if !self.config.allow_self_origin() { - self.published_message_ids.insert(msg_id.clone()); + // Broadcast IDONTWANT messages + if raw_message.raw_protobuf_len() > self.config.idontwant_message_size_threshold() + && self.config.idontwant_on_publish() + { + self.send_idontwant(&raw_message, &msg_id, raw_message.source.as_ref()); } + + // Set all messages as "failed" until proven otherwise. + failed_publish.insert(msg_id.clone(), true); + + validated_raw_messages.push((msg_id, raw_message)); } - // Send to peers we know are subscribed to the topic. - let mut publish_failed = true; - for peer_id in recipient_peers.iter() { - tracing::trace!(peer=%peer_id, "Sending message to peer"); + // Used for error reporting. + let peers_to_publish_to_len = peers_to_publish_to.len(); + + // Messages are validated, we now attempt to send the batch, efficiently to all the peers + // available + let publish_schedule = self.schedule_publish(peers_to_publish_to, validated_raw_messages); + + // We need to keep track to see if there is an error with sending any given message, so that + // we can report any error back to the user to let them know if any individual message + // failed to get published. + + // We send to peers following the schedule + for (peer_id, msg_id, raw_message) in publish_schedule { + tracing::trace!(peer=%peer_id, %msg_id, "Publishing message to peer"); if self.send_message( - *peer_id, + peer_id, RpcOut::Publish { - message: raw_message.clone(), + message: raw_message, timeout: Delay::new(self.config.publish_queue_duration()), }, ) { - publish_failed = false + failed_publish.insert(msg_id, false); } } - if recipient_peers.is_empty() { - return Err(PublishError::InsufficientPeers); + // Account for the failed and successful messages + let mut failed_messages = Vec::new(); + let mut successful_messages = Vec::with_capacity(failed_publish.len()); + for (msg_id, failed) in failed_publish.into_iter() { + if failed { + tracing::warn!(%msg_id, "Failed to publish message due to queue length"); + failed_messages.push(msg_id); + } else { + tracing::debug!(message=%msg_id, "Published message"); + successful_messages.push(msg_id); + + if let Some(metrics) = self.metrics.as_mut() { + metrics.register_published_message(&topic_hash); + } + } } - if publish_failed { - return Err(PublishError::AllQueuesFull(recipient_peers.len())); + // If any message failed, report back to the user + if !failed_messages.is_empty() { + return Err(PublishError::AllQueuesFull { + failed_messages, + peers_to_publish_to: peers_to_publish_to_len, + }); } - // Broadcast IDONTWANT messages - if raw_message.raw_protobuf_len() > self.config.idontwant_message_size_threshold() - && self.config.idontwant_on_publish() - { - self.send_idontwant(&raw_message, &msg_id, raw_message.source.as_ref()); + // All messages were published successfully, report the message ids. + Ok(successful_messages) + } + + // This function performs the load balancing across a set of peers for a batch of messages. A + // variety of strategies can be used here. + // + // The current implementation is a simple round-robin approach, given a set of peers and a set + // of messages, we allocate the messages sequentially across the peer set, ensuring every peer + // sees exactly one of every type of message. + // + // NOTE: This is currently unnecessarily tied to `self` as future implementations may want to + // use configuration parameters to optimise the strategy here. + fn schedule_publish( + &self, + peers: HashSet, + messages: Vec<(MessageId, RawMessage)>, + ) -> Vec<(PeerId, MessageId, RawMessage)> { + // This algorithm round-robins each message to the available peer-set by filling each peer's + // queue starting out of sequence by a factor of `number_of_messages/available_peers`. If + // this results in 0, we set to 1. + // + // For example, if we have 50 messages and only 2 peers to send to, we fill each peers queue + // with messages, shifted by 25. i.e peer 1 will have messages: 1,2,3...50 and peer 2 will + // have 25,26,27...50,1,2...24. They are shifted by the `number_of_messages/available_peers` + // factor. + // + // Notice that if we have more peers than messages, then we have a shift of 1, which reduces + // to the simple round robin case. + + let shift_factor = messages.len() / peers.len(); + let shift_factor = if shift_factor == 0 { 1 } else { shift_factor }; + + let mut return_schedule = Vec::with_capacity(peers.len() * messages.len()); + + for (cycle, peer_id) in peers.into_iter().enumerate() { + let iterator_shift = cycle * shift_factor % messages.len(); + + for (msg_id, raw_message) in messages + .iter() + .cycle() + .skip(iterator_shift) + .take(messages.len()) + { + return_schedule.push((peer_id, msg_id.clone(), raw_message.clone())); + } } - tracing::debug!(message=%msg_id, "Published message"); + return_schedule + } - if let Some(metrics) = self.metrics.as_mut() { - metrics.register_published_message(&topic_hash); + /// A helper function that obtains a set of peers that we should publish to, for a given topic. + fn get_peers_on_topic_for_publishing(&mut self, topic_hash: &TopicHash) -> HashSet { + // The set of peers we will return + let mut recipient_peers = HashSet::new(); + + // Collect all the peers that are subscribed to this topic. + let mut peers_on_topic = self + .connected_peers + .iter() + .filter(|(_, p)| p.topics.contains(topic_hash)) + .map(|(peer_id, _)| peer_id) + .peekable(); + + // If there are no peers, return the empty set. + if peers_on_topic.peek().is_none() { + return recipient_peers; } - Ok(msg_id) + // If we are flood publishing we send to all peers with appropriate score. + if self.config.flood_publish() { + // Forward to all peers above score and all explicit peers + recipient_peers.extend(peers_on_topic.filter(|p| { + self.explicit_peers.contains(*p) + || !self.score_below_threshold(p, |ts| ts.publish_threshold).0 + })); + return recipient_peers; + } + + // If we are not flood publishing, collect peers from the mesh and fanout etc. + match self.mesh.get(topic_hash) { + // If we have mesh peers... + Some(mesh_peers) => { + // We have a mesh set. We want to make sure to publish to at least `mesh_n` + // peers (if possible). + let needed_extra_peers = self.config.mesh_n().saturating_sub(mesh_peers.len()); + + if needed_extra_peers > 0 { + // We don't have `mesh_n` peers in our mesh, we will randomly select extras + // and publish to them. + + // Get a random set of peers that are appropriate to send messages too. + let peer_list = get_random_peers( + &self.connected_peers, + topic_hash, + needed_extra_peers, + |peer| { + !mesh_peers.contains(peer) + && !self.explicit_peers.contains(peer) + && !self + .score_below_threshold(peer, |pst| pst.publish_threshold) + .0 + }, + ); + recipient_peers.extend(peer_list); + } + + recipient_peers.extend(mesh_peers); + } + // Gossipsub peers + None => { + tracing::debug!(topic=%topic_hash, "Topic not in the mesh"); + // `fanout_peers` is always non-empty if it's `Some`. + let fanout_peers = self + .fanout + .get(topic_hash) + .filter(|peers| !peers.is_empty()); + // If we have fanout peers add them to the map. + if let Some(peers) = fanout_peers { + for peer in peers { + recipient_peers.insert(*peer); + } + } else { + // We have no fanout peers, select mesh_n of them and add them to the fanout + let mesh_n = self.config.mesh_n(); + let new_peers = get_random_peers(&self.connected_peers, topic_hash, mesh_n, { + |p| { + !self.explicit_peers.contains(p) + && !self.score_below_threshold(p, |pst| pst.publish_threshold).0 + } + }); + // Add the new peers to the fanout and recipient peers + self.fanout.insert(topic_hash.clone(), new_peers.clone()); + for peer in new_peers { + tracing::debug!(%peer, "Peer added to fanout"); + recipient_peers.insert(peer); + } + } + // We are publishing to fanout peers - update the time we published + self.fanout_last_pub + .insert(topic_hash.clone(), Instant::now()); + } + } + + // Explicit peers that are part of the topic + recipient_peers + .extend(peers_on_topic.filter(|peer_id| self.explicit_peers.contains(peer_id))); + + // Floodsub peers + for (peer, connections) in &self.connected_peers { + if connections.kind == PeerKind::Floodsub + && !self + .score_below_threshold(peer, |ts| ts.publish_threshold) + .0 + { + recipient_peers.insert(*peer); + } + } + recipient_peers } /// This function should be called when [`Config::validate_messages()`] is `true` after diff --git a/protocols/gossipsub/src/error.rs b/protocols/gossipsub/src/error.rs index eae4c51214e..a3ebe4f028d 100644 --- a/protocols/gossipsub/src/error.rs +++ b/protocols/gossipsub/src/error.rs @@ -20,6 +20,7 @@ //! Error types that can result from gossipsub. +use crate::types::MessageId; use libp2p_identity::SigningError; /// Error associated with publishing a gossipsub message. @@ -38,7 +39,12 @@ pub enum PublishError { TransformFailed(std::io::Error), /// Messages could not be sent because the queues for all peers were full. The usize represents /// the number of peers that were attempted. - AllQueuesFull(usize), + AllQueuesFull { + /// The messages that failed + failed_messages: Vec, + // The number of peers that we attempted to send to. + peers_to_publish_to: usize, + }, } impl std::fmt::Display for PublishError {