Skip to content

Commit 5017f86

Browse files
committed
feat(DA): request shares HTTP API
1 parent 693d2bc commit 5017f86

File tree

16 files changed

+355
-21
lines changed

16 files changed

+355
-21
lines changed

clients/common-http-client/Cargo.toml

+1
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ name = "common-http-client"
55
version = "0.1.0"
66

77
[dependencies]
8+
futures = "0.3.31"
89
nomos-core = { workspace = true }
910
nomos-da-messages = { workspace = true }
1011
reqwest = "0.12"

clients/common-http-client/src/lib.rs

+46-1
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,16 @@
11
use std::{fmt::Debug, sync::Arc};
22

3+
use futures::{Stream, StreamExt};
34
use nomos_core::da::blob::Blob;
4-
use nomos_da_messages::http::da::{DABlobCommitmentsRequest, DAGetLightBlobReq};
5+
use nomos_da_messages::http::da::{DABlobCommitmentsRequest, DAGetLightBlobReq, GetSharesRequest};
56
use reqwest::{Client, ClientBuilder, RequestBuilder, StatusCode, Url};
67
use serde::{de::DeserializeOwned, Serialize};
78

89
// These could be moved into shared location, perhaps to upcoming `nomos-lib`
910
const DA_GET_SHARED_COMMITMENTS: &str = "/da/get-commitments";
1011

1112
const DA_GET_LIGHT_BLOB: &str = "/da/get-blob";
13+
const DA_GET_SHARES: &str = "/da/sampling/shares";
1214

1315
#[derive(thiserror::Error, Debug)]
1416
pub enum Error {
@@ -131,4 +133,47 @@ impl CommonHttpClient {
131133
let request_url = base_url.join(path).map_err(Error::Url)?;
132134
self.get(request_url, &request).await
133135
}
136+
137+
pub async fn get_shares<B>(
138+
&self,
139+
base_url: Url,
140+
blob_id: B::BlobId,
141+
requested_shares: Vec<B::ColumnIndex>,
142+
filter_shares: Vec<B::ColumnIndex>,
143+
return_available: bool,
144+
) -> Result<impl Stream<Item = B::LightBlob>, Error>
145+
where
146+
B: Blob,
147+
<B as Blob>::BlobId: serde::Serialize + Send + Sync,
148+
<B as Blob>::ColumnIndex: serde::Serialize + DeserializeOwned + Send + Sync,
149+
<B as Blob>::LightBlob: DeserializeOwned + Send + Sync,
150+
{
151+
let request: GetSharesRequest<B> = GetSharesRequest {
152+
blob_id,
153+
requested_shares,
154+
filter_shares,
155+
return_available,
156+
};
157+
let request_url = base_url
158+
.join(DA_GET_SHARES.trim_start_matches('/'))
159+
.map_err(Error::Url)?;
160+
let mut request = self.client.get(request_url).json(&request);
161+
162+
if let Some(basic_auth) = &self.basic_auth {
163+
request = request.basic_auth(&basic_auth.username, basic_auth.password.as_deref());
164+
}
165+
166+
let response = request.send().await.map_err(Error::Request)?;
167+
let status = response.status();
168+
169+
let shares_stream = response.bytes_stream().filter_map(|item| async move {
170+
item.ok()
171+
.and_then(|bytes| serde_json::from_slice::<B::LightBlob>(&bytes).ok())
172+
});
173+
match status {
174+
StatusCode::OK => Ok(shares_stream),
175+
StatusCode::INTERNAL_SERVER_ERROR => Err(Error::Server("Error".to_string())),
176+
_ => Err(Error::Server(format!("Unexpected response [{status}]",))),
177+
}
178+
}
134179
}

clients/executor-http-client/Cargo.toml

+1
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ version = "0.1.0"
66

77
[dependencies]
88
common-http-client = { workspace = true }
9+
futures = "0.3.31"
910
nomos-core = { workspace = true }
1011
nomos-executor = { workspace = true }
1112
reqwest = "0.12"

clients/executor-http-client/src/lib.rs

+26
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
pub use common_http_client::BasicAuthCredentials;
22
use common_http_client::{CommonHttpClient, Error};
3+
use futures::Stream;
34
use nomos_core::da::blob::Blob;
45
use nomos_executor::api::{handlers::DispersalRequest, paths};
56
use reqwest::Url;
@@ -68,4 +69,29 @@ impl ExecutorHttpClient {
6869
.get_blob::<B, C>(base_url, blob_id, column_idx)
6970
.await
7071
}
72+
73+
pub async fn get_shares<B>(
74+
&self,
75+
base_url: Url,
76+
blob_id: B::BlobId,
77+
requested_shares: Vec<B::ColumnIndex>,
78+
filter_shares: Vec<B::ColumnIndex>,
79+
return_available: bool,
80+
) -> Result<impl Stream<Item = B::LightBlob>, Error>
81+
where
82+
B: Blob,
83+
<B as Blob>::BlobId: serde::Serialize + Send + Sync,
84+
<B as Blob>::ColumnIndex: serde::Serialize + DeserializeOwned + Send + Sync,
85+
<B as Blob>::LightBlob: DeserializeOwned + Send + Sync,
86+
{
87+
self.client
88+
.get_shares::<B>(
89+
base_url,
90+
blob_id,
91+
requested_shares,
92+
filter_shares,
93+
return_available,
94+
)
95+
.await
96+
}
7197
}

nodes/nomos-executor/src/api/backend.rs

+16-5
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ use hyper::header::{CONTENT_TYPE, USER_AGENT};
55
use nomos_api::Backend;
66
use nomos_core::{
77
da::{
8-
blob::{info::DispersedBlobInfo, metadata, Blob},
8+
blob::{info::DispersedBlobInfo, metadata, Blob, LightBlob},
99
DaVerifier as CoreDaVerifier,
1010
},
1111
header::HeaderId,
@@ -20,8 +20,8 @@ use nomos_libp2p::PeerId;
2020
use nomos_mempool::{tx::service::openapi::Status, MempoolMetrics};
2121
use nomos_node::api::handlers::{
2222
add_blob, add_blob_info, add_tx, blacklisted_peers, block, block_peer, cl_metrics, cl_status,
23-
cryptarchia_headers, cryptarchia_info, da_get_commitments, da_get_light_blob, get_range,
24-
libp2p_info, unblock_peer,
23+
cryptarchia_headers, cryptarchia_info, da_get_commitments, da_get_light_blob, da_get_shares,
24+
get_range, libp2p_info, unblock_peer,
2525
};
2626
use nomos_storage::backends::StorageSerde;
2727
use overwatch::overwatch::handle::OverwatchHandle;
@@ -162,8 +162,15 @@ where
162162
DaBlob: Blob + Serialize + DeserializeOwned + Clone + Send + Sync + 'static,
163163
<DaBlob as Blob>::BlobId:
164164
AsRef<[u8]> + Clone + Serialize + DeserializeOwned + Send + Sync + 'static,
165-
<DaBlob as Blob>::ColumnIndex: AsRef<[u8]> + DeserializeOwned + Send + Sync + 'static,
166-
<DaBlob as Blob>::LightBlob: Serialize + DeserializeOwned + Clone + Send + Sync + 'static,
165+
<DaBlob as Blob>::ColumnIndex:
166+
AsRef<[u8]> + Serialize + DeserializeOwned + Hash + Eq + Send + Sync + 'static,
167+
<DaBlob as Blob>::LightBlob: LightBlob<ColumnIndex = <DaBlob as Blob>::ColumnIndex>
168+
+ Serialize
169+
+ DeserializeOwned
170+
+ Clone
171+
+ Send
172+
+ Sync
173+
+ 'static,
167174
<DaBlob as Blob>::SharedCommitments:
168175
Serialize + DeserializeOwned + Clone + Send + Sync + 'static,
169176
DaBlobInfo: DispersedBlobInfo<BlobId = [u8; 32]>
@@ -412,6 +419,10 @@ where
412419
paths::DA_GET_LIGHT_BLOB,
413420
routing::get(da_get_light_blob::<DaStorageSerializer, DaBlob>),
414421
)
422+
.route(
423+
paths::DA_GET_SHARES,
424+
routing::get(da_get_shares::<DaStorageSerializer, DaBlob>),
425+
)
415426
.with_state(handle);
416427

417428
Server::bind(&self.settings.address)

nodes/nomos-node/src/api/backend.rs

+16-5
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ use hyper::header::{CONTENT_TYPE, USER_AGENT};
55
use nomos_api::Backend;
66
use nomos_core::{
77
da::{
8-
blob::{info::DispersedBlobInfo, metadata::Metadata, Blob},
8+
blob::{info::DispersedBlobInfo, metadata::Metadata, Blob, LightBlob},
99
DaVerifier as CoreDaVerifier,
1010
},
1111
header::HeaderId,
@@ -31,8 +31,8 @@ use utoipa_swagger_ui::SwaggerUi;
3131

3232
use super::handlers::{
3333
add_blob, add_blob_info, add_tx, blacklisted_peers, block, block_peer, cl_metrics, cl_status,
34-
cryptarchia_headers, cryptarchia_info, da_get_commitments, da_get_light_blob, get_range,
35-
libp2p_info, unblock_peer,
34+
cryptarchia_headers, cryptarchia_info, da_get_commitments, da_get_light_blob, da_get_shares,
35+
get_range, libp2p_info, unblock_peer,
3636
};
3737
use crate::api::paths;
3838

@@ -140,8 +140,15 @@ where
140140
DaBlob: Blob + Serialize + DeserializeOwned + Clone + Send + Sync + 'static,
141141
<DaBlob as Blob>::BlobId:
142142
AsRef<[u8]> + Clone + Serialize + DeserializeOwned + Send + Sync + 'static,
143-
<DaBlob as Blob>::ColumnIndex: AsRef<[u8]> + DeserializeOwned + Send + Sync + 'static,
144-
DaBlob::LightBlob: Serialize + DeserializeOwned + Clone + Send + Sync + 'static,
143+
<DaBlob as Blob>::ColumnIndex:
144+
AsRef<[u8]> + Serialize + DeserializeOwned + Hash + Eq + Send + Sync + 'static,
145+
DaBlob::LightBlob: LightBlob<ColumnIndex = <DaBlob as Blob>::ColumnIndex>
146+
+ Serialize
147+
+ DeserializeOwned
148+
+ Clone
149+
+ Send
150+
+ Sync
151+
+ 'static,
145152
DaBlob::SharedCommitments: Serialize + DeserializeOwned + Clone + Send + Sync + 'static,
146153
DaBlobInfo: DispersedBlobInfo<BlobId = [u8; 32]>
147154
+ Clone
@@ -377,6 +384,10 @@ where
377384
paths::DA_GET_LIGHT_BLOB,
378385
routing::get(da_get_light_blob::<DaStorageSerializer, DaBlob>),
379386
)
387+
.route(
388+
paths::DA_GET_SHARES,
389+
routing::get(da_get_shares::<DaStorageSerializer, DaBlob>),
390+
)
380391
.with_state(handle);
381392

382393
Server::bind(&self.settings.address)

nodes/nomos-node/src/api/handlers.rs

+54-5
Original file line numberDiff line numberDiff line change
@@ -1,24 +1,28 @@
11
use std::{error::Error, fmt::Debug, hash::Hash};
22

33
use axum::{
4+
body::StreamBody,
45
extract::{Query, State},
5-
response::Response,
6+
response::{IntoResponse, Response},
67
Json,
78
};
9+
use http::{header, StatusCode};
810
use nomos_api::http::{
911
cl, consensus,
1012
da::{self, PeerMessagesFactory},
1113
libp2p, mempool, storage,
1214
};
1315
use nomos_core::{
1416
da::{
15-
blob::{info::DispersedBlobInfo, metadata::Metadata, Blob},
17+
blob::{info::DispersedBlobInfo, metadata::Metadata, Blob, LightBlob},
1618
BlobId, DaVerifier as CoreDaVerifier,
1719
},
1820
header::HeaderId,
1921
tx::Transaction,
2022
};
21-
use nomos_da_messages::http::da::{DABlobCommitmentsRequest, DAGetLightBlobReq, GetRangeReq};
23+
use nomos_da_messages::http::da::{
24+
DABlobCommitmentsRequest, DAGetLightBlobReq, GetRangeReq, GetSharesRequest,
25+
};
2226
use nomos_da_network_core::SubnetworkId;
2327
use nomos_da_network_service::backends::NetworkBackend;
2428
use nomos_da_sampling::backend::DaSamplingServiceBackend;
@@ -259,7 +263,8 @@ where
259263
B: Blob + Serialize + DeserializeOwned + Clone + Send + Sync + 'static,
260264
<B as Blob>::BlobId: AsRef<[u8]> + Send + Sync + 'static,
261265
<B as Blob>::ColumnIndex: AsRef<[u8]> + Send + Sync + 'static,
262-
<B as Blob>::LightBlob: Serialize + DeserializeOwned + Clone + Send + Sync + 'static,
266+
<B as Blob>::LightBlob:
267+
LightBlob + Serialize + DeserializeOwned + Clone + Send + Sync + 'static,
263268
<B as Blob>::SharedCommitments: Serialize + DeserializeOwned + Clone + Send + Sync + 'static,
264269
M: MembershipHandler<NetworkId = SubnetworkId, Id = PeerId>
265270
+ Clone
@@ -505,7 +510,7 @@ where
505510
DaBlob: Blob,
506511
<DaBlob as Blob>::BlobId: AsRef<[u8]> + DeserializeOwned + Clone + Send + Sync + 'static,
507512
<DaBlob as Blob>::ColumnIndex: AsRef<[u8]> + DeserializeOwned + Send + Sync + 'static,
508-
<DaBlob as Blob>::LightBlob: Serialize + DeserializeOwned + Send + Sync + 'static,
513+
<DaBlob as Blob>::LightBlob: LightBlob + Serialize + DeserializeOwned + Send + Sync + 'static,
509514
StorageOp: StorageSerde + Send + Sync + 'static,
510515
<StorageOp as StorageSerde>::Error: Send + Sync,
511516
{
@@ -516,6 +521,50 @@ where
516521
))
517522
}
518523

524+
#[utoipa::path(
525+
get,
526+
path = paths::DA_GET_SHARES,
527+
responses(
528+
(status = 200, description = "Request shares for a blob", body = GetSharesRequest<DaBlob>),
529+
(status = 500, description = "Internal server error", body = StreamBody),
530+
)
531+
)]
532+
pub async fn da_get_shares<StorageOp, DaBlob>(
533+
State(handle): State<OverwatchHandle>,
534+
Json(request): Json<GetSharesRequest<DaBlob>>,
535+
) -> Response
536+
where
537+
DaBlob: Blob,
538+
<DaBlob as Blob>::BlobId: AsRef<[u8]> + DeserializeOwned + Clone + Send + Sync + 'static,
539+
<DaBlob as Blob>::ColumnIndex: serde::Serialize + DeserializeOwned + Send + Sync + Eq + Hash,
540+
<DaBlob as Blob>::LightBlob: LightBlob<ColumnIndex = <DaBlob as Blob>::ColumnIndex>
541+
+ Serialize
542+
+ DeserializeOwned
543+
+ Send
544+
+ Sync
545+
+ 'static,
546+
StorageOp: StorageSerde + Send + Sync + 'static,
547+
<StorageOp as StorageSerde>::Error: Send + Sync,
548+
<DaBlob as Blob>::LightBlob: LightBlob,
549+
<DaBlob as Blob>::ColumnIndex: 'static,
550+
{
551+
match storage::get_shares::<StorageOp, DaBlob>(
552+
&handle,
553+
request.blob_id,
554+
request.requested_shares,
555+
request.filter_shares,
556+
request.return_available,
557+
)
558+
.await
559+
{
560+
Ok(shares) => {
561+
let body = StreamBody::new(shares);
562+
IntoResponse::into_response(([(header::CONTENT_TYPE, "application/x-ndjson")], body))
563+
}
564+
Err(e) => IntoResponse::into_response((StatusCode::INTERNAL_SERVER_ERROR, e.to_string())),
565+
}
566+
}
567+
519568
#[utoipa::path(
520569
post,
521570
path = paths::MEMPOOL_ADD_TX,

nodes/nomos-node/src/api/paths.rs

+1
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ pub const CRYPTARCHIA_HEADERS: &str = "/cryptarchia/headers";
55
pub const DA_ADD_BLOB: &str = "/da/add-blob";
66
pub const DA_GET_RANGE: &str = "/da/get-range";
77
pub const DA_GET_SHARED_COMMITMENTS: &str = "/da/get-commitments";
8+
pub const DA_GET_SHARES: &str = "/da/sampling/shares";
89
pub const DA_GET_LIGHT_BLOB: &str = "/da/get-blob";
910
pub const DA_BLOCK_PEER: &str = "/da/block-peer";
1011
pub const DA_UNBLOCK_PEER: &str = "/da/unblock-peer";

nomos-core/chain-defs/src/da/blob/mod.rs

+5
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,11 @@ pub mod info;
22
pub mod metadata;
33
pub mod select;
44

5+
pub trait LightBlob {
6+
type ColumnIndex;
7+
fn column_idx(&self) -> Self::ColumnIndex;
8+
}
9+
510
pub trait Blob {
611
type BlobId;
712
type ColumnIndex;

nomos-da/kzgrs-backend/src/common/blob.rs

+8
Original file line numberDiff line numberDiff line change
@@ -126,6 +126,14 @@ pub struct DaLightBlob {
126126
pub rows_proofs: Vec<Proof>,
127127
}
128128

129+
impl blob::LightBlob for DaLightBlob {
130+
type ColumnIndex = [u8; 2];
131+
132+
fn column_idx(&self) -> Self::ColumnIndex {
133+
self.column_idx.to_be_bytes()
134+
}
135+
}
136+
129137
#[derive(Debug, Clone, Default, Eq, PartialEq, Serialize, Deserialize)]
130138
pub struct DaBlobSharedCommitments {
131139
#[serde(

nomos-da/network/messages/src/http/da.rs

+11
Original file line numberDiff line numberDiff line change
@@ -25,3 +25,14 @@ pub struct DAGetLightBlobReq<B: Blob> {
2525
pub blob_id: B::BlobId,
2626
pub column_idx: B::ColumnIndex,
2727
}
28+
29+
#[derive(Serialize, Deserialize)]
30+
pub struct GetSharesRequest<B: Blob>
31+
where
32+
<B as Blob>::ColumnIndex: Serialize + DeserializeOwned,
33+
{
34+
pub blob_id: B::BlobId,
35+
pub requested_shares: Vec<B::ColumnIndex>,
36+
pub filter_shares: Vec<B::ColumnIndex>,
37+
pub return_available: bool,
38+
}

nomos-services/api/Cargo.toml

+3
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,9 @@ instrumentation = []
1111

1212
[dependencies]
1313
async-trait = "0.1"
14+
bytes = "1.10.1"
1415
cryptarchia-consensus = { workspace = true, features = ["libp2p"] }
16+
futures = "0.3.31"
1517
kzgrs-backend = { workspace = true }
1618
nomos-blend-service = { workspace = true, features = ["libp2p"] }
1719
nomos-core = { workspace = true }
@@ -31,6 +33,7 @@ overwatch = { workspace = true }
3133
overwatch-derive = { workspace = true }
3234
rand = "0.8"
3335
serde = { version = "1", features = ["derive"] }
36+
serde_json = "1.0.140"
3437
subnetworks-assignations = { workspace = true }
3538
tokio = { version = "1", default-features = false, features = ["sync"] }
3639

0 commit comments

Comments
 (0)