Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -417,7 +417,7 @@ dns-server = { path = "dns-server" }
dns-server-api = { path = "dns-server-api" }
dns-service-client = { path = "clients/dns-service-client" }
dpd-client = { git = "https://github.com/oxidecomputer/dendrite" }
dropshot = { version = "0.16.1", features = [ "usdt-probes" ] }
dropshot = { version = "0.16.2", features = [ "usdt-probes" ] }
dyn-clone = "1.0.19"
either = "1.14.0"
ereport-types = { path = "ereport/types" }
Expand Down
96 changes: 86 additions & 10 deletions dev-tools/omdb/src/bin/omdb/nexus.rs
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ use serde::Deserialize;
use slog_error_chain::InlineErrorChain;
use std::collections::BTreeMap;
use std::collections::BTreeSet;
use std::fs::OpenOptions;
use std::str::FromStr;
use std::sync::Arc;
use support_bundle_viewer::LocalFileAccess;
Expand Down Expand Up @@ -522,6 +523,13 @@ struct SupportBundleDownloadArgs {
/// instead of stdout.
#[arg(short, long)]
output: Option<Utf8PathBuf>,

/// If "true" and using an output path, resumes downloading.
///
/// This assumes the contents of "output" are already valid, and resumes
/// downloading at the end of the file.
#[arg(short, long, default_value_t = false)]
resume: bool,
}

#[derive(Debug, Args)]
Expand Down Expand Up @@ -3858,10 +3866,10 @@ async fn cmd_nexus_support_bundles_delete(
}

async fn write_stream_to_sink(
mut stream: impl futures::Stream<Item = reqwest::Result<bytes::Bytes>>
+ std::marker::Unpin,
mut stream: impl futures::Stream<Item = anyhow::Result<bytes::Bytes>>,
mut sink: impl std::io::Write,
) -> Result<(), anyhow::Error> {
let mut stream = std::pin::pin!(stream);
while let Some(data) = stream.next().await {
match data {
Err(err) => return Err(anyhow::anyhow!(err)),
Expand All @@ -3871,19 +3879,85 @@ async fn write_stream_to_sink(
Ok(())
}

// Downloads a portion of a support bundle using range requests.
//
// "range" is in bytes, and is inclusive on both sides.
async fn support_bundle_download_range(
client: &nexus_client::Client,
id: SupportBundleUuid,
range: (u64, u64),
) -> anyhow::Result<impl futures::Stream<Item = anyhow::Result<bytes::Bytes>>> {
let range = format!("bytes={}-{}", range.0, range.1);
Ok(client
.support_bundle_download(id.as_untyped_uuid(), Some(&range))
.await
.with_context(|| format!("downloading support bundle {}", id))?
.into_inner_stream()
.map(|r| r.map_err(|err| anyhow::anyhow!(err))))
}

// Downloads all ranges of a support bundle, and combines them into a single
// stream.
//
// Starts the download at "start" bytes (inclusive) and continues up to "end"
// bytes (exclusive).
fn support_bundle_download_ranges(
client: &nexus_client::Client,
id: SupportBundleUuid,
start: u64,
end: u64,
) -> impl futures::Stream<Item = anyhow::Result<bytes::Bytes>> + use<'_> {
// Arbitrary chunk size of 100 MiB.
//
// Note that we'll still stream data in packets which are smaller than this,
// but we won't keep a single connection to Nexus open for longer than a 100
// MiB download.
const CHUNK_SIZE: u64 = 100 * (1 << 20);
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If we're happy with this, I'll add something similar to the CLI once this PR merges

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is the thought here that at some threshold we're testing our luck with regard to a timeout duration for a single query?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, exactly, 100 MiB seems small enough to not hit our 60 sec timeout. I could change this arbitrarily, but there's a cost with setting up each stream, so I didn't want to make it too small either.

(but this is semi-arbitrary; I can tweak the value if we feel strongly)

futures::stream::try_unfold(
(start, start + CHUNK_SIZE - 1),
move |range| async move {
if end <= range.0 {
return Ok(None);
}

let stream =
support_bundle_download_range(client, id, range).await?;
let next_range = (range.0 + CHUNK_SIZE, range.1 + CHUNK_SIZE);
Ok::<_, anyhow::Error>(Some((stream, next_range)))
},
)
.try_flatten()
}

/// Runs `omdb nexus support-bundles download`
async fn cmd_nexus_support_bundles_download(
client: &nexus_client::Client,
args: &SupportBundleDownloadArgs,
) -> Result<(), anyhow::Error> {
let stream = client
.support_bundle_download(args.id.as_untyped_uuid())
.await
.with_context(|| format!("downloading support bundle {}", args.id))?
.into_inner_stream();
let total_length = client
.support_bundle_head(args.id.as_untyped_uuid())
.await?
.content_length()
.ok_or_else(|| anyhow::anyhow!("No content length"))?;

let start = match &args.output {
Some(output) if output.exists() && args.resume => {
output.metadata()?.len()
}
_ => 0,
};

let stream =
support_bundle_download_ranges(client, args.id, start, total_length);

let sink: Box<dyn std::io::Write> = match &args.output {
Some(path) => Box::new(std::fs::File::create(path)?),
Some(path) => Box::new(
OpenOptions::new()
.create(true)
.append(true)
.truncate(!args.resume)
.open(path)?,
),
None => Box::new(std::io::stdout()),
};

Expand All @@ -3904,7 +3978,8 @@ async fn cmd_nexus_support_bundles_get_index(
.with_context(|| {
format!("downloading support bundle index {}", args.id)
})?
.into_inner_stream();
.into_inner_stream()
.map(|r| r.map_err(|err| anyhow::anyhow!(err)));

write_stream_to_sink(stream, std::io::stdout()).await.with_context(
|| format!("streaming support bundle index {}", args.id),
Expand All @@ -3929,7 +4004,8 @@ async fn cmd_nexus_support_bundles_get_file(
args.id, args.path
)
})?
.into_inner_stream();
.into_inner_stream()
.map(|r| r.map_err(|err| anyhow::anyhow!(err)));

let sink: Box<dyn std::io::Write> = match &args.output {
Some(path) => Box::new(std::fs::File::create(path)?),
Expand Down
13 changes: 7 additions & 6 deletions nexus/external-api/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,18 +3,18 @@ use std::collections::BTreeMap;
use anyhow::anyhow;
use dropshot::Body;
use dropshot::{
EmptyScanParams, EndpointTagPolicy, HttpError, HttpResponseAccepted,
HttpResponseCreated, HttpResponseDeleted, HttpResponseFound,
HttpResponseHeaders, HttpResponseOk, HttpResponseSeeOther,
HttpResponseUpdatedNoContent, PaginationParams, Path, Query,
RequestContext, ResultsPage, StreamingBody, TypedBody,
EmptyScanParams, EndpointTagPolicy, Header, HttpError,
HttpResponseAccepted, HttpResponseCreated, HttpResponseDeleted,
HttpResponseFound, HttpResponseHeaders, HttpResponseOk,
HttpResponseSeeOther, HttpResponseUpdatedNoContent, PaginationParams, Path,
Query, RequestContext, ResultsPage, StreamingBody, TypedBody,
WebsocketChannelResult, WebsocketConnection,
};
use http::Response;
use ipnetwork::IpNetwork;
use nexus_types::{
authn::cookies::Cookies,
external_api::{params, shared, views},
external_api::{headers, params, shared, views},
};
use omicron_common::api::external::{
http_pagination::{
Expand Down Expand Up @@ -3168,6 +3168,7 @@ pub trait NexusExternalApi {
}]
async fn support_bundle_download(
rqctx: RequestContext<Self::Context>,
headers: Header<headers::RangeRequest>,
path_params: Path<params::SupportBundlePath>,
) -> Result<Response<Body>, HttpError>;

Expand Down
8 changes: 5 additions & 3 deletions nexus/internal-api/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,9 @@
use std::collections::{BTreeMap, BTreeSet};

use dropshot::{
Body, HttpError, HttpResponseCreated, HttpResponseDeleted, HttpResponseOk,
HttpResponseUpdatedNoContent, Path, Query, RequestContext, ResultsPage,
TypedBody,
Body, Header, HttpError, HttpResponseCreated, HttpResponseDeleted,
HttpResponseOk, HttpResponseUpdatedNoContent, Path, Query, RequestContext,
ResultsPage, TypedBody,
};
use http::Response;
use nexus_types::{
Expand All @@ -16,6 +16,7 @@ use nexus_types::{
ClickhousePolicy, OximeterReadPolicy,
},
external_api::{
headers::RangeRequest,
params::{self, PhysicalDiskPath, SledSelector, UninitializedSledId},
shared::{self, ProbeInfo, UninitializedSled},
views::Ping,
Expand Down Expand Up @@ -579,6 +580,7 @@ pub trait NexusInternalApi {
}]
async fn support_bundle_download(
rqctx: RequestContext<Self::Context>,
headers: Header<RangeRequest>,
path_params: Path<params::SupportBundlePath>,
) -> Result<Response<Body>, HttpError>;

Expand Down
14 changes: 12 additions & 2 deletions nexus/src/app/support_bundles.rs
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ impl super::Nexus {
id: SupportBundleUuid,
query: SupportBundleQueryType,
head: bool,
_range: Option<PotentialRange>,
range: Option<PotentialRange>,
) -> Result<Response<Body>, Error> {
// Lookup the bundle, confirm it's accessible
let (.., bundle) = LookupPath::new(opctx, &self.db_datastore)
Expand Down Expand Up @@ -105,7 +105,16 @@ impl super::Nexus {
.expect("Failed to build reqwest Client");
let client = self.sled_client_ext(&sled_id, client).await?;

// TODO: Use "range"?
let range = if let Some(potential_range) = &range {
Some(potential_range.try_into_str().map_err(|err| match err {
range_requests::Error::Parse(_) => Error::invalid_request(
"Failed to parse range request header",
),
_ => Error::internal_error("Invalid range request"),
})?)
} else {
None
};

let response = match (query, head) {
(SupportBundleQueryType::Whole, true) => {
Expand All @@ -123,6 +132,7 @@ impl super::Nexus {
&ZpoolUuid::from(bundle.zpool_id),
&DatasetUuid::from(bundle.dataset_id),
&SupportBundleUuid::from(bundle.id),
range,
)
.await
}
Expand Down
9 changes: 8 additions & 1 deletion nexus/src/external_api/http_entrypoints.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ use crate::context::ApiContext;
use crate::external_api::shared;
use dropshot::Body;
use dropshot::EmptyScanParams;
use dropshot::Header;
use dropshot::HttpError;
use dropshot::HttpResponseDeleted;
use dropshot::HttpResponseOk;
Expand Down Expand Up @@ -49,6 +50,7 @@ use nexus_external_api::*;
use nexus_types::{
authn::cookies::Cookies,
external_api::{
headers::RangeRequest,
params::SystemMetricsPathParam,
shared::{BfdStatus, ProbeInfo},
},
Expand Down Expand Up @@ -108,6 +110,7 @@ use propolis_client::support::tungstenite::protocol::frame::coding::CloseCode;
use propolis_client::support::tungstenite::protocol::{
CloseFrame, Role as WebSocketRole,
};
use range_requests::PotentialRange;
use range_requests::RequestContextEx;
use ref_cast::RefCast;

Expand Down Expand Up @@ -7107,6 +7110,7 @@ impl NexusExternalApi for NexusExternalApiImpl {

async fn support_bundle_download(
rqctx: RequestContext<Self::Context>,
headers: Header<RangeRequest>,
path_params: Path<params::SupportBundlePath>,
) -> Result<Response<Body>, HttpError> {
let apictx = rqctx.context();
Expand All @@ -7117,7 +7121,10 @@ impl NexusExternalApi for NexusExternalApiImpl {
crate::context::op_context_for_external_api(&rqctx).await?;

let head = false;
let range = rqctx.range();
let range = headers
.into_inner()
.range
.map(|r| PotentialRange::new(r.as_bytes()));

let body = nexus
.support_bundle_download(
Expand Down
9 changes: 8 additions & 1 deletion nexus/src/internal_api/http_entrypoints.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ use crate::context::ApiContext;
use crate::external_api::shared;
use dropshot::ApiDescription;
use dropshot::Body;
use dropshot::Header;
use dropshot::HttpError;
use dropshot::HttpResponseCreated;
use dropshot::HttpResponseDeleted;
Expand All @@ -28,6 +29,7 @@ use nexus_types::deployment::BlueprintTarget;
use nexus_types::deployment::BlueprintTargetSet;
use nexus_types::deployment::ClickhousePolicy;
use nexus_types::deployment::OximeterReadPolicy;
use nexus_types::external_api::headers::RangeRequest;
use nexus_types::external_api::params::PhysicalDiskPath;
use nexus_types::external_api::params::SledSelector;
use nexus_types::external_api::params::SupportBundleFilePath;
Expand Down Expand Up @@ -63,6 +65,7 @@ use omicron_common::api::internal::nexus::SledVmmState;
use omicron_uuid_kinds::GenericUuid;
use omicron_uuid_kinds::InstanceUuid;
use omicron_uuid_kinds::SupportBundleUuid;
use range_requests::PotentialRange;
use range_requests::RequestContextEx;
use std::collections::BTreeMap;

Expand Down Expand Up @@ -1026,6 +1029,7 @@ impl NexusInternalApi for NexusInternalApiImpl {

async fn support_bundle_download(
rqctx: RequestContext<Self::Context>,
headers: Header<RangeRequest>,
path_params: Path<SupportBundlePath>,
) -> Result<Response<Body>, HttpError> {
let apictx = rqctx.context();
Expand All @@ -1036,7 +1040,10 @@ impl NexusInternalApi for NexusInternalApiImpl {
crate::context::op_context_for_internal_api(&rqctx).await;

let head = false;
let range = rqctx.range();
let range = headers
.into_inner()
.range
.map(|r| PotentialRange::new(r.as_bytes()));

let body = nexus
.support_bundle_download(
Expand Down
1 change: 1 addition & 0 deletions nexus/test-utils/src/http_testing.rs
Original file line number Diff line number Diff line change
Expand Up @@ -250,6 +250,7 @@ impl<'a> RequestBuilder<'a> {
pub fn expect_range_requestable(mut self) -> Self {
self.allowed_headers.as_mut().unwrap().extend([
http::header::CONTENT_LENGTH,
http::header::CONTENT_RANGE,
http::header::CONTENT_TYPE,
http::header::ACCEPT_RANGES,
]);
Expand Down
Loading
Loading