@@ -27,7 +27,7 @@ use ccore::{
2727} ;
2828use cmerkle:: snapshot:: ChunkDecompressor ;
2929use cmerkle:: snapshot:: Restore as SnapshotRestore ;
30- use cmerkle:: TrieFactory ;
30+ use cmerkle:: { skewed_merkle_root , TrieFactory } ;
3131use cnetwork:: { Api , EventSender , NetworkExtension , NodeId } ;
3232use cstate:: FindActionHandler ;
3333use ctimer:: TimerToken ;
@@ -64,7 +64,7 @@ pub struct TokenInfo {
6464enum State {
6565 SnapshotHeader ( BlockHash , u64 ) ,
6666 SnapshotBody {
67- block : BlockHash ,
67+ header : EncodedHeader ,
6868 prev_root : H256 ,
6969 } ,
7070 SnapshotChunk {
@@ -151,7 +151,7 @@ impl Extension {
151151 let parent =
152152 client. block_header ( & parent_hash. into ( ) ) . expect ( "Parent header of the snapshot header must exist" ) ;
153153 return State :: SnapshotBody {
154- block : hash ,
154+ header ,
155155 prev_root : parent. transactions_root ( ) ,
156156 }
157157 }
@@ -414,8 +414,29 @@ impl NetworkExtension<Event> for Extension {
414414 }
415415 }
416416 State :: SnapshotBody {
417+ ref header,
417418 ..
418- } => unimplemented ! ( ) ,
419+ } => {
420+ for id in & peer_ids {
421+ if let Some ( requests) = self . requests . get_mut ( id) {
422+ ctrace ! ( SYNC , "Send snapshot body request to {}" , id) ;
423+ let request = RequestMessage :: Bodies ( vec ! [ header. hash( ) ] ) ;
424+ let request_id = self . last_request ;
425+ self . last_request += 1 ;
426+ requests. push ( ( request_id, request. clone ( ) ) ) ;
427+ self . api . send ( id, Arc :: new ( Message :: Request ( request_id, request) . rlp_bytes ( ) ) ) ;
428+
429+ let token = & self . tokens [ id] ;
430+ let token_info = self . tokens_info . get_mut ( token) . unwrap ( ) ;
431+
432+ let _ = self . api . clear_timer ( * token) ;
433+ self . api
434+ . set_timer_once ( * token, Duration :: from_millis ( SYNC_EXPIRE_REQUEST_INTERVAL ) )
435+ . expect ( "Timer set succeeds" ) ;
436+ token_info. request_id = Some ( request_id) ;
437+ }
438+ }
439+ }
419440 State :: SnapshotChunk {
420441 block,
421442 ref mut restore,
@@ -811,20 +832,11 @@ impl Extension {
811832 match self . state {
812833 State :: SnapshotHeader ( hash, _) => match headers {
813834 [ parent, header] if header. hash ( ) == hash => {
814- match self . client . import_bootstrap_header ( & header) {
815- Ok ( _) | Err ( BlockImportError :: Import ( ImportError :: AlreadyInChain ) ) => {
816- self . state = State :: SnapshotBody {
817- block : hash,
818- prev_root : * parent. transactions_root ( ) ,
819- } ;
820- cdebug ! ( SYNC , "Transitioning state to {:?}" , self . state) ;
821- }
822- Err ( BlockImportError :: Import ( ImportError :: AlreadyQueued ) ) => { }
823- // FIXME: handle import errors
824- Err ( err) => {
825- cwarn ! ( SYNC , "Cannot import header({}): {:?}" , header. hash( ) , err) ;
826- }
827- }
835+ self . state = State :: SnapshotBody {
836+ header : EncodedHeader :: new ( header. rlp_bytes ( ) . to_vec ( ) ) ,
837+ prev_root : * parent. transactions_root ( ) ,
838+ } ;
839+ cdebug ! ( SYNC , "Transitioning state to {:?}" , self . state) ;
828840 }
829841 _ => cdebug ! (
830842 SYNC ,
@@ -883,42 +895,75 @@ impl Extension {
883895
884896 fn on_body_response ( & mut self , hashes : Vec < BlockHash > , bodies : Vec < Vec < UnverifiedTransaction > > ) {
885897 ctrace ! ( SYNC , "Received body response with lenth({}) {:?}" , hashes. len( ) , hashes) ;
886- {
887- self . body_downloader . import_bodies ( hashes , bodies ) ;
888- let completed = self . body_downloader . drain ( ) ;
889- for ( hash , transactions ) in completed {
890- let header = self
891- . client
892- . block_header ( & BlockId :: Hash ( hash ) )
893- . expect ( "Downloaded body's header must exist" )
894- . decode ( ) ;
895- let block = Block {
896- header,
897- transactions,
898- } ;
899- cdebug ! ( SYNC , "Body download completed for #{}({})" , block . header . number ( ) , hash ) ;
900- match self . client . import_block ( block . rlp_bytes ( & Seal :: With ) ) {
901- Err ( BlockImportError :: Import ( ImportError :: AlreadyInChain ) ) => {
902- cwarn ! ( SYNC , "Downloaded already existing block({})" , hash)
903- }
904- Err ( BlockImportError :: Import ( ImportError :: AlreadyQueued ) ) => {
905- cwarn ! ( SYNC , "Downloaded already queued in the verification queue({}) " , hash )
906- }
907- Err ( err ) => {
898+
899+ match self . state {
900+ State :: SnapshotBody {
901+ ref header ,
902+ prev_root ,
903+ } => {
904+ let body = bodies . first ( ) . expect ( "Body response in SnapshotBody state has only one body" ) ;
905+ let new_root = skewed_merkle_root ( prev_root , body. iter ( ) . map ( Encodable :: rlp_bytes ) ) ;
906+ if header . transactions_root ( ) == new_root {
907+ let block = Block {
908+ header : header . decode ( ) ,
909+ transactions : body . clone ( ) ,
910+ } ;
911+ match self . client . import_bootstrap_block ( & block ) {
912+ Ok ( _ ) | Err ( BlockImportError :: Import ( ImportError :: AlreadyInChain ) ) => {
913+ self . state = State :: SnapshotChunk {
914+ block : header . hash ( ) ,
915+ restore : SnapshotRestore :: new ( header . state_root ( ) ) ,
916+ } ;
917+ cdebug ! ( SYNC , "Transitioning state to {:?} " , self . state ) ;
918+ }
919+ Err ( BlockImportError :: Import ( ImportError :: AlreadyQueued ) ) => { }
908920 // FIXME: handle import errors
909- cwarn ! ( SYNC , "Cannot import block({}): {:?}" , hash, err) ;
910- break
921+ Err ( err) => {
922+ cwarn ! ( SYNC , "Cannot import header({}): {:?}" , header. hash( ) , err) ;
923+ }
911924 }
912- _ => { }
913925 }
914926 }
915- }
927+ State :: Full => {
928+ {
929+ self . body_downloader . import_bodies ( hashes, bodies) ;
930+ let completed = self . body_downloader . drain ( ) ;
931+ for ( hash, transactions) in completed {
932+ let header = self
933+ . client
934+ . block_header ( & BlockId :: Hash ( hash) )
935+ . expect ( "Downloaded body's header must exist" )
936+ . decode ( ) ;
937+ let block = Block {
938+ header,
939+ transactions,
940+ } ;
941+ cdebug ! ( SYNC , "Body download completed for #{}({})" , block. header. number( ) , hash) ;
942+ match self . client . import_block ( block. rlp_bytes ( & Seal :: With ) ) {
943+ Err ( BlockImportError :: Import ( ImportError :: AlreadyInChain ) ) => {
944+ cwarn ! ( SYNC , "Downloaded already existing block({})" , hash)
945+ }
946+ Err ( BlockImportError :: Import ( ImportError :: AlreadyQueued ) ) => {
947+ cwarn ! ( SYNC , "Downloaded already queued in the verification queue({})" , hash)
948+ }
949+ Err ( err) => {
950+ // FIXME: handle import errors
951+ cwarn ! ( SYNC , "Cannot import block({}): {:?}" , hash, err) ;
952+ break
953+ }
954+ _ => { }
955+ }
956+ }
957+ }
916958
917- let mut peer_ids: Vec < _ > = self . header_downloaders . keys ( ) . cloned ( ) . collect ( ) ;
918- peer_ids. shuffle ( & mut thread_rng ( ) ) ;
959+ let mut peer_ids: Vec < _ > = self . header_downloaders . keys ( ) . cloned ( ) . collect ( ) ;
960+ peer_ids. shuffle ( & mut thread_rng ( ) ) ;
919961
920- for id in peer_ids {
921- self . send_body_request ( & id) ;
962+ for id in peer_ids {
963+ self . send_body_request ( & id) ;
964+ }
965+ }
966+ _ => { }
922967 }
923968 }
924969
0 commit comments