88use  crate :: connection_manager:: { 
99    ConnMgr ,  ConnMgrStatus ,  ConnToMainMsg ,  ConnToMainMsgInner , 
1010} ; 
11+ use  crate :: ledgers:: PersistentStateLedger ; 
12+ use  camino:: Utf8PathBuf ; 
1113use  omicron_uuid_kinds:: RackUuid ; 
1214use  serde:: { Deserialize ,  Serialize } ; 
1315use  slog:: { Logger ,  debug,  error,  info,  o} ; 
@@ -48,8 +50,8 @@ const CONN_TO_MAIN_CHANNEL_BOUND: usize = 1024;
4850pub  struct  Config  { // Settings a `NodeTask` is constructed from (see `NodeTask::new`)
4951    pub  baseboard_id :  BaseboardId , // cloned into the `NodeCtx` when the task is built
5052    pub  listen_addr :  SocketAddrV6 , 
51-     //     pub tq_state_ledger_paths : Vec<Utf8PathBuf>,
52-     //   pub network_config_ledger_paths: Vec<Utf8PathBuf>,
53+     pub  tq_ledger_paths :  Vec < Utf8PathBuf > , // handed to `PersistentStateLedger::load` and `::save`
54+     pub  network_config_ledger_paths :  Vec < Utf8PathBuf > , // NOTE(review): not read by the code visible here; tests pass `vec![]`
5355    pub  sprockets :  SprocketsConfig , 
5456} 
5557
@@ -340,8 +342,8 @@ impl NodeTaskHandle {
340342pub  struct  NodeTask  { 
341343    shutdown :  bool , 
342344    log :  Logger , 
343-     #[ expect( unused) ]  
344345    config :  Config , 
346+     tq_ledger_generation :  u64 , 
345347    node :  Node , 
346348    ctx :  NodeCtx , 
347349    conn_mgr :  ConnMgr , 
@@ -368,8 +370,20 @@ impl NodeTask {
368370
369371        let  baseboard_id = config. baseboard_id . clone ( ) ; 
370372
371-         // TODO: Load persistent state from ledger 
372-         let  mut  ctx = NodeCtx :: new ( config. baseboard_id . clone ( ) ) ; 
373+         let  ( mut  ctx,  tq_ledger_generation)  = if  let  Some ( ps_ledger)  =
374+             PersistentStateLedger :: load ( & log,  config. tq_ledger_paths . clone ( ) ) 
375+                 . await 
376+         { 
377+             ( 
378+                 NodeCtx :: new_with_persistent_state ( 
379+                     config. baseboard_id . clone ( ) , 
380+                     ps_ledger. state , 
381+                 ) , 
382+                 ps_ledger. generation , 
383+             ) 
384+         }  else  { 
385+             ( NodeCtx :: new ( config. baseboard_id . clone ( ) ) ,  0 ) 
386+         } ; 
373387        let  node = Node :: new ( & log,  & mut  ctx) ; 
374388        let  conn_mgr = ConnMgr :: new ( 
375389            & log, 
@@ -384,6 +398,7 @@ impl NodeTask {
384398                shutdown :  false , 
385399                log, 
386400                config, 
401+                 tq_ledger_generation, 
387402                node, 
388403                ctx, 
389404                conn_mgr, 
@@ -423,6 +438,10 @@ impl NodeTask {
423438    } 
424439
425440    // Handle messages from connection management tasks 
441+     // 
442+     // We persist state at the end of this method, which always occurs before 
443+     // the `run` loop sends any outgoing messages produced by handling this 
444+     // message. 
426445    async  fn  on_conn_msg ( & mut  self ,  msg :  ConnToMainMsg )  { 
427446        let  task_id = msg. task_id ; 
428447        match  msg. msg  { 
@@ -452,9 +471,14 @@ impl NodeTask {
452471                todo ! ( ) ; 
453472            } 
454473        } 
474+         self . save_persistent_state ( ) . await ; 
455475    } 
456476
457-     // TODO: Process `ctx`: save persistent state 
477+     // Handle API requests from sled-agent 
478+     // 
479+     // NOTE: We persist state where necessary before responding to clients. Any 
480+     // resulting output messages will also be sent in the `run` loop after we 
481+     // persist state. 
458482    async  fn  on_api_request ( & mut  self ,  request :  NodeApiRequest )  { 
459483        match  request { 
460484            NodeApiRequest :: BootstrapAddresses ( addrs)  => { 
@@ -479,6 +503,7 @@ impl NodeTask {
479503                    . map ( |_| { 
480504                        self . ctx . persistent_state ( ) . commits . contains ( & epoch) 
481505                    } ) ; 
506+                 self . save_persistent_state ( ) . await ; 
482507                let  _ = responder. send ( res) ; 
483508            } 
484509            NodeApiRequest :: ConnMgrStatus  {  responder }  => { 
@@ -501,6 +526,7 @@ impl NodeTask {
501526            NodeApiRequest :: LrtqUpgrade  {  msg,  responder }  => { 
502527                let  res =
503528                    self . node . coordinate_upgrade_from_lrtq ( & mut  self . ctx ,  msg) ; 
529+                 self . save_persistent_state ( ) . await ; 
504530                let  _ = responder. send ( res) ; 
505531            } 
506532            NodeApiRequest :: NodeStatus  {  responder }  => { 
@@ -518,11 +544,13 @@ impl NodeTask {
518544                    . map ( |_| { 
519545                        self . ctx . persistent_state ( ) . commits . contains ( & epoch) 
520546                    } ) ; 
547+                 self . save_persistent_state ( ) . await ; 
521548                let  _ = responder. send ( res) ; 
522549            } 
523550            NodeApiRequest :: Reconfigure  {  msg,  responder }  => { 
524551                let  res =
525552                    self . node . coordinate_reconfiguration ( & mut  self . ctx ,  msg) ; 
553+                 self . save_persistent_state ( ) . await ; 
526554                let  _ = responder. send ( res) ; 
527555            } 
528556            NodeApiRequest :: Shutdown  => { 
@@ -531,6 +559,19 @@ impl NodeTask {
531559            } 
532560        } 
533561    } 
562+ 
563+     /// Save `PersistentState` to the trust quorum ledger, but only when the context reports a change 
564+ pub  async  fn  save_persistent_state ( & mut  self )  { 
565+         if  self . ctx . persistent_state_change_check_and_reset ( )  { // presumably also clears the "changed" flag, per its name; confirm in `NodeCtx`
566+             self . tq_ledger_generation  = PersistentStateLedger :: save ( // the value returned by `save` becomes the generation passed to the next save
567+                 & self . log , 
568+                 self . config . tq_ledger_paths . clone ( ) , // an owned copy of the configured ledger paths
569+                 self . tq_ledger_generation , // generation loaded at startup or returned by the previous save (0 if no ledger was loaded)
570+                 self . ctx . persistent_state ( ) . clone ( ) , // snapshot of the current state to write
571+             ) 
572+             . await ; 
573+         } 
574+     } 
534575} 
535576
536577#[ cfg( test) ]  
@@ -587,7 +628,15 @@ mod tests {
587628                    } , 
588629                    roots :  vec ! [ cert_path( dir. clone( ) ,  & root_prefix( ) ) ] , 
589630                } ; 
590-                 Config  {  baseboard_id,  listen_addr,  sprockets } 
631+                 let  tq_ledger_paths =
632+                     vec ! [ dir. join( format!( "test-tq-ledger-[{i}]" ) ) ] ; 
633+                 Config  { 
634+                     baseboard_id, 
635+                     listen_addr, 
636+                     sprockets, 
637+                     tq_ledger_paths, 
638+                     network_config_ledger_paths :  vec ! [ ] , 
639+                 } 
591640            } ) 
592641            . collect ( ) 
593642    } 
@@ -1435,4 +1484,141 @@ mod tests {
14351484
14361485        setup. cleanup_successful ( ) ; 
14371486    } 
1487+ 
1488+     /// Ensure state is persisted to the ledger and picked up again across node restarts 
1489+ #[ tokio:: test]  
1490+     pub  async  fn  tq_persistent_state ( )  { 
1491+         let  num_nodes = 4 ; 
1492+         let  mut  setup =
1493+             TestSetup :: spawn_nodes ( "tq_persistent_state" ,  num_nodes) . await ; 
1494+         let  rack_id = RackUuid :: new_v4 ( ) ; 
1495+ 
1496+         // Trigger an initial configuration by using the first node as a 
1497+         // coordinator. Here we play the role of sled-agent acting on 
1498+         // instructions from Nexus. 
1499+         let  initial_config = ReconfigureMsg  { 
1500+             rack_id, 
1501+             epoch :  Epoch ( 1 ) , 
1502+             last_committed_epoch :  None , 
1503+             members :  setup. members ( ) . cloned ( ) . collect ( ) , 
1504+             threshold :  trust_quorum_protocol:: Threshold ( 3 ) , 
1505+         } ; 
1506+ 
1507+         // Tell nodes how to reach each other 
1508+         for  h in  & setup. node_handles  { 
1509+             h. load_peer_addresses ( setup. listen_addrs . iter ( ) . cloned ( ) . collect ( ) ) 
1510+                 . await 
1511+                 . unwrap ( ) ; 
1512+         } 
1513+ 
1514+         let  coordinator = setup. node_handles . first ( ) . unwrap ( ) ; 
1515+         coordinator. reconfigure ( initial_config) . await . unwrap ( ) ; 
1516+ 
1517+         let  poll_interval = Duration :: from_millis ( 10 ) ; 
1518+         let  poll_max = Duration :: from_secs ( 10 ) ; 
1519+ 
1520+         // Wait for the coordinator to see `PrepareAck`s from all nodes 
1521+         wait_for_condition ( 
1522+             async  || { 
1523+                 let  Ok ( Some ( s) )  = coordinator. coordinator_status ( ) . await  else  { 
1524+                     return  Err ( CondCheckError :: < ( ) > :: NotYet ) ; 
1525+                 } ; 
1526+                 if  s. acked_prepares . len ( )  == num_nodes { 
1527+                     Ok ( ( ) ) 
1528+                 }  else  { 
1529+                     Err ( CondCheckError :: < ( ) > :: NotYet ) 
1530+                 } 
1531+             } , 
1532+             & poll_interval, 
1533+             & poll_max, 
1534+         ) 
1535+         . await 
1536+         . unwrap ( ) ; 
1537+ 
1538+         // Simulate a crash of the last node. 
1539+         let  join_handle = setup. join_handles . pop ( ) . unwrap ( ) ; 
1540+         let  node_handle = setup. node_handles . pop ( ) . unwrap ( ) ; 
1541+         node_handle. shutdown ( ) . await . unwrap ( ) ; 
1542+         join_handle. await . unwrap ( ) ; 
1543+         let  _ = setup. listen_addrs . pop ( ) . unwrap ( ) ; // drop the stale address; the restarted task reports its own below
1544+ 
1545+         // Now bring it back up with the same config, and therefore the same 
1546+         // ledger paths, so it reloads the initial config and prepare. Commit 
1547+         // should work and everything should pick up as expected. 
1548+         let  ( mut  task,  handle)  = NodeTask :: new ( 
1549+             setup. configs . last ( ) . unwrap ( ) . clone ( ) , 
1550+             & setup. logctx . log , 
1551+         ) 
1552+         . await ; 
1553+         let  listen_addr = handle. listen_addr ( ) ; 
1554+         setup. node_handles . push ( handle) ; 
1555+         setup. join_handles . push ( tokio:: spawn ( async  move  {  task. run ( ) . await  } ) ) ; 
1556+         setup. listen_addrs . push ( listen_addr) ; 
1557+ 
1558+         // Tell nodes how to reach each other, including the restarted node 
1559+         for  h in  & setup. node_handles  { 
1560+             h. load_peer_addresses ( setup. listen_addrs . iter ( ) . cloned ( ) . collect ( ) ) 
1561+                 . await 
1562+                 . unwrap ( ) ; 
1563+         } 
1564+ 
1565+         // Commit at each node 
1566+         // 
1567+         // Nexus retries this idempotent command until each node acks. So we 
1568+         // simulate that here. 
1569+         wait_for_condition ( 
1570+             async  || { 
1571+                 let  mut  acked = 0 ; 
1572+                 for  h in  & setup. node_handles  { 
1573+                     if  h. commit ( rack_id,  Epoch ( 1 ) ) . await . unwrap ( )  { 
1574+                         acked += 1 ; 
1575+                     } 
1576+                 } 
1577+                 if  acked == num_nodes { 
1578+                     Ok ( ( ) ) 
1579+                 }  else  { 
1580+                     Err ( CondCheckError :: < ( ) > :: NotYet ) 
1581+                 } 
1582+             } , 
1583+             & poll_interval, 
1584+             & poll_max, 
1585+         ) 
1586+         . await 
1587+         . unwrap ( ) ; 
1588+ 
1589+         // Simulate a crash and restart of the last node again, this time after the commit 
1590+         let  join_handle = setup. join_handles . pop ( ) . unwrap ( ) ; 
1591+         let  node_handle = setup. node_handles . pop ( ) . unwrap ( ) ; 
1592+         node_handle. shutdown ( ) . await . unwrap ( ) ; 
1593+         join_handle. await . unwrap ( ) ; 
1594+         let  _ = setup. listen_addrs . pop ( ) . unwrap ( ) ; 
1595+         let  ( mut  task,  handle)  = NodeTask :: new ( 
1596+             setup. configs . last ( ) . unwrap ( ) . clone ( ) , 
1597+             & setup. logctx . log , 
1598+         ) 
1599+         . await ; 
1600+         let  listen_addr = handle. listen_addr ( ) ; 
1601+         setup. node_handles . push ( handle) ; 
1602+         setup. join_handles . push ( tokio:: spawn ( async  move  {  task. run ( ) . await  } ) ) ; 
1603+         setup. listen_addrs . push ( listen_addr) ; 
1604+ 
1605+         // Tell nodes how to reach each other, including the restarted node 
1606+         for  h in  & setup. node_handles  { 
1607+             h. load_peer_addresses ( setup. listen_addrs . iter ( ) . cloned ( ) . collect ( ) ) 
1608+                 . await 
1609+                 . unwrap ( ) ; 
1610+         } 
1611+ 
1612+         // Now load the rack secret at all nodes; every node must return the same value 
1613+         let  mut  secret = None ; 
1614+         for  h in  & setup. node_handles  { 
1615+             let  rs = h. load_rack_secret ( Epoch ( 1 ) ) . await . unwrap ( ) ; 
1616+             if  secret. is_none ( )  { 
1617+                 secret = Some ( rs. clone ( ) ) ; // the first node's answer is the reference
1618+             } 
1619+             assert_eq ! ( & rs,  secret. as_ref( ) . unwrap( ) ) ; 
1620+         } 
1621+ 
1622+         setup. cleanup_successful ( ) ; 
1623+     } 
14381624} 
0 commit comments