Skip to content

Commit 18d7688

Browse files
committed
First somewhat working size handling approach
1 parent 43b9842 commit 18d7688

File tree

3 files changed

+43
-35
lines changed

3 files changed

+43
-35
lines changed

examples/multiprovider.rs

Lines changed: 22 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -104,33 +104,41 @@ async fn provide(args: ProvideArgs) -> anyhow::Result<()> {
104104
/// Progress for a single download
105105
struct BlobDownloadProgress {
106106
request: DownloadRequest,
107-
current: ChunkRanges,
107+
current: BitfieldState,
108108
}
109109

110110
impl BlobDownloadProgress {
111111
fn new(request: DownloadRequest) -> Self {
112112
Self {
113113
request,
114-
current: ChunkRanges::empty(),
114+
current: BitfieldState::unknown(),
115115
}
116116
}
117117

118118
fn update(&mut self, ev: BitfieldEvent) {
119119
match ev {
120-
BitfieldEvent::State(BitfieldState { ranges, .. }) => {
121-
self.current = ranges;
120+
BitfieldEvent::State(BitfieldState { ranges, size }) => {
121+
self.current.size = self.current.size.min(size);
122+
self.current.ranges = ranges;
123+
self.request.ranges &= ChunkRanges::from(..ChunkNum::chunks(self.current.size));
122124
}
123-
BitfieldEvent::Update(BitfieldUpdate { added, removed, .. }) => {
124-
self.current |= added;
125-
self.current -= removed;
125+
BitfieldEvent::Update(BitfieldUpdate {
126+
added,
127+
removed,
128+
size,
129+
}) => {
130+
self.current.size = self.current.size.min(size);
131+
self.request.ranges &= ChunkRanges::from(..ChunkNum::chunks(self.current.size));
132+
self.current.ranges |= added;
133+
self.current.ranges -= removed;
126134
}
127135
}
128136
}
129137

130138
#[allow(dead_code)]
131139
fn get_stats(&self) -> (u64, u64) {
132-
let total = total_bytes(&self.request.ranges, u64::MAX);
133-
let downloaded = total_bytes(&self.current, u64::MAX);
140+
let total = total_bytes(&self.request.ranges, self.current.size);
141+
let downloaded = total_bytes(&self.current.ranges, self.current.size);
134142
(downloaded, total)
135143
}
136144

@@ -140,7 +148,7 @@ impl BlobDownloadProgress {
140148
}
141149

142150
fn is_done(&self) -> bool {
143-
self.current == self.request.ranges
151+
self.current.ranges == self.request.ranges
144152
}
145153
}
146154

@@ -172,23 +180,24 @@ async fn download_impl<S: Store>(args: DownloadArgs, store: S) -> anyhow::Result
172180
.build();
173181
let request = DownloadRequest {
174182
hash: args.hash,
175-
ranges: ChunkRanges::from(ChunkNum(0)..ChunkNum(25421)),
183+
ranges: ChunkRanges::all(),
176184
};
177185
let downloader2 = downloader.clone();
178186
let mut progress = BlobDownloadProgress::new(request.clone());
179187
tokio::spawn(async move {
180188
let request = ObserveRequest {
181189
hash: args.hash,
182-
ranges: ChunkRanges::from(ChunkNum(0)..ChunkNum(25421)),
190+
ranges: ChunkRanges::all(),
183191
buffer: 1024,
184192
};
185193
let mut observe = downloader2.observe(request).await?;
186194
let term = Term::stdout();
187195
let (_, rows) = term.size();
188196
while let Some(chunk) = observe.recv().await {
189197
progress.update(chunk);
190-
let current = progress.current.boundaries();
198+
let current = progress.current.ranges.boundaries();
191199
let requested = progress.request.ranges.boundaries();
200+
println!("observe print_bitmap {:?} {:?}", current, requested);
192201
let bitmap = print_bitmap(current, requested, rows as usize);
193202
print!("\r{bitmap}");
194203
if progress.is_done() {

src/downloader2.rs

Lines changed: 12 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -118,6 +118,16 @@ pub struct BitfieldState {
118118
pub size: u64,
119119
}
120120

121+
impl BitfieldState {
122+
/// State for a completely unknown bitfield
123+
pub fn unknown() -> Self {
124+
Self {
125+
ranges: ChunkRanges::empty(),
126+
size: u64::MAX,
127+
}
128+
}
129+
}
130+
121131
/// A download request
122132
#[derive(Debug, Clone)]
123133
pub struct DownloadRequest {
@@ -368,11 +378,7 @@ async fn get_valid_ranges_local<S: Store>(hash: &Hash, store: S) -> anyhow::Resu
368378
let (ranges, size) = crate::get::db::valid_ranges_and_size::<S>(&entry).await?;
369379
Ok(BitfieldState { ranges, size }.into())
370380
} else {
371-
Ok(BitfieldState {
372-
ranges: ChunkRanges::empty(),
373-
size: u64::MAX,
374-
}
375-
.into())
381+
Ok(BitfieldState::unknown().into())
376382
}
377383
}
378384

@@ -423,11 +429,7 @@ impl<S: Store> BitfieldSubscription for SimpleBitfieldSubscription<S> {
423429
async move {
424430
let event = match recv.await {
425431
Ok(ev) => ev,
426-
Err(_) => BitfieldState {
427-
ranges: ChunkRanges::empty(),
428-
size: u64::MAX,
429-
}
430-
.into(),
432+
Err(_) => BitfieldState::unknown().into(),
431433
};
432434
event
433435
}

src/downloader2/state.rs

Lines changed: 9 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -324,21 +324,21 @@ impl DownloaderState {
324324
if peer == BitfieldPeer::Local {
325325
// we got a new local bitmap, notify local observers
326326
// we must notify all local observers, even if the bitmap is empty
327+
state.size = state.size.min(size);
327328
if let Some(observers) = self.observers.get_by_hash(&hash) {
328329
for (id, request) in observers {
329330
let ranges = &ranges & &request.ranges;
330331
evs.push(Event::LocalBitfieldInfo {
331332
id: *id,
332333
event: BitfieldState {
333334
ranges: ranges.clone(),
334-
size,
335+
size: state.size,
335336
}
336337
.into(),
337338
});
338339
}
339340
}
340341
state.ranges = ranges;
341-
state.size = state.size.min(size);
342342
self.check_completion(hash, None, evs)?;
343343
} else {
344344
// We got an entirely new peer, mark all affected downloads for rebalancing
@@ -532,11 +532,13 @@ impl DownloaderState {
532532
// we don't have the self state yet, so we can't really decide if we need to download anything at all
533533
return Ok(());
534534
};
535+
let mask = ChunkRanges::from(ChunkNum(0)..ChunkNum::chunks(self_state.size));
535536
let mut completed = vec![];
536537
for (id, download) in self.downloads.iter_mut_for_hash(hash) {
537538
if just_id.is_some() && just_id != Some(*id) {
538539
continue;
539540
}
541+
download.request.ranges &= mask.clone();
540542
// check if the entire download is complete. If this is the case, peer downloads will be cleaned up later
541543
if self_state.ranges.is_superset(&download.request.ranges) {
542544
// notify the user that the download is complete
@@ -886,6 +888,9 @@ struct PeerState {
886888
}
887889

888890
/// Information about one blob on one peer
891+
///
892+
/// Note that for remote peers we can't really trust this information.
893+
/// They could lie about the size, and the ranges could be either wrong or outdated.
889894
struct PeerBlobState {
890895
/// The subscription id for the subscription
891896
subscription_id: BitfieldSubscriptionId,
@@ -1117,11 +1122,7 @@ mod tests {
11171122
state.apply(Command::BitfieldInfo {
11181123
peer: Local,
11191124
hash,
1120-
event: BitfieldState {
1121-
ranges: ChunkRanges::empty(),
1122-
size: u64::MAX,
1123-
}
1124-
.into(),
1125+
event: BitfieldState::unknown().into(),
11251126
});
11261127
// We have a peer for the hash
11271128
state.apply(Command::PeerDiscovered { peer: peer_a, hash });
@@ -1357,11 +1358,7 @@ mod tests {
13571358
state.apply(Command::BitfieldInfo {
13581359
peer: Local,
13591360
hash,
1360-
event: BitfieldState {
1361-
ranges: ChunkRanges::empty(),
1362-
size: u64::MAX,
1363-
}
1364-
.into(),
1361+
event: BitfieldState::unknown().into(),
13651362
});
13661363
// We have a peer for the hash
13671364
state.apply(Command::PeerDiscovered { peer: peer_a, hash });

0 commit comments

Comments
 (0)