Skip to content

Commit 16192b4

Browse files
fix(l1): handle missing payloads in engine_getPayloadBodiesByRange (#5408)
**Motivation** When we added RPC metrics for success/error rates in #5335, we found that we were returning "Internal error" in the `engine_getPayloadBodiesByRange` method. This was related to changes done in #2430, which turned missing payloads into a DB error. The spec says we should return `null`s for any missing payloads, so this deviates from the spec. **Description** This PR fixes the spec deviation by changing the batch-GET methods from the Store API to return `None` on missing values instead of failing. Closes #5403 --------- Co-authored-by: Rodrigo Oliveri <[email protected]>
1 parent d4f04f1 commit 16192b4

File tree

6 files changed

+55
-39
lines changed

6 files changed

+55
-39
lines changed

crates/networking/p2p/sync.rs

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -383,7 +383,12 @@ impl Syncer {
383383
let final_batch = end_block_number == start + batch_size as u64;
384384
// Retrieve batch from DB
385385
if !single_batch {
386-
headers = store.read_fullsync_batch(start, batch_size as u64).await?;
386+
headers = store
387+
.read_fullsync_batch(start, batch_size as u64)
388+
.await?
389+
.into_iter()
390+
.map(|opt| opt.ok_or(SyncError::MissingFullsyncBatch))
391+
.collect::<Result<Vec<_>, SyncError>>()?;
387392
}
388393
let mut blocks = Vec::new();
389394
// Request block bodies
@@ -1162,6 +1167,8 @@ pub enum SyncError {
11621167
BytecodeFileError,
11631168
#[error("Error in Peer Table: {0}")]
11641169
PeerTableError(#[from] PeerTableError),
1170+
#[error("Missing fullsync batch")]
1171+
MissingFullsyncBatch,
11651172
}
11661173

11671174
impl SyncError {
@@ -1185,7 +1192,8 @@ impl SyncError {
11851192
| SyncError::RocksDBError(_)
11861193
| SyncError::BytecodeFileError
11871194
| SyncError::NoLatestCanonical
1188-
| SyncError::PeerTableError(_) => false,
1195+
| SyncError::PeerTableError(_)
1196+
| SyncError::MissingFullsyncBatch => false,
11891197
SyncError::Chain(_)
11901198
| SyncError::Store(_)
11911199
| SyncError::Send(_)

crates/networking/rpc/engine/payload.rs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -470,9 +470,11 @@ impl RpcHandler for GetPayloadBodiesByRangeV1Request {
470470
return Err(RpcErr::TooLargeRequest);
471471
}
472472
let latest_block_number = context.storage.get_latest_block_number().await?;
473+
// NOTE: we truncate the range because the spec says we "MUST NOT return trailing
474+
// null values if the request extends past the current latest known block"
473475
let last = latest_block_number.min(self.start + self.count - 1);
474476
let bodies = context.storage.get_block_bodies(self.start, last).await?;
475-
build_payload_body_response(bodies.into_iter().map(Some).collect())
477+
build_payload_body_response(bodies)
476478
}
477479
}
478480

crates/storage/api.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -58,13 +58,13 @@ pub trait StoreEngine: Debug + Send + Sync + RefUnwindSafe {
5858
&self,
5959
from: BlockNumber,
6060
to: BlockNumber,
61-
) -> Result<Vec<BlockBody>, StoreError>;
61+
) -> Result<Vec<Option<BlockBody>>, StoreError>;
6262

6363
/// Obtain block bodies from a list of hashes
6464
async fn get_block_bodies_by_hash(
6565
&self,
6666
hashes: Vec<BlockHash>,
67-
) -> Result<Vec<BlockBody>, StoreError>;
67+
) -> Result<Vec<Option<BlockBody>>, StoreError>;
6868

6969
/// Obtain any block body using the hash
7070
async fn get_block_body_by_hash(
@@ -372,7 +372,7 @@ pub trait StoreEngine: Debug + Send + Sync + RefUnwindSafe {
372372
&self,
373373
start: BlockNumber,
374374
limit: u64,
375-
) -> Result<Vec<BlockHeader>, StoreError>;
375+
) -> Result<Vec<Option<BlockHeader>>, StoreError>;
376376

377377
/// Clear all headers downloaded during fullsync
378378
async fn clear_fullsync_headers(&self) -> Result<(), StoreError>;

crates/storage/store.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -239,14 +239,14 @@ impl Store {
239239
&self,
240240
from: BlockNumber,
241241
to: BlockNumber,
242-
) -> Result<Vec<BlockBody>, StoreError> {
242+
) -> Result<Vec<Option<BlockBody>>, StoreError> {
243243
self.engine.get_block_bodies(from, to).await
244244
}
245245

246246
pub async fn get_block_bodies_by_hash(
247247
&self,
248248
hashes: Vec<BlockHash>,
249-
) -> Result<Vec<BlockBody>, StoreError> {
249+
) -> Result<Vec<Option<BlockBody>>, StoreError> {
250250
self.engine.get_block_bodies_by_hash(hashes).await
251251
}
252252

@@ -1349,7 +1349,7 @@ impl Store {
13491349
&self,
13501350
start: BlockNumber,
13511351
limit: u64,
1352-
) -> Result<Vec<BlockHeader>, StoreError> {
1352+
) -> Result<Vec<Option<BlockHeader>>, StoreError> {
13531353
self.engine.read_fullsync_batch(start, limit).await
13541354
}
13551355

crates/storage/store_db/in_memory.rs

Lines changed: 11 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -212,31 +212,27 @@ impl StoreEngine for Store {
212212
&self,
213213
from: BlockNumber,
214214
to: BlockNumber,
215-
) -> Result<Vec<BlockBody>, StoreError> {
215+
) -> Result<Vec<Option<BlockBody>>, StoreError> {
216216
let store = self.inner()?;
217217
let mut res = Vec::new();
218218
for block_number in from..=to {
219-
if let Some(block) = store
219+
let body_opt = store
220220
.canonical_hashes
221221
.get(&block_number)
222-
.and_then(|hash| store.bodies.get(hash))
223-
{
224-
res.push(block.clone())
225-
}
222+
.and_then(|hash| store.bodies.get(hash));
223+
res.push(body_opt.cloned());
226224
}
227225
Ok(res)
228226
}
229227

230228
async fn get_block_bodies_by_hash(
231229
&self,
232230
hashes: Vec<BlockHash>,
233-
) -> Result<Vec<BlockBody>, StoreError> {
231+
) -> Result<Vec<Option<BlockBody>>, StoreError> {
234232
let store = self.inner()?;
235233
let mut res = Vec::new();
236234
for hash in hashes {
237-
if let Some(block) = store.bodies.get(&hash).cloned() {
238-
res.push(block);
239-
}
235+
res.push(store.bodies.get(&hash).cloned());
240236
}
241237
Ok(res)
242238
}
@@ -708,19 +704,12 @@ impl StoreEngine for Store {
708704
&self,
709705
start: BlockNumber,
710706
limit: u64,
711-
) -> Result<Vec<BlockHeader>, StoreError> {
707+
) -> Result<Vec<Option<BlockHeader>>, StoreError> {
712708
let store = self.inner()?;
713-
(start..start + limit)
714-
.map(|ref n| {
715-
store
716-
.fullsync_headers
717-
.get(n)
718-
.cloned()
719-
.ok_or(StoreError::Custom(format!(
720-
"Missing fullsync header for block {n}"
721-
)))
722-
})
723-
.collect::<Result<Vec<_>, _>>()
709+
let batch = (start..start + limit)
710+
.map(|ref n| store.fullsync_headers.get(n).cloned())
711+
.collect();
712+
Ok(batch)
724713
}
725714

726715
async fn clear_fullsync_headers(&self) -> Result<(), StoreError> {

crates/storage/store_db/rocksdb.rs

Lines changed: 25 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -603,7 +603,7 @@ impl Store {
603603
cf_name: &str,
604604
keys: Vec<K>,
605605
deserialize_fn: F,
606-
) -> Result<Vec<V>, StoreError>
606+
) -> Result<Vec<Option<V>>, StoreError>
607607
where
608608
K: AsRef<[u8]> + Send + 'static,
609609
V: Send + 'static,
@@ -623,10 +623,10 @@ impl Store {
623623
match db.get_cf(&cf, key)? {
624624
Some(bytes) => {
625625
let value = deserialize_fn(bytes)?;
626-
results.push(value);
626+
results.push(Some(value));
627627
}
628628
None => {
629-
return Err(StoreError::Custom("Key not found in bulk read".to_string()));
629+
results.push(None);
630630
}
631631
}
632632
}
@@ -1154,7 +1154,7 @@ impl StoreEngine for Store {
11541154
&self,
11551155
from: BlockNumber,
11561156
to: BlockNumber,
1157-
) -> Result<Vec<BlockBody>, StoreError> {
1157+
) -> Result<Vec<Option<BlockBody>>, StoreError> {
11581158
let numbers: Vec<BlockNumber> = (from..=to).collect();
11591159
let number_keys: Vec<Vec<u8>> = numbers.iter().map(|n| n.to_le_bytes().to_vec()).collect();
11601160

@@ -1168,24 +1168,41 @@ impl StoreEngine for Store {
11681168

11691169
let hash_keys: Vec<Vec<u8>> = hashes
11701170
.iter()
1171-
.map(|hash| BlockHashRLP::from(*hash).bytes().clone())
1171+
.flat_map(|hash_opt| hash_opt.map(|h| BlockHashRLP::from(h).bytes().clone()))
11721172
.collect();
11731173

1174-
let bodies = self
1174+
let mut bodies = self
11751175
.read_bulk_async(CF_BODIES, hash_keys, |bytes| {
11761176
BlockBodyRLP::from_bytes(bytes)
11771177
.to()
11781178
.map_err(StoreError::from)
11791179
})
11801180
.await?;
11811181

1182+
let mut i = 0;
1183+
1184+
// Fill in with None for missing bodies
1185+
let bodies = hashes
1186+
.into_iter()
1187+
.map(|opt| {
1188+
opt.and_then(|_| {
1189+
let body_ref = bodies
1190+
.get_mut(i)
1191+
.expect("bodies length is equal to number of Somes in hashes");
1192+
let body = std::mem::take(body_ref);
1193+
i += 1;
1194+
body
1195+
})
1196+
})
1197+
.collect();
1198+
11821199
Ok(bodies)
11831200
}
11841201

11851202
async fn get_block_bodies_by_hash(
11861203
&self,
11871204
hashes: Vec<BlockHash>,
1188-
) -> Result<Vec<BlockBody>, StoreError> {
1205+
) -> Result<Vec<Option<BlockBody>>, StoreError> {
11891206
let hash_keys: Vec<Vec<u8>> = hashes
11901207
.iter()
11911208
.map(|hash| BlockHashRLP::from(*hash).bytes().clone())
@@ -2052,7 +2069,7 @@ impl StoreEngine for Store {
20522069
&self,
20532070
start: BlockNumber,
20542071
limit: u64,
2055-
) -> Result<Vec<BlockHeader>, StoreError> {
2072+
) -> Result<Vec<Option<BlockHeader>>, StoreError> {
20562073
self.read_bulk_async(
20572074
CF_FULLSYNC_HEADERS,
20582075
(start..start + limit).map(|n| n.to_le_bytes()).collect(),

0 commit comments

Comments
 (0)