Skip to content

Commit ced9b95

Browse files
committed
Make use of the size
1 parent 704c7eb commit ced9b95

File tree

5 files changed

+42
-12
lines changed

5 files changed

+42
-12
lines changed

examples/multiprovider.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -120,7 +120,7 @@ impl BlobDownloadProgress {
120120
BitfieldEvent::State { ranges, .. } => {
121121
self.current = ranges;
122122
}
123-
BitfieldEvent::Update { added, removed } => {
123+
BitfieldEvent::Update { added, removed, .. } => {
124124
self.current |= added;
125125
self.current -= removed;
126126
}

src/downloader2.rs

Lines changed: 18 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -95,7 +95,9 @@ pub enum BitfieldEvent {
9595
State {
9696
/// The entire bitfield
9797
ranges: ChunkRanges,
98-
/// The most precise known size of the blob
98+
/// The most precise known size of the blob.
99+
///
100+
/// If I know nothing about the blob, this is u64::MAX.
99101
size: u64,
100102
},
101103
/// An update to the bitfield
@@ -104,6 +106,8 @@ pub enum BitfieldEvent {
104106
added: ChunkRanges,
105107
/// The ranges that were removed
106108
removed: ChunkRanges,
109+
/// A refinement of the size of the blob.
110+
size: u64,
107111
},
108112
}
109113

@@ -346,23 +350,25 @@ impl<S> SimpleBitfieldSubscription<S> {
346350
}
347351
}
348352

349-
async fn get_valid_ranges_local<S: Store>(hash: &Hash, store: S) -> anyhow::Result<ChunkRanges> {
353+
async fn get_valid_ranges_local<S: Store>(hash: &Hash, store: S) -> anyhow::Result<BitfieldEvent> {
350354
if let Some(entry) = store.get_mut(hash).await? {
351-
crate::get::db::valid_ranges::<S>(&entry).await
355+
let (ranges, size) = crate::get::db::valid_ranges_and_size::<S>(&entry).await?;
356+
Ok(BitfieldEvent::State { ranges, size })
352357
} else {
353-
Ok(ChunkRanges::empty())
358+
Ok(BitfieldEvent::State { ranges: ChunkRanges::empty(), size: u64::MAX })
354359
}
355360
}
356361

357362
async fn get_valid_ranges_remote(
358363
endpoint: &Endpoint,
359364
id: NodeId,
360365
hash: &Hash,
361-
) -> anyhow::Result<ChunkRanges> {
366+
) -> anyhow::Result<BitfieldEvent> {
362367
let conn = endpoint.connect(id, crate::ALPN).await?;
363368
let (size, _) = crate::get::request::get_verified_size(&conn, hash).await?;
364369
let chunks = (size + 1023) / 1024;
365-
Ok(ChunkRanges::from(ChunkNum(0)..ChunkNum(chunks)))
370+
let ranges = ChunkRanges::from(ChunkNum(0)..ChunkNum(chunks));
371+
Ok(BitfieldEvent::State { ranges, size })
366372
}
367373

368374
impl<S: Store> BitfieldSubscription for SimpleBitfieldSubscription<S> {
@@ -398,11 +404,14 @@ impl<S: Store> BitfieldSubscription for SimpleBitfieldSubscription<S> {
398404
}
399405
Box::pin(
400406
async move {
401-
let ranges = match recv.await {
407+
let event = match recv.await {
402408
Ok(ev) => ev,
403-
Err(_) => ChunkRanges::empty(),
409+
Err(_) => BitfieldEvent::State {
410+
ranges: ChunkRanges::empty(),
411+
size: u64::MAX,
412+
},
404413
};
405-
BitfieldEvent::State { ranges, size: u64::MAX }
414+
event
406415
}
407416
.into_stream(),
408417
)

src/downloader2/actor.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -288,6 +288,7 @@ async fn peer_download<S: Store>(
288288
event: BitfieldEvent::Update {
289289
added,
290290
removed: ChunkRanges::empty(),
291+
size,
291292
},
292293
})
293294
.await

src/downloader2/state.rs

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -331,6 +331,7 @@ impl DownloaderState {
331331
}
332332
}
333333
state.ranges = ranges;
334+
state.size = state.size.min(size);
334335
self.check_completion(hash, None, evs)?;
335336
} else {
336337
// We got an entirely new peer, mark all affected downloads for rebalancing
@@ -340,21 +341,23 @@ impl DownloaderState {
340341
}
341342
}
342343
state.ranges = ranges;
344+
state.size = state.size.min(size);
343345
}
344346
// we have to call start_downloads even if the local bitfield set, since we don't know in which order local and remote bitfields arrive
345347
self.start_downloads(hash, None, evs)?;
346348
}
347349
Command::BitfieldInfo {
348350
peer,
349351
hash,
350-
event: BitfieldEvent::Update { added, removed },
352+
event: BitfieldEvent::Update { added, removed, size },
351353
} => {
352354
let state = self.bitfields.get_mut(&(peer, hash)).context(format!(
353355
"bitfield update for unknown peer {peer:?} and hash {hash}"
354356
))?;
355357
if peer == BitfieldPeer::Local {
356358
// we got a local bitfield update, notify local observers
357359
// for updates we can just notify the observers that have a non-empty intersection with the update
360+
state.size = state.size.min(size);
358361
if let Some(observers) = self.observers.get_by_hash(&hash) {
359362
for (id, request) in observers {
360363
let added = &added & &request.ranges;
@@ -365,6 +368,7 @@ impl DownloaderState {
365368
event: BitfieldEvent::Update {
366369
added: &added & &request.ranges,
367370
removed: &removed & &request.ranges,
371+
size: state.size,
368372
}
369373
});
370374
}
@@ -383,6 +387,7 @@ impl DownloaderState {
383387
}
384388
state.ranges |= added;
385389
state.ranges &= !removed;
390+
state.size = state.size.min(size);
386391
// a local bitfield update does not make more data available, so we don't need to start downloads
387392
self.start_downloads(hash, None, evs)?;
388393
}
@@ -874,6 +879,8 @@ struct PeerBlobState {
874879
subscription_count: usize,
875880
/// chunk ranges this peer reports to have
876881
ranges: ChunkRanges,
882+
/// The minimum reported size of the blob
883+
size: u64,
877884
}
878885

879886
impl PeerBlobState {
@@ -882,6 +889,7 @@ impl PeerBlobState {
882889
subscription_id,
883890
subscription_count: 1,
884891
ranges: ChunkRanges::empty(),
892+
size: u64::MAX,
885893
}
886894
}
887895
}
@@ -977,6 +985,7 @@ mod tests {
977985
event: BitfieldEvent::Update {
978986
added: chunk_ranges([16..32]),
979987
removed: ChunkRanges::empty(),
988+
size: u64::MAX,
980989
},
981990
});
982991
assert!(evs.is_empty());
@@ -1036,6 +1045,7 @@ mod tests {
10361045
event: BitfieldEvent::Update {
10371046
added: chunk_ranges([32..48]),
10381047
removed: ChunkRanges::empty(),
1048+
size: u64::MAX,
10391049
},
10401050
});
10411051
assert!(evs.is_empty());
@@ -1054,6 +1064,7 @@ mod tests {
10541064
event: BitfieldEvent::Update {
10551065
added: chunk_ranges([48..64]),
10561066
removed: ChunkRanges::empty(),
1067+
size: u64::MAX,
10571068
},
10581069
});
10591070
assert!(
@@ -1129,6 +1140,7 @@ mod tests {
11291140
event: BitfieldEvent::Update {
11301141
added: chunk_ranges([0..16]),
11311142
removed: ChunkRanges::empty(),
1143+
size: u64::MAX,
11321144
},
11331145
});
11341146
// The peer now has more data
@@ -1154,6 +1166,7 @@ mod tests {
11541166
event: BitfieldEvent::Update {
11551167
added: chunk_ranges([16..32]),
11561168
removed: ChunkRanges::empty(),
1169+
size: u64::MAX,
11571170
},
11581171
});
11591172
// This triggers cancellation of the first peer download and starting a new one for the remaining data
@@ -1187,6 +1200,7 @@ mod tests {
11871200
event: BitfieldEvent::Update {
11881201
added: chunk_ranges([32..64]),
11891202
removed: ChunkRanges::empty(),
1203+
size: u64::MAX,
11901204
},
11911205
});
11921206
assert!(

src/get/db.rs

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -237,6 +237,12 @@ async fn get_blob<D: BaoStore>(
237237

238238
/// Given a partial entry, get the valid ranges.
239239
pub async fn valid_ranges<D: MapMut>(entry: &D::EntryMut) -> anyhow::Result<ChunkRanges> {
240+
let (ranges, _) = valid_ranges_and_size::<D>(entry).await?;
241+
Ok(ranges)
242+
}
243+
244+
/// Given a partial entry, get the valid ranges.
245+
pub async fn valid_ranges_and_size<D: MapMut>(entry: &D::EntryMut) -> anyhow::Result<(ChunkRanges, u64)> {
240246
use tracing::trace as log;
241247
// compute the valid range from just looking at the data file
242248
let mut data_reader = entry.data_reader().await?;
@@ -253,7 +259,7 @@ pub async fn valid_ranges<D: MapMut>(entry: &D::EntryMut) -> anyhow::Result<Chun
253259
let valid: ChunkRanges = valid_from_data.intersection(&valid_from_outboard);
254260
log!("valid_from_data: {:?}", valid_from_data);
255261
log!("valid_from_outboard: {:?}", valid_from_data);
256-
Ok(valid)
262+
Ok((valid, data_size))
257263
}
258264

259265
/// Get a blob that was requested completely.

0 commit comments

Comments
 (0)