Skip to content

Commit e0bef72

Browse files
committed
WIP: use subman API to resolve unknown ids
1 parent 7d6ee98 commit e0bef72

File tree

3 files changed

+100
-46
lines changed

3 files changed

+100
-46
lines changed

crates/notedeck/src/lib.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,7 @@ pub use storage::{
4747
DataPath, DataPathType, Directory, FileKeyStorage, KeyStorageResponse, KeyStorageType,
4848
};
4949
pub use style::NotedeckTextStyle;
50-
pub use subman::SubMan;
50+
pub use subman::{SubError, SubMan};
5151
pub use theme::ColorTheme;
5252
pub use time::time_ago_since;
5353
pub use timecache::TimeCached;

crates/notedeck/src/unknowns.rs

+64-32
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,13 @@
11
use crate::{
22
note::NoteRef,
33
notecache::{CachedNote, NoteCache},
4+
subman::{SubConstraint, SubSpec, SubSpecBuilder},
45
Result,
56
};
67

78
use enostr::{Filter, NoteId, Pubkey};
89
use nostrdb::{BlockType, Mention, Ndb, Note, NoteKey, Transaction};
9-
use std::collections::{HashMap, HashSet};
10+
use std::collections::{BTreeMap, HashMap, HashSet};
1011
use std::time::{Duration, Instant};
1112
use tracing::error;
1213

@@ -118,13 +119,72 @@ impl UnknownIds {
118119
&mut self.ids
119120
}
120121

122+
pub fn numids(&self) -> usize {
123+
self.ids.len()
124+
}
125+
121126
pub fn clear(&mut self) {
122127
self.ids = HashMap::default();
123128
}
124129

125-
pub fn filter(&self) -> Option<Vec<Filter>> {
126-
let ids: Vec<&UnknownId> = self.ids.keys().collect();
127-
get_unknown_ids_filter(&ids)
130+
pub fn generate_resolution_requests(&self) -> Vec<SubSpec> {
131+
// 1. resolve as many ids per request as possible
132+
// 2. each request only has one filter (https://github.com/nostr-protocol/nips/pull/1645)
133+
// 3. each request is limited to MAX_CHUNK_IDS
134+
// 4. use relay hints when available
135+
136+
// Collect the unknown ids by relay
137+
let mut ids_by_relay: BTreeMap<RelayUrl, (Vec<Pubkey>, Vec<NoteId>)> = BTreeMap::new();
138+
for (unknown_id, relay_hints) in self.ids.iter() {
139+
// 1. use default relays (empty RelayUrl) if no hints are available
140+
// 2. query the default relays even when hints are available
141+
for relay in std::iter::once("".to_string()).chain(relay_hints.iter().cloned()) {
142+
match unknown_id {
143+
UnknownId::Pubkey(pk) => {
144+
ids_by_relay
145+
.entry(relay)
146+
.or_insert_with(|| (Vec::new(), Vec::new()))
147+
.0
148+
.push(*pk);
149+
}
150+
UnknownId::Id(nid) => {
151+
ids_by_relay
152+
.entry(relay)
153+
.or_insert_with(|| (Vec::new(), Vec::new()))
154+
.1
155+
.push(*nid);
156+
}
157+
}
158+
}
159+
}
160+
161+
const MAX_CHUNK_IDS: usize = 500;
162+
163+
let mut subspecs = vec![];
164+
for (relay, (pubkeys, noteids)) in ids_by_relay {
165+
// make a template SubSpecBuilder w/ the common parts
166+
let mut ssb = SubSpecBuilder::new().constraint(SubConstraint::OneShot);
167+
if !relay.is_empty() {
168+
ssb = ssb.constraint(SubConstraint::AllowedRelays(vec![relay]));
169+
}
170+
for chunk in pubkeys.chunks(MAX_CHUNK_IDS) {
171+
let pks: Vec<&[u8; 32]> = chunk.iter().map(|pk| pk.bytes()).collect();
172+
subspecs.push(
173+
ssb.clone()
174+
.filters(vec![Filter::new().authors(pks).kinds([0]).build()])
175+
.build(),
176+
);
177+
}
178+
for chunk in noteids.chunks(MAX_CHUNK_IDS) {
179+
let nids: Vec<&[u8; 32]> = chunk.iter().map(|nid| nid.bytes()).collect();
180+
subspecs.push(
181+
ssb.clone()
182+
.filters(vec![Filter::new().ids(nids).build()])
183+
.build(),
184+
);
185+
}
186+
}
187+
subspecs
128188
}
129189

130190
/// We've updated some unknown ids, update the last_updated time to now
@@ -350,31 +410,3 @@ pub fn get_unknown_note_ids<'a>(
350410

351411
Ok(())
352412
}
353-
354-
fn get_unknown_ids_filter(ids: &[&UnknownId]) -> Option<Vec<Filter>> {
355-
if ids.is_empty() {
356-
return None;
357-
}
358-
359-
let ids = &ids[0..500.min(ids.len())];
360-
let mut filters: Vec<Filter> = vec![];
361-
362-
let pks: Vec<&[u8; 32]> = ids
363-
.iter()
364-
.flat_map(|id| id.is_pubkey().map(|pk| pk.bytes()))
365-
.collect();
366-
if !pks.is_empty() {
367-
let pk_filter = Filter::new().authors(pks).kinds([0]).build();
368-
filters.push(pk_filter);
369-
}
370-
371-
let note_ids: Vec<&[u8; 32]> = ids
372-
.iter()
373-
.flat_map(|id| id.is_id().map(|id| id.bytes()))
374-
.collect();
375-
if !note_ids.is_empty() {
376-
filters.push(Filter::new().ids(note_ids).build());
377-
}
378-
379-
Some(filters)
380-
}

crates/notedeck_columns/src/app.rs

+35-13
Original file line numberDiff line numberDiff line change
@@ -13,9 +13,12 @@ use crate::{
1313
Result,
1414
};
1515

16-
use notedeck::{Accounts, AppContext, DataPath, DataPathType, FilterState, ImageCache, UnknownIds};
16+
use notedeck::{
17+
Accounts, AppContext, DataPath, DataPathType, FilterState, ImageCache, SubError, SubMan,
18+
UnknownIds,
19+
};
1720

18-
use enostr::{ClientMessage, Keypair, PoolRelay, Pubkey, RelayEvent, RelayMessage, RelayPool};
21+
use enostr::{ClientMessage, Keypair, PoolRelay, Pubkey, RelayEvent, RelayMessage};
1922
use uuid::Uuid;
2023

2124
use egui_extras::{Size, StripBuilder};
@@ -157,22 +160,41 @@ fn try_process_event(
157160
}
158161

159162
if app_ctx.unknown_ids.ready_to_send() {
160-
unknown_id_send(app_ctx.unknown_ids, app_ctx.subman.pool());
163+
unknown_id_send(app_ctx.unknown_ids, app_ctx.subman);
161164
}
162165

163166
Ok(())
164167
}
165168

166-
fn unknown_id_send(unknown_ids: &mut UnknownIds, pool: &mut RelayPool) {
167-
debug!("unknown_id_send called on: {:?}", &unknown_ids);
168-
let filter = unknown_ids.filter().expect("filter");
169-
info!(
170-
"Getting {} unknown ids from relays",
171-
unknown_ids.ids_iter().len()
172-
);
173-
let msg = ClientMessage::req("unknownids".to_string(), filter);
169+
fn unknown_id_send(unknown_ids: &mut UnknownIds, subman: &mut SubMan) {
170+
info!("Getting {} unknown ids from relays", &unknown_ids.numids());
171+
for subspec in unknown_ids.generate_resolution_requests() {
172+
debug!("unknown_ids subscribe: {:?}", subspec);
173+
match subman.subscribe(subspec) {
174+
Err(err) => error!("unknown_id_send subscribe failed: {:?}", err),
175+
Ok(mut rcvr) => {
176+
tokio::spawn(async move {
177+
loop {
178+
match rcvr.next().await {
179+
Err(SubError::StreamEnded) => {
180+
debug!("unknown_id_send: {} complete", rcvr.idstr());
181+
break;
182+
}
183+
Err(err) => {
184+
error!("unknown_id_send: {}: error: {:?}", rcvr.idstr(), err);
185+
break;
186+
}
187+
Ok(note_keys) => {
188+
debug!("received note keys: {:?}", note_keys);
189+
// only need the prefetch into ndb, all done
190+
}
191+
}
192+
}
193+
});
194+
}
195+
}
196+
}
174197
unknown_ids.clear();
175-
pool.send(&msg);
176198
}
177199

178200
fn update_damus(damus: &mut Damus, app_ctx: &mut AppContext<'_>, ctx: &egui::Context) {
@@ -233,7 +255,7 @@ fn handle_eose(
233255
);
234256
// this is possible if this is the first time
235257
if ctx.unknown_ids.ready_to_send() {
236-
unknown_id_send(ctx.unknown_ids, ctx.subman.pool());
258+
unknown_id_send(ctx.unknown_ids, ctx.subman);
237259
}
238260
}
239261

0 commit comments

Comments
 (0)