Skip to content

Commit 5b720d5

Browse files
committed
convert account relay list support to subman
1 parent e399e1e commit 5b720d5

File tree

2 files changed

+63
-97
lines changed

2 files changed

+63
-97
lines changed

crates/notedeck/src/accounts.rs

+63-92
Original file line numberDiff line numberDiff line change
@@ -5,12 +5,11 @@ use crate::{
55
KeyStorageResponse, KeyStorageType, MuteFun, Muted, RelaySpec, SingleUnkIdAction, SubError,
66
SubMan, UnknownIds, UserAccount,
77
};
8-
use enostr::{ClientMessage, FilledKeypair, Keypair, RelayPool};
9-
use nostrdb::{Filter, Ndb, Note, NoteBuilder, NoteKey, Subscription, Transaction};
8+
use enostr::{FilledKeypair, Keypair, RelayPool};
9+
use nostrdb::{Filter, Ndb, Note, NoteBuilder, NoteKey, Transaction};
1010
use std::cmp::Ordering;
1111
use std::collections::{BTreeMap, BTreeSet};
1212
use url::Url;
13-
use uuid::Uuid;
1413

1514
// TODO: remove this
1615
use std::sync::{Arc, Mutex};
@@ -38,10 +37,9 @@ pub enum AccountsAction {
3837

3938
pub struct AccountRelayData {
4039
filter: Filter,
41-
subid: Option<String>,
42-
sub: Option<Subscription>,
43-
local: BTreeSet<RelaySpec>, // used locally but not advertised
44-
advertised: BTreeSet<RelaySpec>, // advertised via NIP-65
40+
subrmtid: Option<RmtId>,
41+
local: BTreeSet<RelaySpec>, // used locally but not advertised
42+
advertised: Arc<Mutex<BTreeSet<RelaySpec>>>, // advertised via NIP-65
4543
}
4644

4745
#[derive(Default)]
@@ -83,47 +81,64 @@ impl AccountRelayData {
8381

8482
AccountRelayData {
8583
filter,
86-
subid: None,
87-
sub: None,
84+
subrmtid: None,
8885
local: BTreeSet::new(),
89-
advertised: relays.into_iter().collect(),
86+
advertised: Arc::new(Mutex::new(relays.into_iter().collect())),
9087
}
9188
}
9289

9390
// make this account the current selected account
94-
pub fn activate(&mut self, ndb: &Ndb, pool: &mut RelayPool) {
91+
pub fn activate(&mut self, subman: &mut SubMan, default_relays: &[RelaySpec]) {
9592
debug!("activating relay sub {}", self.filter.json().unwrap());
96-
assert_eq!(self.subid, None, "subid already exists");
97-
assert_eq!(self.sub, None, "sub already exists");
98-
99-
// local subscription
100-
let sub = ndb
101-
.subscribe(&[self.filter.clone()])
102-
.expect("ndb relay list subscription");
103-
104-
// remote subscription
105-
let subid = Uuid::new_v4().to_string();
106-
pool.subscribe(subid.clone(), vec![self.filter.clone()]);
107-
108-
self.sub = Some(sub);
109-
self.subid = Some(subid);
93+
assert!(self.subrmtid.is_none(), "subscription already exists");
94+
let ndb = subman.ndb();
95+
let subspec = SubSpecBuilder::new()
96+
.filters(vec![self.filter.clone()])
97+
.build();
98+
debug!(
99+
"activating account relay sub {}: {}",
100+
subspec.rmtid,
101+
self.filter.json().unwrap()
102+
);
103+
if let Ok(mut rcvr) = subman.subscribe(subspec, default_relays) {
104+
let idstr = rcvr.idstr();
105+
self.subrmtid = rcvr.rmtid();
106+
let advertisedref = self.advertised.clone();
107+
tokio::spawn(async move {
108+
loop {
109+
match rcvr.next().await {
110+
Err(SubError::StreamEnded) => {
111+
debug!("account relays: sub {} complete", idstr);
112+
break;
113+
}
114+
Err(err) => {
115+
error!("account relays: sub {}: error: {:?}", idstr, err);
116+
break;
117+
}
118+
Ok(nks) => {
119+
debug!("account relays: sub {}: note keys: {:?}", idstr, nks);
120+
let txn = Transaction::new(&ndb).expect("txn");
121+
let relays = Self::harvest_nip65_relays(&ndb, &txn, &nks);
122+
debug!("updated relays {:?}", relays);
123+
*advertisedref.lock().unwrap() = relays.into_iter().collect();
124+
}
125+
}
126+
}
127+
});
128+
}
110129
}
111130

112131
// this account is no longer the selected account
113-
pub fn deactivate(&mut self, ndb: &mut Ndb, pool: &mut RelayPool) {
114-
debug!("deactivating relay sub {}", self.filter.json().unwrap());
115-
assert_ne!(self.subid, None, "subid doesn't exist");
116-
assert_ne!(self.sub, None, "sub doesn't exist");
117-
118-
// remote subscription
119-
pool.unsubscribe(self.subid.as_ref().unwrap().clone());
120-
121-
// local subscription
122-
ndb.unsubscribe(self.sub.unwrap())
123-
.expect("ndb relay list unsubscribe");
124-
125-
self.sub = None;
126-
self.subid = None;
132+
pub fn deactivate(&mut self, subman: &mut SubMan) {
133+
assert!(self.subrmtid.is_some(), "subscription doesn't exist");
134+
let rmtid = self.subrmtid.as_ref().unwrap();
135+
debug!(
136+
"deactivating account relays sub {}: {}",
137+
rmtid,
138+
self.filter.json().unwrap()
139+
);
140+
subman.unsubscribe_rmtid(rmtid).ok();
141+
self.subrmtid = None;
127142
}
128143

129144
// standardize the format (ie, trailing slashes) to avoid dups
@@ -173,7 +188,7 @@ impl AccountRelayData {
173188

174189
pub fn publish_nip65_relays(&self, seckey: &[u8; 32], pool: &mut RelayPool) {
175190
let mut builder = NoteBuilder::new().kind(10002).content("");
176-
for rs in &self.advertised {
191+
for rs in self.advertised.lock().unwrap().iter() {
177192
builder = builder.start_tag().tag_str("r").tag_str(&rs.url);
178193
if rs.has_read_marker {
179194
builder = builder.tag_str("read");
@@ -542,26 +557,6 @@ impl Accounts {
542557
Box::new(|_: &Note, _: &[u8; 32]| false)
543558
}
544559

545-
pub fn send_initial_filters(&mut self, pool: &mut RelayPool, relay_url: &str) {
546-
for data in self.account_data.values() {
547-
// send the active account's relay list subscription
548-
if let Some(relay_subid) = &data.relay.subid {
549-
let filters = vec![data.relay.filter.clone()];
550-
debug!(
551-
"Account send_initial_filters: sending sub {} {}: {:?}",
552-
relay_subid,
553-
relay_url,
554-
filters
555-
.iter()
556-
.map(|f| f.json().unwrap_or_default())
557-
.collect::<Vec<_>>(),
558-
);
559-
560-
pool.send_to(&ClientMessage::req(relay_subid.clone(), filters), relay_url);
561-
}
562-
}
563-
}
564-
565560
// Return accounts which have no account_data yet (added) and accounts
566561
// which have still data but are no longer in our account list (removed).
567562
fn delta_accounts(&self) -> (Vec<[u8; 32]>, Vec<[u8; 32]>) {
@@ -597,27 +592,6 @@ impl Accounts {
597592
self.account_data.remove(pubkey);
598593
}
599594

600-
fn poll_for_updates(&mut self, ndb: &Ndb) -> bool {
601-
let mut changed = false;
602-
for (pubkey, data) in &mut self.account_data {
603-
if let Some(sub) = data.relay.sub {
604-
let nks = ndb.poll_for_notes(sub, 1);
605-
if !nks.is_empty() {
606-
let txn = Transaction::new(ndb).expect("txn");
607-
let relays = AccountRelayData::harvest_nip65_relays(ndb, &txn, &nks);
608-
debug!(
609-
"pubkey {}: updated relays {:?}",
610-
hex::encode(pubkey),
611-
relays
612-
);
613-
data.relay.advertised = relays.into_iter().collect();
614-
changed = true;
615-
}
616-
}
617-
}
618-
changed
619-
}
620-
621595
fn update_relay_configuration(
622596
&mut self,
623597
pool: &mut RelayPool,
@@ -636,7 +610,7 @@ impl Accounts {
636610
if desired_relays.is_empty() {
637611
if let Some(data) = self.get_selected_account_data() {
638612
desired_relays.extend(data.relay.local.iter().cloned());
639-
desired_relays.extend(data.relay.advertised.iter().cloned());
613+
desired_relays.extend(data.relay.advertised.lock().unwrap().iter().cloned());
640614
}
641615
}
642616

@@ -681,6 +655,8 @@ impl Accounts {
681655
let mut relays = if let Some(data) = data_option {
682656
data.relay
683657
.advertised
658+
.lock()
659+
.unwrap()
684660
.iter()
685661
.filter(|&x| filter(x))
686662
.cloned()
@@ -729,11 +705,9 @@ impl Accounts {
729705
if Some(ndx) != self.currently_selected_account {
730706
// this account is not currently selected
731707
if let Some(data) = self.account_data.get_mut(account.pubkey.bytes()) {
732-
if data.relay.sub.is_some() {
708+
if data.relay.subrmtid.is_some() {
733709
// this account has relay subs, deactivate them
734-
let mut ndb = subman.ndb().clone();
735-
let pool = subman.pool();
736-
data.relay.deactivate(&mut ndb, pool);
710+
data.relay.deactivate(subman);
737711
}
738712
if data.muted.subrmtid.is_some() {
739713
// this account has muted subs, deactivate them
@@ -757,9 +731,6 @@ impl Accounts {
757731
need_reconfig = true;
758732
}
759733

760-
// Did any accounts receive updates (ie NIP-65 relay lists)
761-
need_reconfig = self.poll_for_updates(&ndb) || need_reconfig;
762-
763734
// If needed, update the relay configuration
764735
if need_reconfig {
765736
self.update_relay_configuration(pool, wakeup);
@@ -769,9 +740,9 @@ impl Accounts {
769740
// Do we need to activate account subs?
770741
let default_relays = self.get_all_selected_account_relays();
771742
if let Some(data) = self.get_selected_account_data_mut() {
772-
if data.relay.sub.is_none() {
743+
if data.relay.subrmtid.is_none() {
773744
// the currently selected account doesn't have relay subs, activate them
774-
data.relay.activate(&ndb, pool);
745+
data.relay.activate(subman, &default_relays);
775746
}
776747
if data.muted.subrmtid.is_none() {
777748
// the currently selected account doesn't have muted subs, activate them
@@ -812,7 +783,7 @@ impl Accounts {
812783
match self.account_data.get_mut(&key_bytes) {
813784
None => error!("no account data found for the provided key."),
814785
Some(account_data) => {
815-
let advertised = &mut account_data.relay.advertised;
786+
let advertised = &mut account_data.relay.advertised.lock().unwrap();
816787
if advertised.is_empty() {
817788
// If the selected account has no advertised relays,
818789
// initialize with the bootstrapping set.

crates/notedeck_columns/src/app.rs

-5
Original file line numberDiff line numberDiff line change
@@ -87,7 +87,6 @@ struct RelayHandler<'a> {
8787
// From AppContext
8888
unknown_ids: &'a mut UnknownIds,
8989
note_cache: &'a mut NoteCache,
90-
accounts: &'a mut Accounts,
9190

9291
// From Damus
9392
subscriptions: &'a mut Subscriptions,
@@ -100,14 +99,12 @@ impl<'a> RelayHandler<'a> {
10099
fn new(
101100
unknown_ids: &'a mut UnknownIds,
102101
note_cache: &'a mut NoteCache,
103-
accounts: &'a mut Accounts,
104102
subscriptions: &'a mut Subscriptions,
105103
timeline_cache: &'a mut TimelineCache,
106104
since_optimize: bool,
107105
) -> Self {
108106
RelayHandler {
109107
unknown_ids,
110-
accounts,
111108
note_cache,
112109
subscriptions,
113110
timeline_cache,
@@ -119,7 +116,6 @@ impl<'a> RelayHandler<'a> {
119116
impl LegacyRelayHandler for RelayHandler<'_> {
120117
/// Handle relay opened
121118
fn handle_opened(&mut self, ndb: &mut Ndb, pool: &mut RelayPool, relay: &str) {
122-
self.accounts.send_initial_filters(pool, relay);
123119
timeline::send_initial_timeline_filters(
124120
ndb,
125121
self.since_optimize,
@@ -159,7 +155,6 @@ fn try_process_event<'a>(
159155
let mut relay_handler = RelayHandler::new(
160156
app_ctx.unknown_ids,
161157
app_ctx.note_cache,
162-
app_ctx.accounts,
163158
&mut damus.subscriptions,
164159
&mut damus.timeline_cache,
165160
damus.since_optimize,

0 commit comments

Comments
 (0)