
feat(DA): request shares HTTP API #1138

Open

andrussal wants to merge 1 commit into master from feat/da-get-shares-endpoint
Conversation

andrussal (Contributor)

1. What does this PR implement?

Implements an HTTP API to get shares (blobs) according to the spec.

As discussed with Daniel, we can try to make it streaming if possible, since ideally we don't need to load all requested blobs into memory at once. At the moment only the “frontend” is streaming, while the storage backend still returns a collection. Should we change that as well? It's a similar discussion to the one we had about batch inserts: what do we expect our storage backend to support?

For streaming, the content type is application/x-ndjson, but what we want our API to support here may need further discussion.
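For context, a minimal sketch (not the PR code) of what an application/x-ndjson streaming handler could look like with axum. The handler name and the stub payload are placeholders; the only points being illustrated are the content type and the one-JSON-object-per-line framing:

```rust
use axum::{
    body::Body,
    http::{header, StatusCode},
    response::Response,
};
use bytes::Bytes;
use futures::stream;

// Hypothetical handler shape; in the PR the stream comes from storage instead
// of a hard-coded vector.
async fn da_get_shares() -> Response {
    // Each item is one JSON-serialized share terminated by '\n'.
    let shares = vec![
        Ok::<_, std::convert::Infallible>(Bytes::from_static(b"{\"column_idx\":0}\n")),
        Ok(Bytes::from_static(b"{\"column_idx\":1}\n")),
    ];
    Response::builder()
        .status(StatusCode::OK)
        .header(header::CONTENT_TYPE, "application/x-ndjson")
        .body(Body::from_stream(stream::iter(shares)))
        .unwrap()
}
```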

2. Does the code have enough context to be clearly understood?

Yes

3. Who are the specification authors and who is accountable for this PR?

@andrussal

4. Is the specification accurate and complete?

N/A

5. Does the implementation introduce changes in the specification?

N/A

Checklist

  • 1. Description added.
  • 2. Context and links to Specification document(s) added.
  • 3. Main contact(s) (developers and specification authors) added
  • 4. Implementation and Specification are 100% in sync including changes. This is critical.
  • 5. Link PR to a specific milestone.

andrussal added the da label on Mar 14, 2025
andrussal added this to the Iteration 10 milestone on Mar 14, 2025
andrussal force-pushed the feat/da-get-shares-endpoint branch from 9175662 to 5017f86 on March 14, 2025 at 10:40
.connect()
.await?;

let shares_prefix = key_bytes(DA_BLOB_PREFIX, blob_id.as_ref());
Collaborator

Wait, how are we storing shares? We should be able to ask for a blob_id + share_idx.
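For illustration only, a single-share key built from blob_id plus share index, which is what a per-share lookup would need. The prefix constant and the u16 index width here are assumptions, not the current storage layout:

```rust
// Sketch only: address one share by blob_id + share_idx, assuming keys are
// laid out as PREFIX ++ blob_id ++ share_idx (big-endian). The prefix and
// index width are illustrative, not the current storage format.
const DA_SHARE_PREFIX: &[u8] = b"da/share/";

fn share_key(blob_id: &[u8], share_idx: u16) -> Vec<u8> {
    let mut key = Vec::with_capacity(DA_SHARE_PREFIX.len() + blob_id.len() + 2);
    key.extend_from_slice(DA_SHARE_PREFIX);
    key.extend_from_slice(blob_id);
    key.extend_from_slice(&share_idx.to_be_bytes());
    key
}
```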

Comment on lines +140 to +175
let (blob_reply_tx, blob_reply_rx) = oneshot::channel();
storage_relay
.send(StorageMsg::LoadPrefix {
prefix: shares_prefix,
reply_channel: blob_reply_tx,
})
.await
.map_err(|(e, _)| e)?;

let blobs = blob_reply_rx.await?;

let requested_shares = Arc::new(requested_shares);
let filter_shares = Arc::new(filter_shares);

// Wrapping into stream from here because currently storage backend returns
// collection
Ok(stream::iter(blobs)
.filter_map(|bytes| async move { StorageOp::deserialize::<DaBlob::LightBlob>(bytes).ok() })
.filter_map(move |blob| {
let requested = requested_shares.clone();
let filtered = filter_shares.clone();
async move {
if requested.contains(&blob.column_idx())
|| (return_available && !filtered.contains(&blob.column_idx()))
{
let Ok(mut json) = serde_json::to_vec(&blob) else {
return None;
};
json.push(b'\n');
Some(Ok(Bytes::from(json)))
} else {
None
}
}
}))
}
Collaborator

Does storage have the ability to pick a single share?
What I meant is that we could probably request the shares we need one by one in an unfold.
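Roughly what that could look like; `LightShare` and `load_share` are stand-ins for whatever the real light-share type and single-key storage lookup end up being:

```rust
use futures::stream::{self, Stream, StreamExt};

// Stand-ins for the real types/helpers; names are assumptions for the sketch.
#[derive(Debug)]
struct LightShare {
    column_idx: u16,
}

async fn load_share(_blob_id: [u8; 32], idx: u16) -> Option<LightShare> {
    // In the real code this would be a single storage load for the
    // blob_id + share_idx key, deserialized into the light share type.
    Some(LightShare { column_idx: idx })
}

// The suggestion, roughly: fetch the requested shares one by one with
// `stream::unfold` instead of loading everything under the blob prefix.
fn shares_one_by_one(
    blob_id: [u8; 32],
    requested: Vec<u16>,
) -> impl Stream<Item = LightShare> {
    stream::unfold(requested.into_iter(), move |mut pending| async move {
        let idx = pending.next()?;                  // stop when nothing is left
        let share = load_share(blob_id, idx).await; // one storage round-trip per share
        Some((share, pending))
    })
    .filter_map(|maybe_share| async move { maybe_share })
}
```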

Contributor Author

We could do that if return_available is set to false; then we only need to ask for requested_shares. Otherwise we still need to ask for everything, because we don't know which shares the node has (I think...).

Collaborator

There is something wrong then, because we should be able to.

Contributor Author (andrussal), Mar 14, 2025

I am wondering whether we need separate extra storage in our key/value store that keeps track of which shares we have per blob. At the moment a share is stored under the key blob_id + column_idx, and we don't know which blob_id + column_idx pairs exist.

Collaborator

> I am wondering whether we need separate extra storage in our key/value store that keeps track of which shares we have per blob. At the moment a share is stored under the key blob_id + column_idx, and we don't know which blob_id + column_idx pairs exist.

We do not need separate extra storage for now. We probably just need to build the storage key to include the share_idx. I'll review this and come back with a solution plan.

Collaborator

> I am wondering whether we need separate extra storage in our key/value store that keeps track of which shares we have per blob. At the moment a share is stored under the key blob_id + column_idx, and we don't know which blob_id + column_idx pairs exist.

Ok, I understood something else previously, but I checked more carefully and yes, we probably do have to add a stored value to keep track of which shares we actually have. Let's make a different PR adding that indexing entry; then we can continue with this one once that is merged. Wdyt @andrussal?
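For the follow-up PR, one possible shape of that indexing entry (the key prefix and the value encoding here are assumptions): one index key per blob whose value is the set of column indices the node holds, updated on every share write, so "which shares do we have for this blob?" becomes a single point lookup instead of a prefix scan.

```rust
use std::collections::BTreeSet;

// Hypothetical index prefix; the real key layout would be decided in the
// follow-up PR.
const DA_SHARE_INDEX_PREFIX: &[u8] = b"da/share_index/";

fn share_index_key(blob_id: &[u8]) -> Vec<u8> {
    [DA_SHARE_INDEX_PREFIX, blob_id].concat()
}

// On every share write, the stored index value is updated to include the new
// column index. The value is encoded as a sorted list of big-endian u16s.
fn update_index(existing: Option<Vec<u8>>, column_idx: u16) -> Vec<u8> {
    let mut indices: BTreeSet<u16> = existing
        .map(|bytes| {
            bytes
                .chunks_exact(2)
                .map(|c| u16::from_be_bytes([c[0], c[1]]))
                .collect()
        })
        .unwrap_or_default();
    indices.insert(column_idx);
    indices.iter().flat_map(|i| i.to_be_bytes()).collect()
}
```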

Contributor Author

Sounds good, I’ll do it.
