Skip to content

Commit 9175662

Browse files
committed
feat(DA): request shares HTTP API
1 parent 2265b3b commit 9175662

File tree

16 files changed

+353
-19
lines changed

16 files changed

+353
-19
lines changed

clients/common-http-client/Cargo.toml

+1
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ name = "common-http-client"
55
version = "0.1.0"
66

77
[dependencies]
8+
futures = "0.3.31"
89
nomos-core = { workspace = true }
910
nomos-da-messages = { workspace = true }
1011
reqwest = "0.12"

clients/common-http-client/src/lib.rs

+46-1
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,16 @@
11
use std::{fmt::Debug, sync::Arc};
22

3+
use futures::{Stream, StreamExt};
34
use nomos_core::da::blob::Blob;
4-
use nomos_da_messages::http::da::{DABlobCommitmentsRequest, DAGetLightBlobReq};
5+
use nomos_da_messages::http::da::{DABlobCommitmentsRequest, DAGetLightBlobReq, GetSharesRequest};
56
use reqwest::{Client, ClientBuilder, RequestBuilder, StatusCode, Url};
67
use serde::{de::DeserializeOwned, Serialize};
78

89
// These could be moved into shared location, perhaps to upcoming `nomos-lib`
910
const DA_GET_SHARED_COMMITMENTS: &str = "/da/get-commitments";
1011

1112
const DA_GET_LIGHT_BLOB: &str = "/da/get-blob";
13+
const DA_GET_SHARES: &str = "/da/sampling/shares";
1214

1315
#[derive(thiserror::Error, Debug)]
1416
pub enum Error {
@@ -131,4 +133,47 @@ impl CommonHttpClient {
131133
let request_url = base_url.join(path).map_err(Error::Url)?;
132134
self.get(request_url, &request).await
133135
}
136+
137+
pub async fn get_shares<B>(
138+
&self,
139+
base_url: Url,
140+
blob_id: B::BlobId,
141+
requested_shares: Vec<B::ColumnIndex>,
142+
filter_shares: Vec<B::ColumnIndex>,
143+
return_available: bool,
144+
) -> Result<impl Stream<Item = B::LightBlob>, Error>
145+
where
146+
B: Blob,
147+
<B as Blob>::BlobId: serde::Serialize + Send + Sync,
148+
<B as Blob>::ColumnIndex: serde::Serialize + DeserializeOwned + Send + Sync,
149+
<B as Blob>::LightBlob: DeserializeOwned + Send + Sync,
150+
{
151+
let request: GetSharesRequest<B> = GetSharesRequest {
152+
blob_id,
153+
requested_shares,
154+
filter_shares,
155+
return_available,
156+
};
157+
let request_url = base_url
158+
.join(DA_GET_SHARES.trim_start_matches('/'))
159+
.map_err(Error::Url)?;
160+
let mut request = self.client.get(request_url).json(&request);
161+
162+
if let Some(basic_auth) = &self.basic_auth {
163+
request = request.basic_auth(&basic_auth.username, basic_auth.password.as_deref());
164+
}
165+
166+
let response = request.send().await.map_err(Error::Request)?;
167+
let status = response.status();
168+
169+
let shares_stream = response.bytes_stream().filter_map(|item| async move {
170+
item.ok()
171+
.and_then(|bytes| serde_json::from_slice::<B::LightBlob>(&bytes).ok())
172+
});
173+
match status {
174+
StatusCode::OK => Ok(shares_stream),
175+
StatusCode::INTERNAL_SERVER_ERROR => Err(Error::Server("Error".to_string())),
176+
_ => Err(Error::Server(format!("Unexpected response [{status}]",))),
177+
}
178+
}
134179
}

clients/executor-http-client/Cargo.toml

+1
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ version = "0.1.0"
66

77
[dependencies]
88
common-http-client = { workspace = true }
9+
futures = "0.3.31"
910
nomos-core = { workspace = true }
1011
nomos-executor = { workspace = true }
1112
reqwest = "0.12"

clients/executor-http-client/src/lib.rs

+26
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
pub use common_http_client::BasicAuthCredentials;
22
use common_http_client::{CommonHttpClient, Error};
3+
use futures::Stream;
34
use nomos_core::da::blob::Blob;
45
use nomos_executor::api::{handlers::DispersalRequest, paths};
56
use reqwest::Url;
@@ -68,4 +69,29 @@ impl ExecutorHttpClient {
6869
.get_blob::<B, C>(base_url, blob_id, column_idx)
6970
.await
7071
}
72+
73+
pub async fn get_shares<B>(
74+
&self,
75+
base_url: Url,
76+
blob_id: B::BlobId,
77+
requested_shares: Vec<B::ColumnIndex>,
78+
filter_shares: Vec<B::ColumnIndex>,
79+
return_available: bool,
80+
) -> Result<impl Stream<Item = B::LightBlob>, Error>
81+
where
82+
B: Blob,
83+
<B as Blob>::BlobId: serde::Serialize + Send + Sync,
84+
<B as Blob>::ColumnIndex: serde::Serialize + DeserializeOwned + Send + Sync,
85+
<B as Blob>::LightBlob: DeserializeOwned + Send + Sync,
86+
{
87+
self.client
88+
.get_shares::<B>(
89+
base_url,
90+
blob_id,
91+
requested_shares,
92+
filter_shares,
93+
return_available,
94+
)
95+
.await
96+
}
7197
}

nodes/nomos-executor/src/api/backend.rs

+15-4
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ use hyper::header::{CONTENT_TYPE, USER_AGENT};
55
use nomos_api::Backend;
66
use nomos_core::{
77
da::{
8-
blob::{info::DispersedBlobInfo, metadata, Blob},
8+
blob::{info::DispersedBlobInfo, metadata, Blob, LightBlob},
99
DaVerifier as CoreDaVerifier,
1010
},
1111
header::HeaderId,
@@ -19,7 +19,7 @@ use nomos_libp2p::PeerId;
1919
use nomos_mempool::{tx::service::openapi::Status, MempoolMetrics};
2020
use nomos_node::api::handlers::{
2121
add_blob, add_blob_info, add_tx, block, cl_metrics, cl_status, cryptarchia_headers,
22-
cryptarchia_info, da_get_commitments, da_get_light_blob, get_range, libp2p_info,
22+
cryptarchia_info, da_get_commitments, da_get_light_blob, da_get_shares, get_range, libp2p_info,
2323
};
2424
use nomos_storage::backends::StorageSerde;
2525
use overwatch::overwatch::handle::OverwatchHandle;
@@ -160,8 +160,15 @@ where
160160
DaBlob: Blob + Serialize + DeserializeOwned + Clone + Send + Sync + 'static,
161161
<DaBlob as Blob>::BlobId:
162162
AsRef<[u8]> + Clone + Serialize + DeserializeOwned + Send + Sync + 'static,
163-
<DaBlob as Blob>::ColumnIndex: AsRef<[u8]> + DeserializeOwned + Send + Sync + 'static,
164-
<DaBlob as Blob>::LightBlob: Serialize + DeserializeOwned + Clone + Send + Sync + 'static,
163+
<DaBlob as Blob>::ColumnIndex:
164+
AsRef<[u8]> + Serialize + DeserializeOwned + Hash + Eq + Send + Sync + 'static,
165+
<DaBlob as Blob>::LightBlob: LightBlob<ColumnIndex = <DaBlob as Blob>::ColumnIndex>
166+
+ Serialize
167+
+ DeserializeOwned
168+
+ Clone
169+
+ Send
170+
+ Sync
171+
+ 'static,
165172
<DaBlob as Blob>::SharedCommitments:
166173
Serialize + DeserializeOwned + Clone + Send + Sync + 'static,
167174
DaBlobInfo: DispersedBlobInfo<BlobId = [u8; 32]>
@@ -398,6 +405,10 @@ where
398405
paths::DA_GET_LIGHT_BLOB,
399406
routing::get(da_get_light_blob::<DaStorageSerializer, DaBlob>),
400407
)
408+
.route(
409+
paths::DA_GET_SHARES,
410+
routing::get(da_get_shares::<DaStorageSerializer, DaBlob>),
411+
)
401412
.with_state(handle);
402413

403414
Server::bind(&self.settings.address)

nodes/nomos-node/src/api/backend.rs

+15-4
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ use hyper::header::{CONTENT_TYPE, USER_AGENT};
55
use nomos_api::Backend;
66
use nomos_core::{
77
da::{
8-
blob::{info::DispersedBlobInfo, metadata::Metadata, Blob},
8+
blob::{info::DispersedBlobInfo, metadata::Metadata, Blob, LightBlob},
99
DaVerifier as CoreDaVerifier,
1010
},
1111
header::HeaderId,
@@ -30,7 +30,7 @@ use utoipa_swagger_ui::SwaggerUi;
3030

3131
use super::handlers::{
3232
add_blob, add_blob_info, add_tx, block, cl_metrics, cl_status, cryptarchia_headers,
33-
cryptarchia_info, da_get_commitments, da_get_light_blob, get_range, libp2p_info,
33+
cryptarchia_info, da_get_commitments, da_get_light_blob, da_get_shares, get_range, libp2p_info,
3434
};
3535
use crate::api::paths;
3636

@@ -138,8 +138,15 @@ where
138138
DaBlob: Blob + Serialize + DeserializeOwned + Clone + Send + Sync + 'static,
139139
<DaBlob as Blob>::BlobId:
140140
AsRef<[u8]> + Clone + Serialize + DeserializeOwned + Send + Sync + 'static,
141-
<DaBlob as Blob>::ColumnIndex: AsRef<[u8]> + DeserializeOwned + Send + Sync + 'static,
142-
DaBlob::LightBlob: Serialize + DeserializeOwned + Clone + Send + Sync + 'static,
141+
<DaBlob as Blob>::ColumnIndex:
142+
AsRef<[u8]> + Serialize + DeserializeOwned + Hash + Eq + Send + Sync + 'static,
143+
DaBlob::LightBlob: LightBlob<ColumnIndex = <DaBlob as Blob>::ColumnIndex>
144+
+ Serialize
145+
+ DeserializeOwned
146+
+ Clone
147+
+ Send
148+
+ Sync
149+
+ 'static,
143150
DaBlob::SharedCommitments: Serialize + DeserializeOwned + Clone + Send + Sync + 'static,
144151
DaBlobInfo: DispersedBlobInfo<BlobId = [u8; 32]>
145152
+ Clone
@@ -363,6 +370,10 @@ where
363370
paths::DA_GET_LIGHT_BLOB,
364371
routing::get(da_get_light_blob::<DaStorageSerializer, DaBlob>),
365372
)
373+
.route(
374+
paths::DA_GET_SHARES,
375+
routing::get(da_get_shares::<DaStorageSerializer, DaBlob>),
376+
)
366377
.with_state(handle);
367378

368379
Server::bind(&self.settings.address)

nodes/nomos-node/src/api/handlers.rs

+54-5
Original file line numberDiff line numberDiff line change
@@ -1,20 +1,24 @@
11
use std::{error::Error, fmt::Debug, hash::Hash};
22

33
use axum::{
4+
body::StreamBody,
45
extract::{Query, State},
5-
response::Response,
6+
response::{IntoResponse, Response},
67
Json,
78
};
9+
use http::{header, StatusCode};
810
use nomos_api::http::{cl, consensus, da, libp2p, mempool, storage};
911
use nomos_core::{
1012
da::{
11-
blob::{info::DispersedBlobInfo, metadata::Metadata, Blob},
13+
blob::{info::DispersedBlobInfo, metadata::Metadata, Blob, LightBlob},
1214
BlobId, DaVerifier as CoreDaVerifier,
1315
},
1416
header::HeaderId,
1517
tx::Transaction,
1618
};
17-
use nomos_da_messages::http::da::{DABlobCommitmentsRequest, DAGetLightBlobReq, GetRangeReq};
19+
use nomos_da_messages::http::da::{
20+
DABlobCommitmentsRequest, DAGetLightBlobReq, GetRangeReq, GetSharesRequest,
21+
};
1822
use nomos_da_network_core::SubnetworkId;
1923
use nomos_da_sampling::backend::DaSamplingServiceBackend;
2024
use nomos_da_verifier::backend::VerifierBackend;
@@ -254,7 +258,8 @@ where
254258
B: Blob + Serialize + DeserializeOwned + Clone + Send + Sync + 'static,
255259
<B as Blob>::BlobId: AsRef<[u8]> + Send + Sync + 'static,
256260
<B as Blob>::ColumnIndex: AsRef<[u8]> + Send + Sync + 'static,
257-
<B as Blob>::LightBlob: Serialize + DeserializeOwned + Clone + Send + Sync + 'static,
261+
<B as Blob>::LightBlob:
262+
LightBlob + Serialize + DeserializeOwned + Clone + Send + Sync + 'static,
258263
<B as Blob>::SharedCommitments: Serialize + DeserializeOwned + Clone + Send + Sync + 'static,
259264
M: MembershipHandler<NetworkId = SubnetworkId, Id = PeerId>
260265
+ Clone
@@ -445,7 +450,7 @@ where
445450
DaBlob: Blob,
446451
<DaBlob as Blob>::BlobId: AsRef<[u8]> + DeserializeOwned + Clone + Send + Sync + 'static,
447452
<DaBlob as Blob>::ColumnIndex: AsRef<[u8]> + DeserializeOwned + Send + Sync + 'static,
448-
<DaBlob as Blob>::LightBlob: Serialize + DeserializeOwned + Send + Sync + 'static,
453+
<DaBlob as Blob>::LightBlob: LightBlob + Serialize + DeserializeOwned + Send + Sync + 'static,
449454
StorageOp: StorageSerde + Send + Sync + 'static,
450455
<StorageOp as StorageSerde>::Error: Send + Sync,
451456
{
@@ -456,6 +461,50 @@ where
456461
))
457462
}
458463

464+
#[utoipa::path(
465+
get,
466+
path = paths::DA_GET_SHARES,
467+
responses(
468+
(status = 200, description = "Request shares for a blob", body = GetSharesRequest<DaBlob>),
469+
(status = 500, description = "Internal server error", body = StreamBody),
470+
)
471+
)]
472+
pub async fn da_get_shares<StorageOp, DaBlob>(
473+
State(handle): State<OverwatchHandle>,
474+
Json(request): Json<GetSharesRequest<DaBlob>>,
475+
) -> Response
476+
where
477+
DaBlob: Blob,
478+
<DaBlob as Blob>::BlobId: AsRef<[u8]> + DeserializeOwned + Clone + Send + Sync + 'static,
479+
<DaBlob as Blob>::ColumnIndex: serde::Serialize + DeserializeOwned + Send + Sync + Eq + Hash,
480+
<DaBlob as Blob>::LightBlob: LightBlob<ColumnIndex = <DaBlob as Blob>::ColumnIndex>
481+
+ Serialize
482+
+ DeserializeOwned
483+
+ Send
484+
+ Sync
485+
+ 'static,
486+
StorageOp: StorageSerde + Send + Sync + 'static,
487+
<StorageOp as StorageSerde>::Error: Send + Sync,
488+
<DaBlob as Blob>::LightBlob: LightBlob,
489+
<DaBlob as Blob>::ColumnIndex: 'static,
490+
{
491+
match storage::get_shares::<StorageOp, DaBlob>(
492+
&handle,
493+
request.blob_id,
494+
request.requested_shares,
495+
request.filter_shares,
496+
request.return_available,
497+
)
498+
.await
499+
{
500+
Ok(shares) => {
501+
let body = StreamBody::new(shares);
502+
IntoResponse::into_response(([(header::CONTENT_TYPE, "application/x-ndjson")], body))
503+
}
504+
Err(e) => IntoResponse::into_response((StatusCode::INTERNAL_SERVER_ERROR, e.to_string())),
505+
}
506+
}
507+
459508
#[utoipa::path(
460509
post,
461510
path = paths::MEMPOOL_ADD_TX,

nodes/nomos-node/src/api/paths.rs

+1
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ pub const CRYPTARCHIA_HEADERS: &str = "/cryptarchia/headers";
55
pub const DA_ADD_BLOB: &str = "/da/add-blob";
66
pub const DA_GET_RANGE: &str = "/da/get-range";
77
pub const DA_GET_SHARED_COMMITMENTS: &str = "/da/get-commitments";
8+
pub const DA_GET_SHARES: &str = "/da/sampling/shares";
89
pub const DA_GET_LIGHT_BLOB: &str = "/da/get-blob";
910
pub const NETWORK_INFO: &str = "/network/info";
1011
pub const STORAGE_BLOCK: &str = "/storage/block";

nomos-core/chain-defs/src/da/blob/mod.rs

+5
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,11 @@ pub mod info;
22
pub mod metadata;
33
pub mod select;
44

5+
pub trait LightBlob {
6+
type ColumnIndex;
7+
fn column_idx(&self) -> Self::ColumnIndex;
8+
}
9+
510
pub trait Blob {
611
type BlobId;
712
type ColumnIndex;

nomos-da/kzgrs-backend/src/common/blob.rs

+8
Original file line numberDiff line numberDiff line change
@@ -126,6 +126,14 @@ pub struct DaLightBlob {
126126
pub rows_proofs: Vec<Proof>,
127127
}
128128

129+
impl blob::LightBlob for DaLightBlob {
130+
type ColumnIndex = [u8; 2];
131+
132+
fn column_idx(&self) -> Self::ColumnIndex {
133+
self.column_idx.to_be_bytes()
134+
}
135+
}
136+
129137
#[derive(Debug, Clone, Default, Eq, PartialEq, Serialize, Deserialize)]
130138
pub struct DaBlobSharedCommitments {
131139
#[serde(

nomos-da/network/messages/src/http/da.rs

+11
Original file line numberDiff line numberDiff line change
@@ -25,3 +25,14 @@ pub struct DAGetLightBlobReq<B: Blob> {
2525
pub blob_id: B::BlobId,
2626
pub column_idx: B::ColumnIndex,
2727
}
28+
29+
#[derive(Serialize, Deserialize)]
30+
pub struct GetSharesRequest<B: Blob>
31+
where
32+
<B as Blob>::ColumnIndex: Serialize + DeserializeOwned,
33+
{
34+
pub blob_id: B::BlobId,
35+
pub requested_shares: Vec<B::ColumnIndex>,
36+
pub filter_shares: Vec<B::ColumnIndex>,
37+
pub return_available: bool,
38+
}

nomos-services/api/Cargo.toml

+3
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,9 @@ instrumentation = []
1111

1212
[dependencies]
1313
async-trait = "0.1"
14+
bytes = "1.10.1"
1415
cryptarchia-consensus = { workspace = true, features = ["libp2p"] }
16+
futures = "0.3.31"
1517
kzgrs-backend = { workspace = true }
1618
nomos-blend-service = { workspace = true, features = ["libp2p"] }
1719
nomos-core = { workspace = true }
@@ -30,6 +32,7 @@ overwatch = { workspace = true }
3032
overwatch-derive = { workspace = true }
3133
rand = "0.8"
3234
serde = { version = "1", features = ["derive"] }
35+
serde_json = "1.0.140"
3336
subnetworks-assignations = { workspace = true }
3437
tokio = { version = "1", default-features = false, features = ["sync"] }
3538

0 commit comments

Comments
 (0)