Skip to content

Do not load the entire artifact in memory when uploading (#618) #677

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 3 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ tar = "0.4.40"
tempfile = "3.10.0"
text-stub-library = "0.9.0"
tokio = "1.43.1"
tokio-util = "0.7.13"
url = "2.5.0"
version-compare = "0.1.1"
zip = "0.6.6"
Expand Down
154 changes: 101 additions & 53 deletions src/github.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,17 +17,17 @@ use {
},
rayon::prelude::*,
reqwest::{Client, StatusCode},
reqwest_middleware::{self, ClientWithMiddleware},
reqwest_retry::{
default_on_request_failure, policies::ExponentialBackoff, RetryTransientMiddleware,
Retryable, RetryableStrategy,
default_on_request_failure, policies::ExponentialBackoff, RetryPolicy, Retryable,
RetryableStrategy,
},
sha2::{Digest, Sha256},
std::{
collections::{BTreeMap, BTreeSet, HashMap},
io::Read,
path::PathBuf,
str::FromStr,
time::{Duration, SystemTime},
},
url::Url,
zip::ZipArchive,
Expand Down Expand Up @@ -65,12 +65,19 @@ async fn fetch_artifact(
Ok(res)
}

/// Body of a release-artifact upload request.
///
/// Distinguishes the two ways `upload_release_artifact` can supply the HTTP
/// request body: a path to stream from disk (so the whole artifact is never
/// held in memory), or an in-memory byte buffer (used for small generated
/// payloads such as `.sha256` digest files and `SHA256SUMS`).
enum UploadSource {
// Path to a local file; opened and streamed on each (re)try of the upload.
Filename(PathBuf),
// Already-materialized bytes; cheaply cloneable for retries.
Data(Bytes),
}

async fn upload_release_artifact(
client: &ClientWithMiddleware,
client: &Client,
retry_policy: &impl RetryPolicy,
retryable_strategy: &impl RetryableStrategy,
auth_token: String,
release: &Release,
filename: String,
data: Bytes,
body: UploadSource,
dry_run: bool,
) -> Result<()> {
if release.assets.iter().any(|asset| asset.name == filename) {
Expand All @@ -93,17 +100,51 @@ async fn upload_release_artifact(
return Ok(());
}

// Octocrab doesn't yet support release artifact upload. And the low-level HTTP API
// forces the use of strings on us. So we have to make our own HTTP client.

let response = client
.put(url)
.header("Authorization", format!("Bearer {auth_token}"))
.header("Content-Length", data.len())
.header("Content-Type", "application/x-tar")
.body(data)
.send()
.await?;
// Octocrab's high-level API for uploading release artifacts doesn't yet support streaming
// bodies, and their low-level API isn't more helpful than using our own HTTP client.
//
// Because we are streaming the body, we can't use the standard retry middleware for reqwest
// (see e.g. https://github.com/seanmonstar/reqwest/issues/2416), so we have to recreate the
// request on each retry and handle the retry logic ourself. This logic is inspired by
// uv/crates/uv-publish/src/lib.rs (which has the same problem), which in turn is inspired by
// reqwest-middleware/reqwest-retry/src/middleware.rs.
//
// (While Octocrab's API would work fine for the non-streaming case, we just use this function
// for both cases so that we can make a homogeneous Vec<impl Future> later in the file.)

let mut n_past_retries = 0;
let start_time = SystemTime::now();
let response = loop {
let request = client
.put(url.clone())
.header("Authorization", format!("Bearer {auth_token}"))
.header("Content-Type", "application/octet-stream");
let request = match body {
UploadSource::Filename(ref path) => {
let file = tokio::fs::File::open(&path).await?;
let len = file.metadata().await?.len();
request.header("Content-Length", len).body(file)
}
UploadSource::Data(ref bytes) => request
.header("Content-Length", bytes.len())
.body(bytes.clone()),
};
let result = request.send().await.map_err(|e| e.into());

if retryable_strategy.handle(&result) == Some(Retryable::Transient) {
let retry_decision = retry_policy.should_retry(start_time, n_past_retries);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fwiw we have a more extensive retry policy in uv, but the reqwest-retry may be sufficient here.

if let reqwest_retry::RetryDecision::Retry { execute_after } = retry_decision {
println!("retrying {url}: {result:?}");
let duration = execute_after
.duration_since(SystemTime::now())
.unwrap_or_else(|_| Duration::default());
tokio::time::sleep(duration).await;
n_past_retries += 1;
continue;
}
}
break result?;
};

if !response.status().is_success() {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This means we don't retry on status errors, such as a 500 (only the 403 we handle in the github upload retry strategy)

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This (and the above) is intended to match the existing logic using reqwest-retry... we ought to retry on a 500, though, if we don't let me fix that while we're here.

return Err(anyhow!("HTTP {}", response.status()));
Expand All @@ -112,6 +153,20 @@ async fn upload_release_artifact(
Ok(())
}

/// Build an Octocrab GitHub client from CLI arguments.
///
/// Reads the required `token` argument and the optional `github-uri`
/// override (for pointing at a GitHub Enterprise / alternative endpoint).
/// Returns the constructed client together with the raw token string, since
/// callers also need the token for direct HTTP uploads outside Octocrab.
///
/// # Panics
///
/// Panics if the `token` argument is absent (clap should enforce it).
///
/// # Errors
///
/// Returns an error if the base URI is invalid or the client fails to build.
fn new_github_client(args: &ArgMatches) -> Result<(Octocrab, String)> {
let token = args
.get_one::<String>("token")
.expect("token should be specified")
.to_string();

let mut builder = OctocrabBuilder::new().personal_token(token.clone());
// Only override the endpoint when the user supplied one.
if let Some(uri) = args.get_one::<String>("github-uri") {
builder = builder.base_uri(uri.clone())?;
}

Ok((builder.build()?, token))
}

pub async fn command_fetch_release_distributions(args: &ArgMatches) -> Result<()> {
let dest_dir = args
.get_one::<PathBuf>("dest")
Expand All @@ -121,13 +176,7 @@ pub async fn command_fetch_release_distributions(args: &ArgMatches) -> Result<()
.expect("organization should be set");
let repo = args.get_one::<String>("repo").expect("repo should be set");

let client = OctocrabBuilder::new()
.personal_token(
args.get_one::<String>("token")
.expect("token should be required argument")
.to_string(),
)
.build()?;
let (client, _) = new_github_client(args)?;

let release_version_range = pep440_rs::VersionSpecifier::from_str(">=3.9")?;

Expand Down Expand Up @@ -207,10 +256,8 @@ pub async fn command_fetch_release_distributions(args: &ArgMatches) -> Result<()
.await?;

for artifact in artifacts {
if matches!(
artifact.name.as_str(),
"pythonbuild" | "toolchain"
) || artifact.name.contains("install-only")
if matches!(artifact.name.as_str(), "pythonbuild" | "toolchain")
|| artifact.name.contains("install-only")
{
continue;
}
Expand Down Expand Up @@ -358,10 +405,6 @@ pub async fn command_upload_release_distributions(args: &ArgMatches) -> Result<(
.get_one::<String>("tag")
.expect("tag should be specified");
let ignore_missing = args.get_flag("ignore_missing");
let token = args
.get_one::<String>("token")
.expect("token should be specified")
.to_string();
let organization = args
.get_one::<String>("organization")
.expect("organization should be specified");
Expand Down Expand Up @@ -451,9 +494,7 @@ pub async fn command_upload_release_distributions(args: &ArgMatches) -> Result<(
return Err(anyhow!("missing {} release artifacts", missing.len()));
}

let client = OctocrabBuilder::new()
.personal_token(token.clone())
.build()?;
let (client, token) = new_github_client(args)?;
let repo_handler = client.repos(organization, repo);
let releases = repo_handler.releases();

Expand All @@ -473,12 +514,7 @@ pub async fn command_upload_release_distributions(args: &ArgMatches) -> Result<(
let mut digests = BTreeMap::new();

let retry_policy = ExponentialBackoff::builder().build_with_max_retries(5);
let raw_client = reqwest_middleware::ClientBuilder::new(Client::new())
.with(RetryTransientMiddleware::new_with_policy_and_strategy(
retry_policy,
GitHubUploadRetryStrategy,
))
.build();
let raw_client = Client::new();

{
let mut fs = vec![];
Expand All @@ -488,29 +524,39 @@ pub async fn command_upload_release_distributions(args: &ArgMatches) -> Result<(
continue;
}

let file_data = Bytes::copy_from_slice(&std::fs::read(dist_dir.join(&source))?);

let mut digest = Sha256::new();
digest.update(&file_data);

let digest = hex::encode(digest.finalize());

digests.insert(dest.clone(), digest.clone());

let local_filename = dist_dir.join(&source);
fs.push(upload_release_artifact(
&raw_client,
&retry_policy,
&GitHubUploadRetryStrategy,
token.clone(),
&release,
dest.clone(),
file_data,
UploadSource::Filename(local_filename.clone()),
dry_run,
));

// reqwest wants to take ownership of the body, so it's hard for us to do anything
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think that's totally fine to read the file twice.

// clever with reading the file once and calculating the sha256sum while we read.
// So we open and read the file again.
let digest = {
let file = tokio::fs::File::open(local_filename).await?;
let mut stream = tokio_util::io::ReaderStream::with_capacity(file, 1048576);
let mut hasher = Sha256::new();
while let Some(chunk) = stream.next().await {
hasher.update(&chunk?);
}
hex::encode(hasher.finalize())
};
digests.insert(dest.clone(), digest.clone());
fs.push(upload_release_artifact(
&raw_client,
&retry_policy,
&GitHubUploadRetryStrategy,
token.clone(),
&release,
format!("{}.sha256", dest),
Bytes::copy_from_slice(format!("{}\n", digest).as_bytes()),
format!("{dest}.sha256"),
UploadSource::Data(Bytes::copy_from_slice(format!("{digest}\n").as_bytes())),
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
UploadSource::Data(Bytes::copy_from_slice(format!("{digest}\n").as_bytes())),
UploadSource::Data(Bytes::from(format!("{digest}\n"))),

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It was like that when I found it :) thanks

dry_run,
));
}
Expand All @@ -532,10 +578,12 @@ pub async fn command_upload_release_distributions(args: &ArgMatches) -> Result<(

upload_release_artifact(
&raw_client,
&retry_policy,
&GitHubUploadRetryStrategy,
token.clone(),
&release,
"SHA256SUMS".to_string(),
Bytes::copy_from_slice(shasums.as_bytes()),
UploadSource::Data(Bytes::copy_from_slice(shasums.as_bytes())),
dry_run,
)
.await?;
Expand Down
12 changes: 12 additions & 0 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,12 @@ fn main_impl() -> Result<()> {
.action(ArgAction::Set)
.default_value("python-build-standalone")
.help("GitHub repository name"),
)
.arg(
Arg::new("github-uri")
.long("github-uri")
.action(ArgAction::Set)
.help("Alternative GitHub URI"),
),
);

Expand Down Expand Up @@ -154,6 +160,12 @@ fn main_impl() -> Result<()> {
.action(ArgAction::Set)
.default_value("python-build-standalone")
.help("GitHub repository name"),
)
.arg(
Arg::new("github-uri")
.long("github-uri")
.action(ArgAction::Set)
.help("Alternative GitHub URI"),
),
);

Expand Down