Skip to content

Commit f52d353

Browse files
committed
[CRE-332] Standalone Workflow Engine script
1 parent 8c696b1 commit f52d353

File tree

28 files changed

+1316
-21
lines changed

28 files changed

+1316
-21
lines changed

core/capabilities/fakes/consensus.go

+218
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,218 @@
1+
package fakes
2+
3+
import (
4+
"context"
5+
"time"
6+
7+
"google.golang.org/protobuf/proto"
8+
9+
"github.com/jonboulle/clockwork"
10+
"github.com/smartcontractkit/libocr/commontypes"
11+
"github.com/smartcontractkit/libocr/offchainreporting2plus/ocr3types"
12+
"github.com/smartcontractkit/libocr/offchainreporting2plus/types"
13+
14+
commonCap "github.com/smartcontractkit/chainlink-common/pkg/capabilities"
15+
"github.com/smartcontractkit/chainlink-common/pkg/capabilities/consensus/ocr3"
16+
"github.com/smartcontractkit/chainlink-common/pkg/capabilities/consensus/ocr3/requests"
17+
pbtypes "github.com/smartcontractkit/chainlink-common/pkg/capabilities/consensus/ocr3/types"
18+
"github.com/smartcontractkit/chainlink-common/pkg/services"
19+
"github.com/smartcontractkit/chainlink/v2/core/capabilities"
20+
"github.com/smartcontractkit/chainlink/v2/core/logger"
21+
)
22+
23+
// This capability simulates consensus by running the OCR plugin on a single node, without libOCR.
24+
type FakeConsensusConfig struct {
25+
N int
26+
F int
27+
DeltaRound int
28+
BatchSize int
29+
OutcomePruningThreshold uint64
30+
RequestTimeout time.Duration
31+
}
32+
33+
func DefaultFakeConsensusConfig() FakeConsensusConfig {
34+
return FakeConsensusConfig{
35+
N: 10,
36+
F: 3,
37+
DeltaRound: 5000,
38+
BatchSize: 100,
39+
OutcomePruningThreshold: 1000,
40+
RequestTimeout: time.Second * 20,
41+
}
42+
}
43+
44+
type fakeConsensus struct {
45+
services.Service
46+
eng *services.Engine
47+
48+
config FakeConsensusConfig
49+
plugin ocr3types.ReportingPlugin[[]byte]
50+
transmitter *ocr3.ContractTransmitter
51+
store *requests.Store
52+
cap capIface
53+
stats SimpleStats
54+
55+
previousOutcome []byte
56+
}
57+
58+
type capIface interface {
59+
commonCap.ConsensusCapability
60+
services.Service
61+
}
62+
63+
var _ services.Service = (*fakeConsensus)(nil)
64+
var _ commonCap.ExecutableCapability = (*fakeConsensus)(nil)
65+
66+
const consensusCapID = "[email protected]"
67+
68+
func NewFakeConsensus(lggr logger.Logger, config FakeConsensusConfig) (*fakeConsensus, error) {
69+
rpConfig := ocr3types.ReportingPluginConfig{}
70+
store := requests.NewStore()
71+
72+
capability := ocr3.NewCapability(store, clockwork.NewRealClock(), config.RequestTimeout, capabilities.NewAggregator, capabilities.NewEncoder, lggr, 100)
73+
74+
plugin, err := ocr3.NewReportingPlugin(store, capability, config.BatchSize, rpConfig,
75+
config.OutcomePruningThreshold, lggr)
76+
if err != nil {
77+
return nil, err
78+
}
79+
80+
transmitter := ocr3.NewContractTransmitter(lggr, nil, "")
81+
transmitter.SetCapability(capability)
82+
83+
fc := &fakeConsensus{
84+
config: config,
85+
plugin: plugin,
86+
transmitter: transmitter,
87+
store: store,
88+
cap: capability,
89+
stats: *NewSimpleStats(),
90+
}
91+
fc.Service, fc.eng = services.Config{
92+
Name: "fakeConsensus",
93+
Start: fc.start,
94+
Close: fc.close,
95+
}.NewServiceEngine(lggr)
96+
return fc, nil
97+
}
98+
99+
func (fc *fakeConsensus) start(ctx context.Context) error {
100+
ticker := services.TickerConfig{
101+
Initial: 500 * time.Millisecond,
102+
JitterPct: 0.0,
103+
}.NewTicker(time.Duration(fc.config.DeltaRound) * time.Millisecond)
104+
fc.eng.GoTick(ticker, fc.simulateOCRRound)
105+
return fc.cap.Start(ctx)
106+
}
107+
108+
func (fc *fakeConsensus) close() error {
109+
err := fc.cap.Close()
110+
fc.stats.PrintToStdout("Consensus Capability Stats")
111+
return err
112+
}
113+
114+
func (fc *fakeConsensus) simulateOCRRound(ctx context.Context) {
115+
runCtx, cancel := context.WithTimeout(ctx, time.Second*10)
116+
defer cancel()
117+
ocrCtx := ocr3types.OutcomeContext{
118+
SeqNr: 1,
119+
PreviousOutcome: fc.previousOutcome,
120+
Epoch: 1,
121+
Round: 1,
122+
}
123+
124+
query, err := fc.plugin.Query(runCtx, ocrCtx)
125+
if err != nil {
126+
fc.eng.Errorw("Error running Query", "error", err)
127+
return
128+
}
129+
fc.eng.Debugw("Query execution complete", "size", len(query))
130+
131+
now := time.Now()
132+
observation, err := fc.plugin.Observation(runCtx, ocrCtx, query)
133+
elapsed := time.Since(now)
134+
if err != nil {
135+
fc.eng.Errorw("Error running Observation", "error", err)
136+
return
137+
}
138+
fc.eng.Debugw("Observation execution complete", "size", len(observation), "durationMS", elapsed.Milliseconds())
139+
fc.stats.UpdateMaxStat("Max observation size (bytes)", int64(len(observation)))
140+
fc.stats.UpdateMaxStat("Max observation duration (ms)", elapsed.Milliseconds())
141+
142+
aos := []types.AttributedObservation{}
143+
oracleID := uint8(0)
144+
for range 2*fc.config.F + 1 {
145+
aos = append(aos, types.AttributedObservation{
146+
Observation: observation,
147+
Observer: commontypes.OracleID(oracleID),
148+
})
149+
oracleID++
150+
}
151+
152+
now = time.Now()
153+
outcome, err := fc.plugin.Outcome(runCtx, ocrCtx, query, aos)
154+
elapsed = time.Since(now)
155+
if err != nil {
156+
fc.eng.Errorw("Error running Outcome", "error", err)
157+
return
158+
}
159+
fc.eng.Debugw("Outcome execution complete", "size", len(outcome), "durationMS", elapsed.Milliseconds())
160+
fc.stats.UpdateMaxStat("Max outcome size (bytes)", int64(len(outcome)))
161+
fc.stats.UpdateMaxStat("Max outcome duration (ms)", elapsed.Milliseconds())
162+
163+
fc.previousOutcome = outcome
164+
165+
// calculate the size of the previous outcome minus current reports,
166+
// which is the data that will be preserved as long as a workflow exists
167+
// in the system
168+
unmarshaled := &pbtypes.Outcome{}
169+
_ = proto.Unmarshal(outcome, unmarshaled)
170+
unmarshaled.Outcomes = nil
171+
rawCleaned, _ := proto.Marshal(unmarshaled)
172+
fc.stats.UpdateMaxStat("Max preserved outcome size (bytes)", int64(len(rawCleaned)))
173+
174+
now = time.Now()
175+
reports, err := fc.plugin.Reports(runCtx, 1, outcome)
176+
elapsed = time.Since(now)
177+
if err != nil {
178+
fc.eng.Errorw("Error running Reports", "error", err)
179+
return
180+
}
181+
182+
for _, report := range reports {
183+
reportSize := len(report.ReportWithInfo.Report)
184+
fc.eng.Infow("Sending report", "report", report, "size", reportSize)
185+
fc.stats.UpdateMaxStat("Max report size (bytes)", int64(reportSize))
186+
fc.stats.UpdateMaxStat("Max report duration (ms)", elapsed.Milliseconds())
187+
emptyDigest := [32]byte{}
188+
// TODO: add non-empty signatures
189+
err = fc.transmitter.Transmit(runCtx, emptyDigest, 1, report.ReportWithInfo, []types.AttributedOnchainSignature{})
190+
if err != nil {
191+
fc.eng.Errorw("Error transmitting report", "error", err)
192+
}
193+
}
194+
}
195+
196+
func (fc *fakeConsensus) Execute(ctx context.Context, request commonCap.CapabilityRequest) (commonCap.CapabilityResponse, error) {
197+
return fc.cap.Execute(ctx, request)
198+
}
199+
200+
func (fc *fakeConsensus) RegisterToWorkflow(ctx context.Context, request commonCap.RegisterToWorkflowRequest) error {
201+
fc.eng.Infow("Registering to Fake Consensus", "workflowID", request.Metadata.WorkflowID)
202+
return fc.cap.RegisterToWorkflow(ctx, request)
203+
}
204+
205+
func (fc *fakeConsensus) UnregisterFromWorkflow(ctx context.Context, request commonCap.UnregisterFromWorkflowRequest) error {
206+
fc.eng.Infow("Unegistering from Fake Consensus", "workflowID", request.Metadata.WorkflowID)
207+
return fc.cap.UnregisterFromWorkflow(ctx, request)
208+
}
209+
210+
func (fc *fakeConsensus) Info(ctx context.Context) (commonCap.CapabilityInfo, error) {
211+
return commonCap.CapabilityInfo{
212+
ID: consensusCapID,
213+
CapabilityType: commonCap.CapabilityTypeConsensus,
214+
Description: "Fake OCR Consensus",
215+
DON: &commonCap.DON{},
216+
IsLocal: true,
217+
}, nil
218+
}

0 commit comments

Comments
 (0)