Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 6 additions & 6 deletions src/broker_storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,14 +34,14 @@ impl StorageApi for BrokerStorage {
.lock()
.map_err(|_| BrokerRpcError::MutexError("storage".to_string()))?;
let mut keys = storage_lock
.partial_compare_keys(&format!("broker_msg_{dest}_"))
.partial_compare_keys(&format!("broker_msg_{dest}_"), None)
.unwrap_or(vec![]);
if keys.is_empty() {
return Ok(None);
}
keys.sort();
let key = keys.first().unwrap();
if let Some(msg) = storage_lock.get(key).unwrap_or(None) {
if let Some(msg) = storage_lock.get(key, None).unwrap_or(None) {
let parts: Vec<&str> = key.split('_').collect();
let uid = parts[3]
.parse::<u64>()
Expand All @@ -61,11 +61,11 @@ impl StorageApi for BrokerStorage {
.map_err(|_| BrokerRpcError::MutexError("storage".to_string()))?;
let mut messages = Vec::new();
let mut keys = storage_lock
.partial_compare_keys(&format!("broker_msg_{dest}_"))
.partial_compare_keys(&format!("broker_msg_{dest}_"), None)
.unwrap_or(vec![]);
keys.sort();
for key in keys {
if let Some(msg) = storage_lock.get(&key).unwrap_or(None) {
if let Some(msg) = storage_lock.get(&key, None).unwrap_or(None) {
let parts: Vec<&str> = key.split('_').collect();
let uid = parts[3]
.parse::<u64>()
Expand All @@ -85,7 +85,7 @@ impl StorageApi for BrokerStorage {
.lock()
.map_err(|_| BrokerRpcError::MutexError("storage".to_string()))?;
let keys = storage_lock
.partial_compare_keys(&format!("broker_msg_{dest}_{}", format_uid(uid)))
.partial_compare_keys(&format!("broker_msg_{dest}_{}", format_uid(uid)), None)
.unwrap_or(vec![]);
if keys.len() != 1 {
return Ok(false);
Expand All @@ -109,7 +109,7 @@ impl StorageApi for BrokerStorage {
.map_err(|_| BrokerRpcError::MutexError("storage".to_string()))?;

let uid: u64 = storage_lock
.get("broker_current_uid")
.get("broker_current_uid", None)
.unwrap_or(None)
.unwrap_or(0)
+ 1;
Expand Down
10 changes: 5 additions & 5 deletions src/channel/queue_channel.rs
Original file line number Diff line number Diff line change
Expand Up @@ -211,7 +211,7 @@ impl QueueChannel {

fn get_next_idx(&self, queue: &QueueType) -> Result<u64, BrokerError> {
let key = self.storage_idx_key(queue);
let current_idx: u64 = self.storage.get(&key).unwrap_or(None).unwrap_or(0) + 1;
let current_idx: u64 = self.storage.get(&key, None).unwrap_or(None).unwrap_or(0) + 1;
self.storage.set(&key, current_idx, None)?;
Ok(current_idx)
}
Expand Down Expand Up @@ -264,7 +264,7 @@ impl QueueChannel {
fn process_out_queue(&self) -> Result<(), BrokerError> {
let mut storage_keys = self
.storage
.partial_compare_keys(&self.partial_compare_keys(&QueueType::OutQueue))?
.partial_compare_keys(&self.partial_compare_keys(&QueueType::OutQueue), None)?
.into_iter()
.collect::<Vec<String>>();
storage_keys.sort_by_key(|key| {
Expand All @@ -280,7 +280,7 @@ impl QueueChannel {
let now = now_ms()?;

for key in storage_keys {
if let Some(raw) = self.storage.get::<_, String>(&key)? {
if let Some(raw) = self.storage.get::<_, String>(&key, None)? {
let parts: Vec<&str> = key.split('/').collect();
if parts.len() < 7 {
continue;
Expand Down Expand Up @@ -404,7 +404,7 @@ impl QueueChannel {
) -> Result<Vec<(ReceiveHandlerChannel, Option<String>)>, BrokerError> {
let mut storage_keys = self
.storage
.partial_compare_keys(&self.partial_compare_keys(queue_type))?
.partial_compare_keys(&self.partial_compare_keys(queue_type), None)?
.into_iter()
.collect::<Vec<String>>();
storage_keys.sort_by_key(|key| {
Expand All @@ -417,7 +417,7 @@ impl QueueChannel {
let mut messages = vec![];

for key in storage_keys {
if let Some(data) = self.storage.get(&key)? {
if let Some(data) = self.storage.get(&key, None)? {
let x: String = data;
let parts: Vec<&str> = key.split('/').collect();
if parts.len() < 7 {
Expand Down
Loading