@@ -14,7 +14,7 @@ use std::{
1414 io,
1515 ops:: { Deref , DerefMut } ,
1616 path:: { Path , PathBuf } ,
17- sync:: { Arc , RwLock , Weak } ,
17+ sync:: { Arc , Weak } ,
1818} ;
1919
2020use bao_tree:: {
@@ -343,7 +343,7 @@ impl<T> BaoFileHandleWeak<T> {
343343/// The inner part of a bao file handle.
344344#[ derive( Debug ) ]
345345pub struct BaoFileHandleInner < T > {
346- pub ( crate ) storage : RwLock < BaoFileStorage < T > > ,
346+ pub ( crate ) storage : tokio :: sync :: RwLock < BaoFileStorage < T > > ,
347347 config : Arc < BaoFileConfig > ,
348348 hash : Hash ,
349349}
@@ -432,15 +432,9 @@ where
432432 return res;
433433 }
434434 } ;
435- // otherwise, we have to spawn a task.
436- let ( handle, res) = tokio:: task:: spawn_blocking ( move || {
437- let storage = handle. storage . read ( ) . unwrap ( ) ;
438- let res = f ( storage. deref ( ) ) ;
439- drop ( storage) ;
440- ( handle, res)
441- } )
442- . await
443- . expect ( "spawn_blocking failed" ) ;
435+ let storage_guard = handle. storage . read ( ) . await ;
436+ let res = f ( storage_guard. deref ( ) ) ;
437+ drop ( storage_guard) ;
444438 * opt = Some ( handle) ;
445439 res
446440}
@@ -528,7 +522,7 @@ where
528522 pub fn incomplete_mem ( config : Arc < BaoFileConfig > , hash : Hash ) -> Self {
529523 let storage = BaoFileStorage :: incomplete_mem ( ) ;
530524 Self ( Arc :: new ( BaoFileHandleInner {
531- storage : RwLock :: new ( storage) ,
525+ storage : tokio :: sync :: RwLock :: new ( storage) ,
532526 config,
533527 hash,
534528 } ) )
@@ -543,7 +537,7 @@ where
543537 sizes : create_read_write ( & paths. sizes ) ?,
544538 } ) ;
545539 Ok ( Self ( Arc :: new ( BaoFileHandleInner {
546- storage : RwLock :: new ( storage) ,
540+ storage : tokio :: sync :: RwLock :: new ( storage) ,
547541 config,
548542 hash,
549543 } ) ) )
@@ -558,7 +552,7 @@ where
558552 ) -> Self {
559553 let storage = BaoFileStorage :: Complete ( CompleteStorage { data, outboard } ) ;
560554 Self ( Arc :: new ( BaoFileHandleInner {
561- storage : RwLock :: new ( storage) ,
555+ storage : tokio :: sync :: RwLock :: new ( storage) ,
562556 config,
563557 hash,
564558 } ) )
@@ -567,20 +561,20 @@ where
567561 /// Transform the storage in place. If the transform fails, the storage will
568562 /// be an immutable empty storage.
569563 #[ cfg( feature = "fs-store" ) ]
570- pub ( crate ) fn transform (
564+ pub ( crate ) async fn transform (
571565 & self ,
572- f : impl FnOnce ( BaoFileStorage < T > ) -> io:: Result < BaoFileStorage < T > > ,
566+ f : impl AsyncFnOnce ( BaoFileStorage < T > ) -> io:: Result < BaoFileStorage < T > > ,
573567 ) -> io:: Result < ( ) > {
574- let mut lock = self . storage . write ( ) . unwrap ( ) ;
568+ let mut lock = self . storage . write ( ) . await ;
575569 let storage = lock. take ( ) ;
576- * lock = f ( storage) ?;
570+ * lock = f ( storage) . await ?;
577571 Ok ( ( ) )
578572 }
579573
580574 /// True if the file is complete.
581575 pub fn is_complete ( & self ) -> bool {
582576 matches ! (
583- self . storage. read ( ) . unwrap( ) . deref( ) ,
577+ self . storage. try_read ( ) . unwrap( ) . deref( ) ,
584578 BaoFileStorage :: Complete ( _)
585579 )
586580 }
@@ -603,7 +597,7 @@ where
603597
604598 /// The most precise known total size of the data file.
605599 pub fn current_size ( & self ) -> io:: Result < u64 > {
606- match self . storage . read ( ) . unwrap ( ) . deref ( ) {
600+ match self . storage . try_read ( ) . unwrap ( ) . deref ( ) {
607601 BaoFileStorage :: Complete ( mem) => Ok ( mem. data_size ( ) ) ,
608602 BaoFileStorage :: IncompleteMem ( mem) => Ok ( mem. current_size ( ) ) ,
609603 BaoFileStorage :: IncompleteFile ( file) => file. current_size ( ) ,
@@ -633,8 +627,8 @@ where
633627 }
634628
635629 /// This is the synchronous impl for writing a batch.
636- fn write_batch ( & self , size : u64 , batch : & [ BaoContentItem ] ) -> io:: Result < HandleChange > {
637- let mut storage = self . storage . write ( ) . unwrap ( ) ;
630+ async fn write_batch ( & self , size : u64 , batch : & [ BaoContentItem ] ) -> io:: Result < HandleChange > {
631+ let mut storage = self . storage . write ( ) . await ;
638632 match storage. deref_mut ( ) {
639633 BaoFileStorage :: IncompleteMem ( mem) => {
640634 // check if we need to switch to file mode, otherwise write to memory
@@ -730,12 +724,7 @@ where
730724 let Some ( handle) = self . 0 . take ( ) else {
731725 return Err ( io:: Error :: new ( io:: ErrorKind :: Other , "deferred batch busy" ) ) ;
732726 } ;
733- let ( handle, change) = tokio:: task:: spawn_blocking ( move || {
734- let change = handle. write_batch ( size, & batch) ;
735- ( handle, change)
736- } )
737- . await
738- . expect ( "spawn_blocking failed" ) ;
727+ let change = handle. write_batch ( size, & batch) . await ;
739728 match change? {
740729 HandleChange :: None => { }
741730 HandleChange :: MemToFile => {
@@ -752,12 +741,7 @@ where
752741 let Some ( handle) = self . 0 . take ( ) else {
753742 return Err ( io:: Error :: new ( io:: ErrorKind :: Other , "deferred batch busy" ) ) ;
754743 } ;
755- let ( handle, res) = tokio:: task:: spawn_blocking ( move || {
756- let res = handle. storage . write ( ) . unwrap ( ) . sync_all ( ) ;
757- ( handle, res)
758- } )
759- . await
760- . expect ( "spawn_blocking failed" ) ;
744+ let res = handle. storage . write ( ) . await . sync_all ( ) ;
761745 self . 0 = Some ( handle) ;
762746 res
763747 }
0 commit comments