Commit ad564a2

oci: make oci pull multi-threaded async
Add an async version of `Repository::ensure_object()` and wire it through `SplitStreamWriter::write_external()`. Call that when we're splitting OCI layer tarballs to offload the writing of external objects (and the `fdatasync()` that goes with it) to a separate thread.

This is partial prep work for what we've been trying to accomplish for a while in #62, but it doesn't come close to the complete picture (since it still writes the objects sequentially).

Modify the (already) async code in oci::ImageOp to download layers in parallel. This is a big deal for images with many layers (as is often the case for bootc images, due to the splitting heuristics). It takes a pull of the Fedora Silverblue 42 container image (when pulled from a local `oci-dir`) from ~90s to ~20s. Unfortunately, container images made from large single layers are not substantially improved.

Signed-off-by: Allison Karlitskaya <[email protected]>
1 parent 6c029ae commit ad564a2
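
The heart of the change is fanning the per-layer downloads out over tokio tasks and collecting them with `futures::future::join_all`, as the `src/oci/mod.rs` diff below shows. Here is a minimal, self-contained sketch of that pattern; `fetch_layer` and the layer names are hypothetical stand-ins for the real `ImageOp::ensure_layer` and its inputs:

```rust
use anyhow::Result;
use futures::future::join_all;

// Hypothetical stand-in for the real per-layer work (download, split, store).
async fn fetch_layer(name: String) -> Result<String> {
    Ok(format!("fetched {name}"))
}

#[tokio::main]
async fn main() -> Result<()> {
    let layers = vec!["layer-a".to_string(), "layer-b".to_string()];

    // Spawn one task per layer so the fetches run concurrently.
    let tasks: Vec<_> = layers
        .into_iter()
        .map(|name| tokio::spawn(fetch_layer(name)))
        .collect();

    // join_all preserves input order; each element is a
    // Result<Result<String>, JoinError>, hence the double `?`.
    for joined in join_all(tasks).await {
        println!("{}", joined??);
    }
    Ok(())
}
```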

5 files changed: +35 -10 lines

5 files changed

+35
-10
lines changed

Cargo.toml

Lines changed: 2 additions & 1 deletion
@@ -20,8 +20,9 @@ rhel9 = ['pre-6.15']
 anyhow = { version = "1.0.87", default-features = false }
 async-compression = { version = "0.4.0", default-features = false, features = ["tokio", "zstd", "gzip"] }
 clap = { version = "4.0.1", default-features = false, features = ["std", "help", "usage", "derive"] }
-containers-image-proxy = "0.7.0"
+containers-image-proxy = { git = "https://github.com/containers/containers-image-proxy-rs", rev = "0bb03cea8468c24bae47bfcfe16c44973cc2d3a6" }
 env_logger = "0.11.0"
+futures = "0.3.31"
 hex = "0.4.0"
 indicatif = { version = "0.17.0", features = ["tokio"] }
 log = "0.4.8"

src/oci/mod.rs

Lines changed: 18 additions & 8 deletions
@@ -8,6 +8,7 @@ use std::{collections::HashMap, io::Read, iter::zip, path::Path, sync::Arc};
 use anyhow::{bail, ensure, Context, Result};
 use async_compression::tokio::bufread::{GzipDecoder, ZstdDecoder};
 use containers_image_proxy::{ImageProxy, ImageProxyConfig, OpenedImage};
+use futures::future::join_all;
 use indicatif::{MultiProgress, ProgressBar, ProgressStyle};
 use oci_spec::image::{Descriptor, ImageConfiguration, ImageManifest, MediaType};
 use sha2::{Digest, Sha256};
@@ -96,14 +97,14 @@ impl<ObjectID: FsVerityHashValue> ImageOp<ObjectID> {
 
     pub async fn ensure_layer(
         &self,
-        layer_sha256: &Sha256Digest,
+        layer_sha256: Sha256Digest,
         descriptor: &Descriptor,
     ) -> Result<ObjectID> {
         // We need to use the per_manifest descriptor to download the compressed layer but it gets
         // stored in the repository via the per_config descriptor. Our return value is the
         // fsverity digest for the corresponding splitstream.
 
-        if let Some(layer_id) = self.repo.check_stream(layer_sha256)? {
+        if let Some(layer_id) = self.repo.check_stream(&layer_sha256)? {
             self.progress
                 .println(format!("Already have layer {layer_sha256:?}"))?;
             Ok(layer_id)
@@ -122,7 +123,7 @@ impl<ObjectID: FsVerityHashValue> ImageOp<ObjectID> {
             self.progress
                 .println(format!("Fetching layer {}", hex::encode(layer_sha256)))?;
 
-            let mut splitstream = self.repo.create_stream(Some(*layer_sha256), None);
+            let mut splitstream = self.repo.create_stream(Some(layer_sha256), None);
             match descriptor.media_type() {
                 MediaType::ImageLayer => {
                     split_async(progress, &mut splitstream).await?;
@@ -173,13 +174,22 @@ impl<ObjectID: FsVerityHashValue> ImageOp<ObjectID> {
         let config = ImageConfiguration::from_reader(&raw_config[..])?;
 
         let mut config_maps = DigestMap::new();
+        let mut tasks = vec![];
+        let mut shas = vec![];
         for (mld, cld) in zip(manifest_layers, config.rootfs().diff_ids()) {
             let layer_sha256 = sha256_from_digest(cld)?;
-            let layer_id = self
-                .ensure_layer(&layer_sha256, mld)
-                .await
-                .with_context(|| format!("Failed to fetch layer {cld} via {mld:?}"))?;
-            config_maps.insert(&layer_sha256, &layer_id);
+            let mld = mld.clone();
+            let self_ = Arc::clone(self);
+            let future =
+                tokio::spawn(async move { self_.ensure_layer(layer_sha256, &mld).await });
+            tasks.push(future);
+            shas.push(layer_sha256);
+        }
+
+        let layer_ids = join_all(tasks).await;
+
+        for (layer_sha256, layer_id) in zip(shas, layer_ids) {
+            config_maps.insert(&layer_sha256, &layer_id??);
         }
 
         let mut splitstream = self
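
Note on the error handling in the last hunk: `tokio::spawn` returns a `JoinHandle`, so each element of `layer_ids` is a `Result<Result<ObjectID>, JoinError>`. The first `?` in `layer_id??` surfaces a panicked or cancelled task, and the second propagates any error from `ensure_layer` itself. `join_all` yields results in the same order as its input futures, which is what keeps `zip(shas, layer_ids)` aligned.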

src/oci/tar.rs

Lines changed: 1 addition & 1 deletion
@@ -94,7 +94,7 @@ pub async fn split_async(
         if header.entry_type() == EntryType::Regular && actual_size > INLINE_CONTENT_MAX {
             // non-empty regular file: store the data in the object store
             let padding = buffer.split_off(actual_size);
-            writer.write_external(&buffer, padding)?;
+            writer.write_external_async(buffer, padding).await?;
         } else {
            // else: store the data inline in the split stream
            writer.write_inline(&buffer);

src/repository.rs

Lines changed: 5 additions & 0 deletions
@@ -106,6 +106,11 @@ impl<ObjectID: FsVerityHashValue> Repository<ObjectID> {
         })
     }
 
+    pub async fn ensure_object_async(self: &Arc<Self>, data: Vec<u8>) -> Result<ObjectID> {
+        let self_ = Arc::clone(self);
+        tokio::task::spawn_blocking(move || self_.ensure_object(&data)).await?
+    }
+
     pub fn ensure_object(&self, data: &[u8]) -> Result<ObjectID> {
         Self::ensure_object_in_dir(self.objects_dir()?, data)
     }
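
The new `ensure_object_async()` wraps the existing synchronous `ensure_object()` in `tokio::task::spawn_blocking`, so the blocking object write (and the `fdatasync()` that goes with it) runs on tokio's blocking thread pool instead of stalling an async worker thread. A minimal sketch of the same shape, with a hypothetical `Store` standing in for `Repository`:

```rust
use std::sync::Arc;

use anyhow::Result;

struct Store; // hypothetical stand-in for Repository<ObjectID>

impl Store {
    // Stand-in for ensure_object(): a blocking, fsync-heavy write.
    fn ensure_object(&self, data: &[u8]) -> Result<String> {
        Ok(format!("stored {} bytes", data.len()))
    }

    // Same shape as the new ensure_object_async(): clone the Arc, move the
    // buffer, and run the blocking call on tokio's blocking thread pool.
    async fn ensure_object_async(self: &Arc<Self>, data: Vec<u8>) -> Result<String> {
        let self_ = Arc::clone(self);
        tokio::task::spawn_blocking(move || self_.ensure_object(&data)).await?
    }
}
```

`spawn_blocking` requires a `'static` closure, which is why the data is taken by value and the repository handle is cloned as an `Arc` before being moved in.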

src/splitstream.rs

Lines changed: 9 additions & 0 deletions
@@ -155,6 +155,15 @@ impl<ObjectID: FsVerityHashValue> SplitStreamWriter<ObjectID> {
         self.write_reference(&id, padding)
     }
 
+    pub async fn write_external_async(&mut self, data: Vec<u8>, padding: Vec<u8>) -> Result<()> {
+        if let Some((ref mut sha256, ..)) = self.sha256 {
+            sha256.update(&data);
+            sha256.update(&padding);
+        }
+        let id = self.repo.ensure_object_async(data).await?;
+        self.write_reference(&id, padding)
+    }
+
     pub fn done(mut self) -> Result<ObjectID> {
         self.flush_inline(vec![])?;
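
Design note on `write_external_async()`: the stream-level sha256 is updated before the buffer is handed off, and the method awaits `ensure_object_async()` before returning, so external objects within a single splitstream are still written one at a time (the "still writes the objects sequentially" caveat from the commit message). The gain is that the blocking write no longer ties up an async worker thread, so the other spawned layer tasks keep making progress while it completes.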
