@@ -14,8 +14,6 @@ use std::sync::{Arc, Mutex, RwLock};
1414#[ cfg( feature = "tokio" ) ]
1515use core:: future:: Future ;
1616#[ cfg( feature = "tokio" ) ]
17- use core:: pin:: Pin ;
18- #[ cfg( feature = "tokio" ) ]
1917use lightning:: util:: persist:: KVStore ;
2018
2119#[ cfg( target_os = "windows" ) ]
@@ -464,93 +462,85 @@ impl FilesystemStoreInner {
464462impl KVStore for FilesystemStore {
465463 fn read (
466464 & self , primary_namespace : & str , secondary_namespace : & str , key : & str ,
467- ) -> Pin < Box < dyn Future < Output = Result < Vec < u8 > , lightning:: io:: Error > > + ' static + Send > > {
465+ ) -> impl Future < Output = Result < Vec < u8 > , lightning:: io:: Error > > + ' static + Send {
468466 let this = Arc :: clone ( & self . inner ) ;
469- let path = match this. get_checked_dest_file_path (
467+ let path = this. get_checked_dest_file_path (
470468 primary_namespace,
471469 secondary_namespace,
472470 Some ( key) ,
473471 "read" ,
474- ) {
475- Ok ( path) => path,
476- Err ( e) => return Box :: pin ( async move { Err ( e) } ) ,
477- } ;
472+ ) ;
478473
479- Box :: pin ( async move {
474+ async move {
475+ let path = match path {
476+ Ok ( path) => path,
477+ Err ( e) => return Err ( e) ,
478+ } ;
480479 tokio:: task:: spawn_blocking ( move || this. read ( path) ) . await . unwrap_or_else ( |e| {
481480 Err ( lightning:: io:: Error :: new ( lightning:: io:: ErrorKind :: Other , e) )
482481 } )
483- } )
482+ }
484483 }
485484
486485 fn write (
487486 & self , primary_namespace : & str , secondary_namespace : & str , key : & str , buf : Vec < u8 > ,
488- ) -> Pin < Box < dyn Future < Output = Result < ( ) , lightning:: io:: Error > > + ' static + Send > > {
487+ ) -> impl Future < Output = Result < ( ) , lightning:: io:: Error > > + ' static + Send {
489488 let this = Arc :: clone ( & self . inner ) ;
490- let path = match this. get_checked_dest_file_path (
491- primary_namespace,
492- secondary_namespace,
493- Some ( key) ,
494- "write" ,
495- ) {
496- Ok ( path) => path,
497- Err ( e) => return Box :: pin ( async move { Err ( e) } ) ,
498- } ;
499-
500- let ( inner_lock_ref, version) = self . get_new_version_and_lock_ref ( path. clone ( ) ) ;
501- Box :: pin ( async move {
489+ let path = this
490+ . get_checked_dest_file_path ( primary_namespace, secondary_namespace, Some ( key) , "write" )
491+ . map ( |path| ( self . get_new_version_and_lock_ref ( path. clone ( ) ) , path) ) ;
492+
493+ async move {
494+ let ( ( inner_lock_ref, version) , path) = match path {
495+ Ok ( res) => res,
496+ Err ( e) => return Err ( e) ,
497+ } ;
502498 tokio:: task:: spawn_blocking ( move || {
503499 this. write_version ( inner_lock_ref, path, buf, version)
504500 } )
505501 . await
506502 . unwrap_or_else ( |e| Err ( lightning:: io:: Error :: new ( lightning:: io:: ErrorKind :: Other , e) ) )
507- } )
503+ }
508504 }
509505
510506 fn remove (
511507 & self , primary_namespace : & str , secondary_namespace : & str , key : & str , lazy : bool ,
512- ) -> Pin < Box < dyn Future < Output = Result < ( ) , lightning:: io:: Error > > + ' static + Send > > {
508+ ) -> impl Future < Output = Result < ( ) , lightning:: io:: Error > > + ' static + Send {
513509 let this = Arc :: clone ( & self . inner ) ;
514- let path = match this. get_checked_dest_file_path (
515- primary_namespace,
516- secondary_namespace,
517- Some ( key) ,
518- "remove" ,
519- ) {
520- Ok ( path) => path,
521- Err ( e) => return Box :: pin ( async move { Err ( e) } ) ,
522- } ;
523-
524- let ( inner_lock_ref, version) = self . get_new_version_and_lock_ref ( path. clone ( ) ) ;
525- Box :: pin ( async move {
510+ let path = this
511+ . get_checked_dest_file_path ( primary_namespace, secondary_namespace, Some ( key) , "remove" )
512+ . map ( |path| ( self . get_new_version_and_lock_ref ( path. clone ( ) ) , path) ) ;
513+
514+ async move {
515+ let ( ( inner_lock_ref, version) , path) = match path {
516+ Ok ( res) => res,
517+ Err ( e) => return Err ( e) ,
518+ } ;
526519 tokio:: task:: spawn_blocking ( move || {
527- this. remove_version ( inner_lock_ref, path, lazy , version )
520+ this. remove_version ( inner_lock_ref, path, version , lazy )
528521 } )
529522 . await
530523 . unwrap_or_else ( |e| Err ( lightning:: io:: Error :: new ( lightning:: io:: ErrorKind :: Other , e) ) )
531- } )
524+ }
532525 }
533526
534527 fn list (
535528 & self , primary_namespace : & str , secondary_namespace : & str ,
536- ) -> Pin < Box < dyn Future < Output = Result < Vec < String > , lightning:: io:: Error > > + ' static + Send > > {
529+ ) -> impl Future < Output = Result < Vec < String > , lightning:: io:: Error > > + ' static + Send {
537530 let this = Arc :: clone ( & self . inner ) ;
538531
539- let path = match this. get_checked_dest_file_path (
540- primary_namespace,
541- secondary_namespace,
542- None ,
543- "list" ,
544- ) {
545- Ok ( path) => path,
546- Err ( e) => return Box :: pin ( async move { Err ( e) } ) ,
547- } ;
532+ let path =
533+ this. get_checked_dest_file_path ( primary_namespace, secondary_namespace, None , "list" ) ;
548534
549- Box :: pin ( async move {
535+ async move {
536+ let path = match path {
537+ Ok ( path) => path,
538+ Err ( e) => return Err ( e) ,
539+ } ;
550540 tokio:: task:: spawn_blocking ( move || this. list ( path) ) . await . unwrap_or_else ( |e| {
551541 Err ( lightning:: io:: Error :: new ( lightning:: io:: ErrorKind :: Other , e) )
552542 } )
553- } )
543+ }
554544 }
555545}
556546
@@ -758,24 +748,24 @@ mod tests {
758748 let fs_store = Arc :: new ( FilesystemStore :: new ( temp_path) ) ;
759749 assert_eq ! ( fs_store. state_size( ) , 0 ) ;
760750
761- let async_fs_store: Arc < dyn KVStore > = fs_store . clone ( ) ;
751+ let async_fs_store = Arc :: clone ( & fs_store ) ;
762752
763753 let data1 = vec ! [ 42u8 ; 32 ] ;
764754 let data2 = vec ! [ 43u8 ; 32 ] ;
765755
766- let primary_namespace = "testspace" ;
767- let secondary_namespace = "testsubspace" ;
756+ let primary = "testspace" ;
757+ let secondary = "testsubspace" ;
768758 let key = "testkey" ;
769759
770760 // Test writing the same key twice with different data. Execute the asynchronous part out of order to ensure
771761 // that eventual consistency works.
772- let fut1 = async_fs_store . write ( primary_namespace , secondary_namespace , key, data1) ;
762+ let fut1 = KVStore :: write ( & * async_fs_store , primary , secondary , key, data1) ;
773763 assert_eq ! ( fs_store. state_size( ) , 1 ) ;
774764
775- let fut2 = async_fs_store . remove ( primary_namespace , secondary_namespace , key, false ) ;
765+ let fut2 = KVStore :: remove ( & * async_fs_store , primary , secondary , key, false ) ;
776766 assert_eq ! ( fs_store. state_size( ) , 1 ) ;
777767
778- let fut3 = async_fs_store . write ( primary_namespace , secondary_namespace , key, data2. clone ( ) ) ;
768+ let fut3 = KVStore :: write ( & * async_fs_store , primary , secondary , key, data2. clone ( ) ) ;
779769 assert_eq ! ( fs_store. state_size( ) , 1 ) ;
780770
781771 fut3. await . unwrap ( ) ;
@@ -788,21 +778,18 @@ mod tests {
788778 assert_eq ! ( fs_store. state_size( ) , 0 ) ;
789779
790780 // Test list.
791- let listed_keys =
792- async_fs_store. list ( primary_namespace, secondary_namespace) . await . unwrap ( ) ;
781+ let listed_keys = KVStore :: list ( & * async_fs_store, primary, secondary) . await . unwrap ( ) ;
793782 assert_eq ! ( listed_keys. len( ) , 1 ) ;
794783 assert_eq ! ( listed_keys[ 0 ] , key) ;
795784
796785 // Test read. We expect to read data2, as the write call was initiated later.
797- let read_data =
798- async_fs_store. read ( primary_namespace, secondary_namespace, key) . await . unwrap ( ) ;
786+ let read_data = KVStore :: read ( & * async_fs_store, primary, secondary, key) . await . unwrap ( ) ;
799787 assert_eq ! ( data2, & * read_data) ;
800788
801789 // Test remove.
802- async_fs_store . remove ( primary_namespace , secondary_namespace , key, false ) . await . unwrap ( ) ;
790+ KVStore :: remove ( & * async_fs_store , primary , secondary , key, false ) . await . unwrap ( ) ;
803791
804- let listed_keys =
805- async_fs_store. list ( primary_namespace, secondary_namespace) . await . unwrap ( ) ;
792+ let listed_keys = KVStore :: list ( & * async_fs_store, primary, secondary) . await . unwrap ( ) ;
806793 assert_eq ! ( listed_keys. len( ) , 0 ) ;
807794 }
808795
0 commit comments