Skip to content

Commit 0cf35bd

Browse files
committed
Provide syncer session call back handlers that can be intercepted
1 parent d779031 commit 0cf35bd

File tree

9 files changed

+244
-42
lines changed

9 files changed

+244
-42
lines changed

execution_chain/sync/beacon.nim

Lines changed: 67 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -15,9 +15,10 @@ import
1515
pkg/stew/[interval_set, sorted_set],
1616
../core/chain,
1717
../networking/p2p,
18-
./beacon/worker/headers/headers_target,
1918
./beacon/[beacon_desc, worker],
20-
./beacon/worker/classify,
19+
./beacon/worker/blocks/[blocks_fetch, blocks_import],
20+
./beacon/worker/headers/[headers_fetch, headers_target],
21+
./beacon/worker/[classify, update],
2122
./[sync_sched, wire_protocol]
2223

2324
export
@@ -26,34 +27,64 @@ export
2627
logScope:
2728
topics = "beacon sync"
2829

30+
# ------------------------------------------------------------------------------
31+
# Interceptable handlers
32+
# ------------------------------------------------------------------------------
33+
34+
proc schedDaemonCB(
35+
ctx: BeaconCtxRef;
36+
): Future[Duration]
37+
{.async: (raises: []).} =
38+
return worker.runDaemon(ctx, "Daemon") # async/template
39+
40+
proc schedStartCB(buddy: BeaconBuddyRef): bool =
41+
return worker.start(buddy, "Start")
42+
43+
proc schedStopCB(buddy: BeaconBuddyRef) =
44+
worker.stop(buddy, "Stop")
45+
46+
proc schedPoolCB(buddy: BeaconBuddyRef; last: bool; laps: int): bool =
47+
return worker.runPool(buddy, last, laps, "SyncMode")
48+
49+
proc schedPeerCB(
50+
buddy: BeaconBuddyRef;
51+
rank: PeerRanking;
52+
): Future[Duration]
53+
{.async: (raises: []).} =
54+
return worker.runPeer(buddy, rank, "Peer") # async/template
55+
56+
proc noOpFn(buddy: BeaconBuddyRef) = discard
57+
proc noOpEx(self: BeaconHandlersSyncRef) = discard
58+
2959
# ------------------------------------------------------------------------------
3060
# Virtual methods/interface, `mixin` functions
3161
# ------------------------------------------------------------------------------
3262

3363
proc runSetup(ctx: BeaconCtxRef): bool =
34-
worker.setup(ctx, "Setup")
64+
return worker.setup(ctx, "Setup")
3565

3666
proc runRelease(ctx: BeaconCtxRef) =
3767
worker.release(ctx, "Release")
3868

39-
proc runDaemon(ctx: BeaconCtxRef): Future[Duration] {.async: (raises: []).} =
40-
return worker.runDaemon(ctx, "Daemon")
41-
4269
proc runTicker(ctx: BeaconCtxRef) =
4370
worker.runTicker(ctx, "Ticker")
4471

72+
73+
proc runDaemon(ctx: BeaconCtxRef): Future[Duration] {.async: (raises: []).} =
74+
return await ctx.handler.schedDaemon(ctx)
75+
4576
proc runStart(buddy: BeaconBuddyRef): bool =
46-
worker.start(buddy, "Start")
77+
return buddy.ctx.handler.schedStart(buddy)
4778

4879
proc runStop(buddy: BeaconBuddyRef) =
49-
worker.stop(buddy, "Stop")
80+
buddy.ctx.handler.schedStop(buddy)
5081

5182
proc runPool(buddy: BeaconBuddyRef; last: bool; laps: int): bool =
52-
worker.runPool(buddy, last, laps, "SyncMode")
83+
return buddy.ctx.handler.schedPool(buddy, last, laps)
5384

5485
proc runPeer(buddy: BeaconBuddyRef): Future[Duration] {.async: (raises: []).} =
5586
let rank = buddy.classifyForFetching()
56-
return worker.runPeer(buddy, rank, "Peer")
87+
return await buddy.ctx.handler.schedPeer(buddy, rank)
5788

5889
# ------------------------------------------------------------------------------
5990
# Public functions
@@ -85,6 +116,25 @@ proc config*(
85116
desc.initSync(ethNode, maxPeers)
86117
desc.ctx.pool.chain = chain
87118

119+
# Set up handlers so they can be overlayed
120+
desc.ctx.pool.handlers = BeaconHandlersSyncRef(
121+
version: 0,
122+
activate: updateActivateCB,
123+
suspend: updateSuspendCB,
124+
schedDaemon: schedDaemonCB,
125+
schedStart: schedStartCB,
126+
schedStop: schedStopCB,
127+
schedPool: schedPoolCB,
128+
schedPeer: schedPeerCB,
129+
getBlockHeaders: getBlockHeadersCB,
130+
syncBlockHeaders: noOpFn,
131+
getBlockBodies: getBlockBodiesCB,
132+
syncBlockBodies: noOpFn,
133+
importBlock: importBlockCB,
134+
syncImportBlock: noOpFn,
135+
startSync: noOpEx,
136+
stopSync: noOpEx)
137+
88138
if not desc.lazyConfigHook.isNil:
89139
desc.lazyConfigHook(desc)
90140
desc.lazyConfigHook = nil
@@ -101,10 +151,16 @@ proc configTarget*(desc: BeaconSyncRef; hex: string; isFinal: bool): bool =
101151

102152
proc start*(desc: BeaconSyncRef): bool =
103153
doAssert not desc.ctx.isNil
104-
desc.startSync()
154+
if desc.startSync():
155+
let w = BeaconHandlersSyncRef(desc.ctx.pool.handlers)
156+
w.startSync(w)
157+
return true
158+
# false
105159

106160
proc stop*(desc: BeaconSyncRef) {.async.} =
107161
doAssert not desc.ctx.isNil
162+
let w = BeaconHandlersSyncRef(desc.ctx.pool.handlers)
163+
w.stopSync(w)
108164
await desc.stopSync()
109165

110166
# ------------------------------------------------------------------------------

execution_chain/sync/beacon/beacon_desc.nim

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,4 +25,10 @@ type
2525
## Instance descriptor, extends scheduler object
2626
lazyConfigHook*: BeaconSyncConfigHook
2727

28+
BeaconHandlersSyncRef* = ref object of BeaconHandlersRef
29+
## Add start/stop helpers to function list. By default, this functiona
30+
## are no-ops.
31+
startSync*: proc(self: BeaconHandlersSyncRef) {.gcsafe, raises: [].}
32+
stopSync*: proc(self: BeaconHandlersSyncRef) {.gcsafe, raises: [].}
33+
2834
# End

execution_chain/sync/beacon/worker/blocks/blocks_blocks.nim

Lines changed: 17 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,12 +17,27 @@ import
1717
../../../../networking/p2p,
1818
../../../wire_protocol/types,
1919
../[update, worker_desc],
20-
./[blocks_fetch, blocks_helpers, blocks_import, blocks_unproc]
20+
./[blocks_fetch, blocks_helpers, blocks_unproc]
2121

2222
# ------------------------------------------------------------------------------
2323
# Private helpers
2424
# ------------------------------------------------------------------------------
2525

26+
template importBlock(
27+
buddy: BeaconBuddyRef;
28+
blk: EthBlock;
29+
effPeerID: Hash;
30+
): Result[Duration,BeaconError] =
31+
## Async/template
32+
##
33+
## Wrapper around `importBlock()` handler
34+
##
35+
let
36+
ctx = buddy.ctx
37+
rc = await ctx.handler.importBlock(buddy, blk, effPeerID)
38+
ctx.handler.syncImportBlock(buddy) # debugging, trace, replay
39+
rc
40+
2641
proc getNthHash(ctx: BeaconCtxRef; blocks: seq[EthBlock]; n: int): Hash32 =
2742
ctx.hdrCache.getHash(blocks[n].header.number).valueOr:
2843
return zeroHash32
@@ -202,7 +217,7 @@ template blocksImport*(
202217

203218
for n in 0 ..< blocks.len:
204219
let nBn = blocks[n].header.number
205-
discard (await buddy.importBlock(blocks[n], peerID)).valueOr:
220+
buddy.importBlock(blocks[n], peerID).isOkOr:
206221
if error.excp != ECancelledError:
207222
isError = true
208223

execution_chain/sync/beacon/worker/blocks/blocks_fetch.nim

Lines changed: 22 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -18,11 +18,30 @@ import
1818
../worker_desc,
1919
./blocks_helpers
2020

21+
logScope:
22+
topics = "beacon sync"
23+
24+
# ------------------------------------------------------------------------------
25+
# Private helper
26+
# -----------------------------------------------------------------------------
27+
28+
template getBlockBodies(
29+
buddy: BeaconBuddyRef;
30+
req: BlockBodiesRequest;
31+
): Result[FetchBodiesData,BeaconError] =
32+
## Async/template
33+
##
34+
## Wrapper around `getBlockBodies()` handler
35+
##
36+
let rc = await buddy.ctx.handler.getBlockBodies(buddy, req)
37+
buddy.ctx.handler.syncBlockBodies(buddy) # debugging, sync, replay
38+
rc
39+
2140
# ------------------------------------------------------------------------------
22-
# Private helpers
41+
# Public handler
2342
# -----------------------------------------------------------------------------
2443

25-
proc getBlockBodies(
44+
proc getBlockBodiesCB*(
2645
buddy: BeaconBuddyRef;
2746
req: BlockBodiesRequest;
2847
): Future[Result[FetchBodiesData,BeaconError]]
@@ -75,7 +94,7 @@ template fetchBodies*(
7594
trace trEthSendSendingGetBlockBodies,
7695
peer, nReq, nErrors=buddy.nErrors.fetch.bdy
7796

78-
let rc = await buddy.getBlockBodies(request)
97+
let rc = buddy.getBlockBodies(request)
7998
var elapsed: Duration
8099
if rc.isOk:
81100
elapsed = rc.value.elapsed

execution_chain/sync/beacon/worker/blocks/blocks_import.nim

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,11 +16,14 @@ import
1616
../../../wire_protocol,
1717
../worker_desc
1818

19+
logScope:
20+
topics = "beacon sync"
21+
1922
# ------------------------------------------------------------------------------
20-
# Public function
23+
# Public handler
2124
# ------------------------------------------------------------------------------
2225

23-
proc importBlock*(
26+
proc importBlockCB*(
2427
buddy: BeaconBuddyRef;
2528
blk: EthBlock;
2629
effPeerID: Hash;

execution_chain/sync/beacon/worker/headers/headers_fetch.nim

Lines changed: 22 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,11 +18,31 @@ import
1818
../worker_desc,
1919
./headers_helpers
2020

21+
logScope:
22+
topics = "beacon sync"
23+
2124
# ------------------------------------------------------------------------------
2225
# Private helpers
26+
# -----------------------------------------------------------------------------
27+
28+
template getBlockHeaders(
29+
buddy: BeaconBuddyRef;
30+
req: BlockHeadersRequest;
31+
bn: BlockNumber;
32+
): Result[FetchHeadersData,BeaconError] =
33+
## Async/template
34+
##
35+
## Wrapper around `getBlockHeaders()` handler
36+
##
37+
let rc = await buddy.ctx.handler.getBlockHeaders(buddy, req, bn)
38+
buddy.ctx.handler.syncBlockHeaders(buddy) # debugging, sync, replay
39+
rc
40+
41+
# ------------------------------------------------------------------------------
42+
# Public handler
2343
# ------------------------------------------------------------------------------
2444

25-
proc getBlockHeaders(
45+
proc getBlockHeadersCB*(
2646
buddy: BeaconBuddyRef;
2747
req: BlockHeadersRequest;
2848
bn: BlockNumber;
@@ -94,7 +114,7 @@ template fetchHeadersReversed*(
94114
trace trEthSendSendingGetBlockHeaders & " reverse", peer, req=ivReq,
95115
nReq=req.maxResults, hash=topHash.toStr, nErrors=buddy.nErrors.fetch.hdr
96116

97-
let rc = await buddy.getBlockHeaders(req, BlockNumber ivReq.maxPt)
117+
let rc = buddy.getBlockHeaders(req, BlockNumber ivReq.maxPt)
98118
var elapsed: Duration
99119
if rc.isOk:
100120
elapsed = rc.value.elapsed

execution_chain/sync/beacon/worker/start_stop.nim

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@ import
1414
pkg/[chronicles, chronos, eth/common, metrics],
1515
../../../networking/p2p,
1616
../../wire_protocol,
17-
./[blocks, headers, update, worker_desc]
17+
./[blocks, headers, worker_desc]
1818

1919
type
2020
SyncStateData = tuple
@@ -59,8 +59,8 @@ proc setupServices*(ctx: BeaconCtxRef; info: static[string]) =
5959

6060
# Set up the notifier informing when a new syncer session has started.
6161
ctx.hdrCache.start proc() =
62-
# Activates the syncer. Work will be picked up by peers when available.
63-
ctx.updateActivateSyncer()
62+
# This directive captures `ctx` for calling the activation handler.
63+
ctx.handler.activate(ctx)
6464

6565
# Provide progress info call back handler
6666
ctx.pool.chain.com.beaconSyncerProgress = proc(): SyncStateData =

execution_chain/sync/beacon/worker/update.nim

Lines changed: 20 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -30,23 +30,6 @@ declareGauge nec_sync_head, "" &
3030
# Private functions, state handler helpers
3131
# ------------------------------------------------------------------------------
3232

33-
proc updateSuspendSyncer(ctx: BeaconCtxRef) =
34-
## Clean up sync target buckets, stop syncer activity, and and get ready
35-
## for awaiting a new request from the `CL`.
36-
##
37-
ctx.hdrCache.clear()
38-
39-
ctx.pool.failedPeers.clear()
40-
ctx.pool.seenData = false
41-
42-
ctx.hibernate = true
43-
44-
metrics.set(nec_sync_last_block_imported, 0)
45-
metrics.set(nec_sync_head, 0)
46-
47-
info "Suspending syncer", base=ctx.chain.baseNumber.bnStr,
48-
head=ctx.chain.latestNumber.bnStr, nSyncPeers=ctx.pool.nBuddies
49-
5033
proc commitCollectHeaders(ctx: BeaconCtxRef; info: static[string]): bool =
5134
## Link header chain into `FC` module. Gets ready for block import.
5235
##
@@ -227,7 +210,7 @@ proc updateSyncState*(ctx: BeaconCtxRef; info: static[string]) =
227210

228211
# Final sync scrum layout reached or inconsistent/impossible state
229212
if newState == idle:
230-
ctx.updateSuspendSyncer()
213+
ctx.handler.suspend(ctx)
231214

232215

233216
proc updateLastBlockImported*(ctx: BeaconCtxRef; bn: BlockNumber) =
@@ -238,7 +221,7 @@ proc updateLastBlockImported*(ctx: BeaconCtxRef; bn: BlockNumber) =
238221
# Public functions, call-back handler ready
239222
# ------------------------------------------------------------------------------
240223

241-
proc updateActivateSyncer*(ctx: BeaconCtxRef) =
224+
proc updateActivateCB*(ctx: BeaconCtxRef) =
242225
## If in hibernate mode, accept a cache session and activate syncer
243226
##
244227
if ctx.hibernate and # only in idle mode
@@ -278,6 +261,24 @@ proc updateActivateSyncer*(ctx: BeaconCtxRef) =
278261
# Failed somewhere on the way
279262
ctx.hdrCache.clear()
280263

264+
265+
proc updateSuspendCB*(ctx: BeaconCtxRef) =
266+
## Clean up sync target buckets, stop syncer activity, and and get ready
267+
## for a new sync request from the `CL`.
268+
##
269+
ctx.hdrCache.clear()
270+
271+
ctx.pool.failedPeers.clear()
272+
ctx.pool.seenData = false
273+
274+
ctx.hibernate = true
275+
276+
metrics.set(nec_sync_last_block_imported, 0)
277+
metrics.set(nec_sync_head, 0)
278+
279+
info "Suspending syncer", base=ctx.chain.baseNumber.bnStr,
280+
head=ctx.chain.latestNumber.bnStr, nSyncPeers=ctx.pool.nBuddies
281+
281282
# ------------------------------------------------------------------------------
282283
# End
283284
# ------------------------------------------------------------------------------

0 commit comments

Comments
 (0)