Skip to content

Commit d779031

Browse files
authored
Beacon sync maint update (#3789)
* Move peer ranking/assessment to the `beacon` scheduler/wrapper why So the `worker` logic can receive that assessment via function argument. This reduces complexity for the tracer/replay logic. * Reduce trace-log spamming when waiting for peers for manual target details Currently, the waiting for peers feature can be activated only for the trace wrapper binary. * Re-calibrate waiting/re-activation time for low ranking peers why In some cases there is no expectation that a peer might be ready for immediate action. * Not repeating some time-consuming failed fetch request again details With long fetch timeout, the request is stashed and checked before sending. It will be rejected right away (without sending.) * Code cosmetics, log & assert messages updates
1 parent c030b05 commit d779031

File tree

14 files changed

+165
-93
lines changed

14 files changed

+165
-93
lines changed

execution_chain/core/chain/header_chain_cache.nim

Lines changed: 13 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -127,8 +127,8 @@ const
127127
## Insert `persist()` statements in bulk action every `MaxDeleteBatch`
128128
## `del()` directives.
129129

130-
RaisePfx = "Header Cache: "
131-
## Message prefix used when bailing out raising an exception
130+
MsgPfx = "Header Cache: "
131+
## Message prefix used when logging or raising an exception
132132

133133
const
134134
HccDbInfoKey = 0.beaconHeaderKey
@@ -163,7 +163,7 @@ func decodePayload(data: seq[byte]; T: type): T =
163163
try:
164164
result = rlp.decode(data, T)
165165
except RlpError as e:
166-
raiseAssert RaisePfx & "rlp.decode(" & $T & ") failed:" &
166+
raiseAssert MsgPfx & "rlp.decode(" & $T & ") failed:" &
167167
" name=" & $e.name & " error=" & e.msg
168168

169169
# ------------------------------------------------------------------------------
@@ -172,7 +172,7 @@ func decodePayload(data: seq[byte]; T: type): T =
172172

173173
proc putInfo(db: KvtTxRef; state: HccDbInfo) =
174174
db.put(HccDbInfoKey.toOpenArray, encodePayload(state)).isOkOr:
175-
raiseAssert RaisePfx & "put(info) failed: " & $error
175+
raiseAssert MsgPfx & "put(info) failed: " & $error
176176

177177
proc getInfo(db: KvtTxRef): Opt[HccDbInfo] =
178178
let data = db.get(HccDbInfoKey.toOpenArray).valueOr:
@@ -194,11 +194,11 @@ proc putHeader(db: KvtTxRef; h: Header) =
194194
## of the parent header.
195195
let data = encodePayload(h)
196196
db.put(beaconHeaderKey(h.number).toOpenArray, data).isOkOr:
197-
raiseAssert RaisePfx & "put(header) failed: " & $error
197+
raiseAssert MsgPfx & "put(header) failed: " & $error
198198

199199
let parNumData = (h.number-1).toBytesBE
200200
db.put(genericHashKey(h.parentHash).toOpenArray, parNumData).isOkOr:
201-
raiseAssert RaisePfx & "put(number-1) failed: " & $error
201+
raiseAssert MsgPfx & "put(number-1) failed: " & $error
202202

203203

204204
proc getNumber(db: KvtTxRef, hash: Hash32): Opt[BlockNumber] =
@@ -300,7 +300,7 @@ proc resolveFinHash(hc: HeaderChainRef, f: Hash32) =
300300
return
301301

302302
if hc.chain.tryUpdatePendingFCU(f, number):
303-
debug "PendingFCU resolved to block number",
303+
debug MsgPfx & "pendingFCU resolved to block number",
304304
hash = f.short,
305305
number = number.bnStr
306306

@@ -480,7 +480,7 @@ proc put*(
480480
if rev.len == 0:
481481
return ok() # nothing to do
482482

483-
debug "HC updated",
483+
debug MsgPfx & "updated",
484484
minNum=rev[^1].bnStr,
485485
maxNum=rev[0].bnStr,
486486
numHeaders=rev.len
@@ -598,8 +598,11 @@ proc commit*(hc: HeaderChainRef): Result[void,string] =
598598
return ok()
599599

600600
# Impossible situation!
601-
raiseAssert RaisePfx &
602-
"Missing finalised " & fin.bnStr & " parent on FC module"
601+
raiseAssert MsgPfx &
602+
"Missing finalised " & fin.bnStr & " parent on FC module" &
603+
", base=" & hc.chain.baseNumber.bnStr &
604+
", head=" & hc.session.head.bnStr &
605+
", finalized=" & hc.chain.latestFinalizedBlockNumber.bnStr
603606

604607
hc.session.mode = orphan
605608
err("Parent on FC module has been lost: obsolete branch segment")

execution_chain/sync/beacon.nim

Lines changed: 10 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ import
1717
../networking/p2p,
1818
./beacon/worker/headers/headers_target,
1919
./beacon/[beacon_desc, worker],
20+
./beacon/worker/classify,
2021
./[sync_sched, wire_protocol]
2122

2223
export
@@ -30,28 +31,29 @@ logScope:
3031
# ------------------------------------------------------------------------------
3132

3233
proc runSetup(ctx: BeaconCtxRef): bool =
33-
worker.setup(ctx, "RunSetup")
34+
worker.setup(ctx, "Setup")
3435

3536
proc runRelease(ctx: BeaconCtxRef) =
36-
worker.release(ctx, "RunRelease")
37+
worker.release(ctx, "Release")
3738

3839
proc runDaemon(ctx: BeaconCtxRef): Future[Duration] {.async: (raises: []).} =
39-
return worker.runDaemon(ctx, "RunDaemon")
40+
return worker.runDaemon(ctx, "Daemon")
4041

4142
proc runTicker(ctx: BeaconCtxRef) =
42-
worker.runTicker(ctx, "RunTicker")
43+
worker.runTicker(ctx, "Ticker")
4344

4445
proc runStart(buddy: BeaconBuddyRef): bool =
45-
worker.start(buddy, "RunStart")
46+
worker.start(buddy, "Start")
4647

4748
proc runStop(buddy: BeaconBuddyRef) =
48-
worker.stop(buddy, "RunStop")
49+
worker.stop(buddy, "Stop")
4950

5051
proc runPool(buddy: BeaconBuddyRef; last: bool; laps: int): bool =
51-
worker.runPool(buddy, last, laps, "RunPool")
52+
worker.runPool(buddy, last, laps, "SyncMode")
5253

5354
proc runPeer(buddy: BeaconBuddyRef): Future[Duration] {.async: (raises: []).} =
54-
return worker.runPeer(buddy, "RunPeer")
55+
let rank = buddy.classifyForFetching()
56+
return worker.runPeer(buddy, rank, "Peer")
5557

5658
# ------------------------------------------------------------------------------
5759
# Public functions

execution_chain/sync/beacon/worker.nim

Lines changed: 20 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,7 @@ proc start*(buddy: BeaconBuddyRef; info: static[string]): bool =
5555
proc stop*(buddy: BeaconBuddyRef; info: static[string]) =
5656
## Clean up this peer
5757
if not buddy.ctx.hibernate: debug info & ": release peer", peer=buddy.peer,
58-
throughput=buddy.only.thruPutStats.toMeanVar.psStr,
58+
thPut=buddy.only.thPutStats.toMeanVar.psStr,
5959
nSyncPeers=(buddy.ctx.pool.nBuddies-1), state=($buddy.syncState)
6060
buddy.stopBuddy()
6161

@@ -76,7 +76,7 @@ proc runTicker*(ctx: BeaconCtxRef; info: static[string]) =
7676
if ctx.pool.lastNoPeersLog + noPeersLogWaitInterval < now:
7777
ctx.pool.lastNoPeersLog = now
7878
debug info & ": no sync peers yet",
79-
elapsed=(now - ctx.pool.lastPeerSeen).toStr,
79+
ela=(now - ctx.pool.lastPeerSeen).toStr,
8080
nOtherPeers=ctx.node.peerPool.connectedNodes.len
8181

8282

@@ -129,7 +129,11 @@ proc runPool*(
129129
true # stop
130130

131131

132-
template runPeer*(buddy: BeaconBuddyRef; info: static[string]): Duration =
132+
template runPeer*(
133+
buddy: BeaconBuddyRef;
134+
rank: PeerRanking;
135+
info: static[string];
136+
): Duration =
133137
## Async/template
134138
##
135139
## This peer worker method is repeatedly invoked (exactly one per peer) while
@@ -141,16 +145,18 @@ template runPeer*(buddy: BeaconBuddyRef; info: static[string]): Duration =
141145
block body:
142146
if buddy.somethingToCollectOrUnstage():
143147

144-
# Classify sync peer (aka buddy) performance
145-
let (fetchPerf {.inject.}, rank) = buddy.classifyForFetching()
146-
147148
trace info & ": start processing", peer=buddy.peer,
148-
throughput=buddy.only.thruPutStats.toMeanVar.psStr,
149-
fetchPerf, rank=(if rank < 0: "n/a" else: $rank),
149+
thPut=buddy.only.thPutStats.toMeanVar.psStr,
150+
rankInfo=($rank.assessed),
151+
rank=(if rank.ranking < 0: "n/a" else: $rank.ranking),
150152
nSyncPeers=buddy.ctx.pool.nBuddies, state=($buddy.syncState)
151153

152-
if fetchPerf == rankingTooLow:
153-
bodyRc = workerIdleWaitInterval
154+
if rank.assessed == rankingTooLow:
155+
# Tell the scheduler to wait a bit longer before next invocation.
156+
# The reasoning is that in case of a low rank labelling, all slots
157+
# for peers downloading can be filled with higher ranking peers. And
158+
# this situation would not change immediately.
159+
bodyRc = workerIdleLongWaitInterval
154160
break body # done, exit
155161

156162
# Download and process headers and blocks
@@ -191,9 +197,11 @@ template runPeer*(buddy: BeaconBuddyRef; info: static[string]): Duration =
191197

192198
# End block: `actionLoop`
193199

194-
else:
200+
elif buddy.ctx.pool.lastState == SyncState.idle:
195201
# Potentially a manual sync target set up
196-
buddy.headersTargetActivate info
202+
if not buddy.headersTargetActivate info:
203+
bodyRc = workerIdleLongWaitInterval
204+
break body
197205

198206
# Idle sleep unless there is something to do
199207
if not buddy.somethingToCollectOrUnstage():

execution_chain/sync/beacon/worker/blocks.nim

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -125,7 +125,7 @@ template blocksCollect*(
125125
head=ctx.chain.latestNumber.bnStr,
126126
target=ctx.subState.head.bnStr,
127127
targetHash=ctx.subState.headHash.short,
128-
throughput=buddy.blkThroughput,
128+
thPut=buddy.blkThroughput,
129129
nSyncPeers=ctx.pool.nBuddies
130130
ctx.pool.lastSyncUpdLog = Moment.now()
131131
nImported = 0
@@ -177,7 +177,7 @@ template blocksCollect*(
177177
head=ctx.chain.latestNumber.bnStr,
178178
target=ctx.subState.head.bnStr,
179179
targetHash=ctx.subState.headHash.short,
180-
throughput=buddy.blkThroughput,
180+
thPut=buddy.blkThroughput,
181181
nSyncPeers=ctx.pool.nBuddies
182182
ctx.pool.lastSyncUpdLog = Moment.now()
183183

execution_chain/sync/beacon/worker/blocks/blocks_fetch.nim

Lines changed: 28 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -29,8 +29,12 @@ proc getBlockBodies(
2929
{.async: (raises: []).} =
3030
## Wrapper around `getBlockHeaders()`
3131
let start = Moment.now()
32-
var resp: BlockBodiesPacket
3332

33+
if buddy.only.failedReq.state == SyncState.blocks and
34+
buddy.only.failedReq.blockHash == req.blockHashes[0]:
35+
return err((EAlreadyTriedAndFailed,"","",Moment.now()-start))
36+
37+
var resp: BlockBodiesPacket
3438
try:
3539
resp = (await buddy.peer.getBlockBodies(
3640
req, fetchBodiesRlpxTimeout)).valueOr:
@@ -86,17 +90,21 @@ template fetchBodies*(
8690
buddy.ctrl.zombie = true
8791
of ECatchableError:
8892
buddy.bdyFetchRegisterError()
93+
discard buddy.only.thPutStats.hdr.bpsSample(elapsed, 0)
94+
of EAlreadyTriedAndFailed:
95+
# Just return `failed` (no error count or throughput stats)
96+
discard
8997

9098
chronicles.info trEthRecvReceivedBlockBodies & " error", peer, nReq,
91-
elapsed=rc.error.elapsed.toStr, state=($buddy.syncState),
99+
ela=rc.error.elapsed.toStr, state=($buddy.syncState),
92100
error=rc.error.name, msg=rc.error.msg, nErrors=buddy.nErrors.fetch.bdy
93101
break body # return err()
94102

95103
# Evaluate result
96104
if rc.isErr or buddy.ctrl.stopped:
97105
buddy.bdyFetchRegisterError()
98106
trace trEthRecvReceivedBlockBodies, peer, nReq, nResp=0,
99-
elapsed=elapsed.toStr, state=($buddy.syncState),
107+
ela=elapsed.toStr, state=($buddy.syncState),
100108
nErrors=buddy.nErrors.fetch.bdy
101109
break body # return err()
102110

@@ -107,17 +115,29 @@ template fetchBodies*(
107115
# Bogus peer returning additional rubbish
108116
buddy.bdyFetchRegisterError(forceZombie=true)
109117
else:
110-
# Data not avail but fast enough answer: degrade througput stats only
111-
discard buddy.only.thruPutStats.blk.bpsSample(elapsed, 0)
118+
# No data available. For a fast enough rejection response, the
119+
# througput stats are degraded, only.
120+
discard buddy.only.thPutStats.blk.bpsSample(elapsed, 0)
121+
122+
# Slow response, definitely not fast enough
112123
if fetchBodiesErrTimeout <= elapsed:
113124
buddy.bdyFetchRegisterError(slowPeer=true)
125+
126+
# Do not repeat the same time-consuming failed request
127+
buddy.only.failedReq = BuddyFirstFetchReq(
128+
state: SyncState.blocks,
129+
blockHash: request.blockHashes[0])
130+
114131
trace trEthRecvReceivedBlockBodies, peer, nReq, nResp=b.len,
115-
elapsed=elapsed.toStr, state=($buddy.syncState),
132+
ela=elapsed.toStr, state=($buddy.syncState),
116133
nErrors=buddy.nErrors.fetch.bdy
117134
break body # return err()
118135

119136
# Update download statistics
120-
let bps = buddy.only.thruPutStats.blk.bpsSample(elapsed, b.getEncodedLength)
137+
let bps = buddy.only.thPutStats.blk.bpsSample(elapsed, b.getEncodedLength)
138+
139+
# Request did not fail
140+
buddy.only.failedReq.reset
121141

122142
# Ban an overly slow peer for a while when observed consecutively.
123143
if fetchBodiesErrTimeout < elapsed:
@@ -127,7 +147,7 @@ template fetchBodies*(
127147
buddy.ctx.pool.lastSlowPeer = Opt.none(Hash) # not last one or not error
128148

129149
trace trEthRecvReceivedBlockBodies, peer, nReq, nResp=b.len,
130-
elapsed=elapsed.toStr, throughput=(bps.toIECb(1) & "ps"),
150+
ela=elapsed.toStr, thPut=(bps.toIECb(1) & "ps"),
131151
state=($buddy.syncState), nErrors=buddy.nErrors.fetch.bdy
132152

133153
bodyRc = Opt[seq[BlockBody]].ok(b)

execution_chain/sync/beacon/worker/blocks/blocks_helpers.nim

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,7 @@ func blkSessionStopped*(ctx: BeaconCtxRef): bool =
4545

4646
func blkThroughput*(buddy: BeaconBuddyRef): string =
4747
## Print throuhput sratistics
48-
buddy.only.thruPutStats.blk.toMeanVar.psStr
48+
buddy.only.thPutStats.blk.toMeanVar.psStr
4949

5050
# ------------------------------------------------------------------------------
5151
# End

execution_chain/sync/beacon/worker/classify.nim

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -28,9 +28,7 @@ func somethingToCollectOrUnstage*(buddy: BeaconBuddyRef): bool =
2828
false
2929

3030

31-
func classifyForFetching*(
32-
buddy: BeaconBuddyRef;
33-
): tuple[info: DownloadPerformance, ranking: int] =
31+
func classifyForFetching*(buddy: BeaconBuddyRef): PeerRanking =
3432
## Rank and classify peers by whether they should be used for fetching
3533
## data.
3634
##
@@ -49,7 +47,7 @@ func classifyForFetching*(
4947
return (qSlotsAvail, -1)
5048

5149
template hdr(b: BeaconBuddyRef): StatsCollect =
52-
b.only.thruPutStats.hdr
50+
b.only.thPutStats.hdr
5351

5452
# Are there throughput data available for this peer (aka buddy), at all?
5553
if buddy.hdr.samples == 0:
@@ -79,7 +77,7 @@ func classifyForFetching*(
7977
return (qSlotsAvail, -1)
8078

8179
template blk(b: BeaconBuddyRef): StatsCollect =
82-
b.only.thruPutStats.blk
80+
b.only.thPutStats.blk
8381

8482
if buddy.blk.samples == 0:
8583
return (notEnoughData, -1)

execution_chain/sync/beacon/worker/headers.nim

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -138,7 +138,7 @@ template headersCollect*(buddy: BeaconBuddyRef; info: static[string]) =
138138
base=ctx.chain.baseNumber.bnStr,
139139
head=ctx.chain.latestNumber.bnStr,
140140
target=ctx.hdrCache.head.bnStr,
141-
throughput=buddy.hdrThroughput,
141+
thPut=buddy.hdrThroughput,
142142
nSyncPeers=ctx.pool.nBuddies
143143
ctx.pool.lastSyncUpdLog = Moment.now()
144144
nStashed = 0
@@ -183,7 +183,7 @@ template headersCollect*(buddy: BeaconBuddyRef; info: static[string]) =
183183
base=ctx.chain.baseNumber.bnStr,
184184
head=ctx.chain.latestNumber.bnStr,
185185
target=ctx.hdrCache.head.bnStr,
186-
throughput=buddy.hdrThroughput,
186+
thPut=buddy.hdrThroughput,
187187
nSyncPeers=ctx.pool.nBuddies
188188
ctx.pool.lastSyncUpdLog = Moment.now()
189189

0 commit comments

Comments
 (0)