1
1
use futures:: { channel:: mpsc, FutureExt , StreamExt } ;
2
- use std:: collections:: BTreeMap ;
2
+ use std:: collections:: { BTreeMap , BTreeSet } ;
3
3
use std:: fmt;
4
4
use std:: { cell:: RefCell , cmp:: Ordering , rc:: Rc } ;
5
5
use thiserror:: Error ;
@@ -24,6 +24,7 @@ use crate::RelaySpec;
24
24
/// async fn main() -> Result<(), Box<dyn Error>> {
25
25
/// let mut ndb = Ndb::new("the/db/path/", &Config::new())?;
26
26
/// let mut subman = SubMan::new(ndb.clone(), RelayPool::new());
27
+ /// let default_relays = vec![];
27
28
///
28
29
/// // Define a filter and build the subscription specification
29
30
/// let filter = Filter::new().kinds(vec![1, 2, 3]).build();
@@ -33,7 +34,7 @@ use crate::RelaySpec;
33
34
/// .build();
34
35
///
35
36
/// // Subscribe and obtain a SubReceiver
36
- /// let mut receiver = subman.subscribe(spec)?;
37
+ /// let mut receiver = subman.subscribe(spec, &default_relays )?;
37
38
///
38
39
/// // Process incoming note keys
39
40
/// loop {
@@ -514,6 +515,7 @@ impl SubMan {
514
515
pub fn process_relays < H : LegacyRelayHandler > (
515
516
& mut self ,
516
517
legacy_relay_handler : & mut H ,
518
+ default_relays : & [ RelaySpec ] ,
517
519
) -> SubResult < ( ) > {
518
520
let wakeup = move || {
519
521
// ignore
@@ -580,6 +582,9 @@ impl SubMan {
580
582
}
581
583
}
582
584
}
585
+
586
+ self . close_unneeded_relays ( default_relays) ;
587
+
583
588
Ok ( ( ) )
584
589
}
585
590
@@ -646,6 +651,41 @@ impl SubMan {
646
651
}
647
652
}
648
653
}
654
+
655
+ fn close_unneeded_relays ( & mut self , default_relays : & [ RelaySpec ] ) {
656
+ let current_relays: BTreeSet < String > = self . pool . urls ( ) ;
657
+ let needed_relays: BTreeSet < String > = self . needed_relays ( default_relays) ;
658
+ let unneeded_relays: BTreeSet < _ > =
659
+ current_relays. difference ( & needed_relays) . cloned ( ) . collect ( ) ;
660
+ if !unneeded_relays. is_empty ( ) {
661
+ debug ! ( "closing unneeded relays: {:?}" , unneeded_relays) ;
662
+ self . pool . remove_urls ( & unneeded_relays) ;
663
+ }
664
+ }
665
+
666
+ fn needed_relays ( & self , default_relays : & [ RelaySpec ] ) -> BTreeSet < String > {
667
+ let mut needed: BTreeSet < String > = default_relays. iter ( ) . map ( |rs| rs. url . clone ( ) ) . collect ( ) ;
668
+ // for every remote subscription
669
+ for ssr in self . rmt . values ( ) {
670
+ // that has remote substate (all will)
671
+ if let Some ( ref rmtsubstate) = ssr. borrow ( ) . rmt {
672
+ // for each subscription remote relay
673
+ for ( relay, state) in & rmtsubstate. relays {
674
+ // include any that are in-play
675
+ match state {
676
+ RelaySubState :: Error ( _) | RelaySubState :: Closed => {
677
+ // these are terminal and we don't need this relay
678
+ }
679
+ _ => {
680
+ // relays in all other states are needed
681
+ _ = needed. insert ( relay. clone ( ) ) ;
682
+ }
683
+ }
684
+ }
685
+ }
686
+ }
687
+ needed
688
+ }
649
689
}
650
690
651
691
pub trait LegacyRelayHandler {
@@ -755,13 +795,15 @@ mod tests {
755
795
// setup an ndb and subman to test
756
796
let ( _mndb, ndb) = ManagedNdb :: setup ( & testdbs_path_async ! ( ) ) ;
757
797
let mut subman = SubMan :: new ( ndb. clone ( ) , RelayPool :: new ( ) ) ;
798
+ let default_relays = vec ! [ ] ;
758
799
759
800
// subscribe to some stuff
760
801
let mut receiver = subman. subscribe (
761
802
SubSpecBuilder :: new ( )
762
803
. filters ( vec ! [ Filter :: new( ) . kinds( vec![ 1 ] ) . build( ) ] )
763
804
. constraint ( SubConstraint :: OnlyLocal )
764
805
. build ( ) ,
806
+ & default_relays,
765
807
) ?;
766
808
let lclid = receiver. lclid ( ) . unwrap ( ) ;
767
809
@@ -796,13 +838,15 @@ mod tests {
796
838
// setup an ndb and subman to test
797
839
let ( _mndb, ndb) = ManagedNdb :: setup ( & testdbs_path_async ! ( ) ) ;
798
840
let mut subman = SubMan :: new ( ndb. clone ( ) , RelayPool :: new ( ) ) ;
841
+ let default_relays = vec ! [ ] ;
799
842
800
843
// subscribe to some stuff
801
844
let mut receiver = subman. subscribe (
802
845
SubSpecBuilder :: new ( )
803
846
. filters ( vec ! [ Filter :: new( ) . kinds( vec![ 1 ] ) . build( ) ] )
804
847
. constraint ( SubConstraint :: OnlyLocal )
805
848
. build ( ) ,
849
+ & default_relays,
806
850
) ?;
807
851
let lclid = receiver. lclid ( ) . unwrap ( ) ;
808
852
@@ -842,13 +886,15 @@ mod tests {
842
886
// setup an ndb and subman to test
843
887
let ( _mndb, ndb) = ManagedNdb :: setup ( & testdbs_path_async ! ( ) ) ;
844
888
let mut subman = SubMan :: new ( ndb. clone ( ) , RelayPool :: new ( ) ) ;
889
+ let default_relays = vec ! [ ] ;
845
890
846
891
// subscribe to some stuff
847
892
let mut receiver = subman. subscribe (
848
893
SubSpecBuilder :: new ( )
849
894
. filters ( vec ! [ Filter :: new( ) . kinds( vec![ 1 ] ) . build( ) ] )
850
895
. constraint ( SubConstraint :: OnlyLocal )
851
896
. build ( ) ,
897
+ & default_relays,
852
898
) ?;
853
899
let lclid = receiver. lclid ( ) . unwrap ( ) ;
854
900
0 commit comments