Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add wasm spec factory #14458

Open
wants to merge 6 commits into
base: develop
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ race.*
golangci-lint-output.txt
/golangci-lint/
.covdata
core/services/job/testdata/wasm/testmodule.wasm

# DB state
./db/
Expand Down
3 changes: 2 additions & 1 deletion core/scripts/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ require (
github.com/prometheus/client_golang v1.20.0
github.com/shopspring/decimal v1.4.0
github.com/smartcontractkit/chainlink-automation v1.0.4
github.com/smartcontractkit/chainlink-common v0.2.3-0.20240918210534-564164004d06
github.com/smartcontractkit/chainlink-common v0.2.3-0.20240919092417-53e784c2e420
github.com/smartcontractkit/chainlink/v2 v2.0.0-00010101000000-000000000000
github.com/smartcontractkit/libocr v0.0.0-20240717100443-f6226e09bee7
github.com/spf13/cobra v1.8.1
Expand Down Expand Up @@ -70,6 +70,7 @@ require (
github.com/btcsuite/btcd/btcec/v2 v2.3.2 // indirect
github.com/btcsuite/btcd/btcutil v1.1.3 // indirect
github.com/buger/jsonparser v1.1.1 // indirect
github.com/bytecodealliance/wasmtime-go/v23 v23.0.0 // indirect
github.com/bytedance/sonic v1.10.1 // indirect
github.com/cenkalti/backoff v2.2.1+incompatible // indirect
github.com/cenkalti/backoff/v4 v4.3.0 // indirect
Expand Down
6 changes: 4 additions & 2 deletions core/scripts/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -178,6 +178,8 @@ github.com/bufbuild/protocompile v0.4.0 h1:LbFKd2XowZvQ/kajzguUp2DC9UEIQhIq77fZZ
github.com/bufbuild/protocompile v0.4.0/go.mod h1:3v93+mbWn/v3xzN+31nwkJfrEpAUwp+BagBSZWx+TP8=
github.com/buger/jsonparser v1.1.1 h1:2PnMjfWD7wBILjqQbt530v576A/cAbQvEW9gGIpYMUs=
github.com/buger/jsonparser v1.1.1/go.mod h1:6RYKKt7H4d4+iWqouImQ9R2FZql3VbhNgx27UK13J/0=
github.com/bytecodealliance/wasmtime-go/v23 v23.0.0 h1:NJvU4S8KEk1GnF6+FvlnzMD/8wXTj/mYJSG6Q4yu3Pw=
github.com/bytecodealliance/wasmtime-go/v23 v23.0.0/go.mod h1:5YIL+Ouiww2zpO7u+iZ1U1G5NvmwQYaXdmCZQGjQM0U=
github.com/bytedance/sonic v1.5.0/go.mod h1:ED5hyg4y6t3/9Ku1R6dU/4KyJ48DZ4jPhfY1O2AihPM=
github.com/bytedance/sonic v1.10.0-rc/go.mod h1:ElCzW+ufi8qKqNW0FY314xriJhyJhuoJ3gFZdAHF7NM=
github.com/bytedance/sonic v1.10.1 h1:7a1wuFXL1cMy7a3f7/VFcEtriuXQnUBhtoVfOZiaysc=
Expand Down Expand Up @@ -1083,8 +1085,8 @@ github.com/smartcontractkit/chainlink-automation v1.0.4 h1:iyW181JjKHLNMnDleI8um
github.com/smartcontractkit/chainlink-automation v1.0.4/go.mod h1:u4NbPZKJ5XiayfKHD/v3z3iflQWqvtdhj13jVZXj/cM=
github.com/smartcontractkit/chainlink-ccip v0.0.0-20240917180332-5a68498d1612 h1:xPEM9XbfZmv8N3NjZ7AX5salonll/LdXrbb8JCbA4FE=
github.com/smartcontractkit/chainlink-ccip v0.0.0-20240917180332-5a68498d1612/go.mod h1:Lv77O13ZxOdmlvnu2vaUC0Lg+t3JAL+N+9K8dRsgmDI=
github.com/smartcontractkit/chainlink-common v0.2.3-0.20240918210534-564164004d06 h1:wqLXuPdiUkn7es/epKmOpB0Q0tKdA9FkYPNQZrZ+VJU=
github.com/smartcontractkit/chainlink-common v0.2.3-0.20240918210534-564164004d06/go.mod h1:zm+l8gN4LQS1+YvwQDhRz/njirVeWGNiDJKIhCGwaoQ=
github.com/smartcontractkit/chainlink-common v0.2.3-0.20240919092417-53e784c2e420 h1:+xNnYYgkxzKUIkLCOfzfAKUxeLLtuxlalDI70kNJ8No=
github.com/smartcontractkit/chainlink-common v0.2.3-0.20240919092417-53e784c2e420/go.mod h1:zm+l8gN4LQS1+YvwQDhRz/njirVeWGNiDJKIhCGwaoQ=
github.com/smartcontractkit/chainlink-cosmos v0.4.1-0.20240911175228-daf2600bb7b7 h1:lTGIOQYLk1Ufn++X/AvZnt6VOcuhste5yp+C157No/Q=
github.com/smartcontractkit/chainlink-cosmos v0.4.1-0.20240911175228-daf2600bb7b7/go.mod h1:BMYE1vC/pGmdFSsOJdPrAA0/4gZ0Xo0SxTMdGspBtRo=
github.com/smartcontractkit/chainlink-data-streams v0.0.0-20240916152957-433914114bd2 h1:yRk4ektpx/UxwarqAfgxUXLrsYXlaNeP1NOwzHGrK2Q=
Expand Down
2 changes: 1 addition & 1 deletion core/services/feeds/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -1210,7 +1210,7 @@ func (s *service) generateJob(ctx context.Context, spec string) (*job.Job, error
case job.FluxMonitor:
js, err = fluxmonitorv2.ValidatedFluxMonitorSpec(s.jobCfg, spec)
case job.Workflow:
js, err = workflows.ValidatedWorkflowJobSpec(ctx, spec)
js, err = workflows.ValidatedWorkflowJobSpec(ctx, s.lggr, spec)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why do we need a logger for validation? Typically we want to either log something, (x)or return an error to be logged - not both.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I needed it to call into the WASM creation.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would rather change the WASM runner to not need it if we're not going to use it.

case job.CCIP:
js, err = ccip.ValidatedCCIPSpec(spec)
default:
Expand Down
2 changes: 1 addition & 1 deletion core/services/job/job_orm_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1988,7 +1988,7 @@ func Test_ORM_FindJobByWorkflow_Multiple(t *testing.T) {

func mustInsertWFJob(t *testing.T, orm job.ORM, s *job.WorkflowSpec) int32 {
t.Helper()
err := s.Validate(testutils.Context(t))
err := s.Validate(testutils.Context(t), logger.NullLogger)
require.NoError(t, err, "failed to validate spec %v", s)
ctx := testutils.Context(t)
_, err = toml.Marshal(s.Workflow)
Expand Down
10 changes: 6 additions & 4 deletions core/services/job/models.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"github.com/smartcontractkit/chainlink-common/pkg/types"
pkgworkflows "github.com/smartcontractkit/chainlink-common/pkg/workflows"

"github.com/smartcontractkit/chainlink/v2/core/logger"
"github.com/smartcontractkit/chainlink/v2/core/services/relay"

"github.com/smartcontractkit/chainlink/v2/core/bridges"
Expand Down Expand Up @@ -865,6 +866,7 @@ type WorkflowSpecType string

const (
YamlSpec WorkflowSpecType = "yaml"
WASMFile WorkflowSpecType = "wasm_file"
DefaultSpecType = YamlSpec
)

Expand Down Expand Up @@ -893,13 +895,13 @@ const (
)

// Validate checks the workflow spec for correctness
func (w *WorkflowSpec) Validate(ctx context.Context) error {
func (w *WorkflowSpec) Validate(ctx context.Context, logger logger.Logger) error {
s, err := pkgworkflows.ParseWorkflowSpecYaml(w.Workflow)
if err != nil {
return fmt.Errorf("%w: failed to parse workflow spec %s: %w", ErrInvalidWorkflowYAMLSpec, w.Workflow, err)
}

if _, err = w.SDKSpec(ctx); err != nil {
if _, err = w.SDKSpec(ctx, logger); err != nil {
jmank88 marked this conversation as resolved.
Show resolved Hide resolved
return err
}

Expand All @@ -913,12 +915,12 @@ func (w *WorkflowSpec) Validate(ctx context.Context) error {
return nil
}

func (w *WorkflowSpec) SDKSpec(ctx context.Context) (sdk.WorkflowSpec, error) {
func (w *WorkflowSpec) SDKSpec(ctx context.Context, lggr logger.Logger) (sdk.WorkflowSpec, error) {
if w.sdkWorkflow != nil {
return *w.sdkWorkflow, nil
}

spec, cid, err := workflowSpecFactory.Spec(ctx, w.Workflow, []byte(w.Config), w.SpecType)
spec, cid, err := workflowSpecFactory.Spec(ctx, lggr, w.Workflow, []byte(w.Config), w.SpecType)
if err != nil {
return sdk.WorkflowSpec{}, err
}
Expand Down
3 changes: 2 additions & 1 deletion core/services/job/models_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
pkgworkflows "github.com/smartcontractkit/chainlink-common/pkg/workflows"

"github.com/smartcontractkit/chainlink/v2/core/internal/testutils"
"github.com/smartcontractkit/chainlink/v2/core/logger"
"github.com/smartcontractkit/chainlink/v2/core/services/relay"

"github.com/stretchr/testify/assert"
Expand Down Expand Up @@ -324,7 +325,7 @@ func TestWorkflowSpec_Validate(t *testing.T) {
w := &WorkflowSpec{
Workflow: tt.fields.Workflow,
}
err := w.Validate(testutils.Context(t))
err := w.Validate(testutils.Context(t), logger.NullLogger)
require.Equal(t, tt.wantError, err != nil)
if !tt.wantError {
assert.NotEmpty(t, w.WorkflowID)
Expand Down
33 changes: 33 additions & 0 deletions core/services/job/testdata/wasm/test_workflow_spec.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
//go:build wasip1

package main

import (
"encoding/json"
"log"

"github.com/smartcontractkit/chainlink-common/pkg/workflows/wasm"

"github.com/smartcontractkit/chainlink-common/pkg/capabilities/cli/cmd/testdata/fixtures/capabilities/basictrigger"
"github.com/smartcontractkit/chainlink-common/pkg/workflows/sdk"
)

func BuildWorkflow(config []byte) *sdk.WorkflowSpecFactory {
params := sdk.NewWorkflowParams{}
if err := json.Unmarshal(config, &params); err != nil {
log.Fatal(err)
}

workflow := sdk.NewWorkflowSpecFactory(params)

triggerCfg := basictrigger.TriggerConfig{Name: "trigger", Number: 100}
_ = triggerCfg.New(workflow)

return workflow
}

func main() {
runner := wasm.NewRunner()
workflow := BuildWorkflow(runner.Config())
runner.Run(workflow)
}
32 changes: 32 additions & 0 deletions core/services/job/wasm_file_spec_factory.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
package job

import (
"context"
"errors"
"os"

"github.com/smartcontractkit/chainlink-common/pkg/workflows/sdk"
"github.com/smartcontractkit/chainlink-common/pkg/workflows/wasm/host"

"github.com/smartcontractkit/chainlink/v2/core/logger"
)

type WasmFileSpecFactory struct{}
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For visibility to reviewers:

In a side-channel discussion with Cedric, he mentioned that he thinks this should support URLs as well.

I commented that I thought by the time the workflow runs, it would have already ensured that the file is on the box locally. I was thinking that if it needs to use HTTP to fetch the file, that could be done just once. The HTTP invoker could have a deterministic location to save the workflow file. It could then check if the file exists and download it if it doesn’t. Once the file is on disk, it would invoke the WASM file spec.

I’m open to hearing what others think as well.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

One other potential use case, if we end up having any pre-build WASMs that users can run by providing their own config, we could leave them in fixed locations


func (w WasmFileSpecFactory) Spec(ctx context.Context, lggr logger.Logger, rawSpec, config []byte) (sdk.WorkflowSpec, error) {
moduleConfig := &host.ModuleConfig{Logger: lggr}
spec, err := host.GetWorkflowSpec(moduleConfig, rawSpec, config)
if err != nil {
return sdk.WorkflowSpec{}, err
} else if spec == nil {
return sdk.WorkflowSpec{}, errors.New("workflow spec not found when running wasm")
}

return *spec, nil
}

func (w WasmFileSpecFactory) RawSpec(_ context.Context, wf string) ([]byte, error) {
return os.ReadFile(wf)
}

var _ SDKWorkflowSpecFactory = (*WasmFileSpecFactory)(nil)
52 changes: 52 additions & 0 deletions core/services/job/wasm_file_spec_factory_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
package job_test

import (
"encoding/json"
"os"
"os/exec"
"testing"

"github.com/stretchr/testify/require"

"github.com/smartcontractkit/chainlink-common/pkg/workflows/sdk"

"github.com/smartcontractkit/chainlink-common/pkg/workflows/wasm/host"

"github.com/smartcontractkit/chainlink/v2/core/internal/testutils"
"github.com/smartcontractkit/chainlink/v2/core/logger"
"github.com/smartcontractkit/chainlink/v2/core/services/job"
)

func TestWasmFileSpecFactory(t *testing.T) {
binaryLocation := createTestBinary(t)
config, err := json.Marshal(sdk.NewWorkflowParams{
Owner: "owner",
Name: "name",
})
require.NoError(t, err)

factory := job.WasmFileSpecFactory{}
rawSpec, err := factory.RawSpec(testutils.Context(t), binaryLocation)
require.NoError(t, err)
actual, err := factory.Spec(testutils.Context(t), logger.NullLogger, rawSpec, config)
require.NoError(t, err)

rawBinary, err := os.ReadFile(binaryLocation)
require.NoError(t, err)
expected, err := host.GetWorkflowSpec(&host.ModuleConfig{Logger: logger.NullLogger}, rawBinary, config)
require.NoError(t, err)

require.Equal(t, *expected, actual)
}

func createTestBinary(t *testing.T) string {
const testBinaryLocation = "testdata/wasm/testmodule.wasm"

cmd := exec.Command("go", "build", "-o", testBinaryLocation, "github.com/smartcontractkit/chainlink/v2/core/services/job/testdata/wasm")
cmd.Env = append(os.Environ(), "GOOS=wasip1", "GOARCH=wasm")

output, err := cmd.CombinedOutput()
require.NoError(t, err, string(output))

return testBinaryLocation
}
9 changes: 6 additions & 3 deletions core/services/job/workflow_spec_factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,19 +7,21 @@ import (
"fmt"

"github.com/smartcontractkit/chainlink-common/pkg/workflows/sdk"

"github.com/smartcontractkit/chainlink/v2/core/logger"
)

var ErrInvalidWorkflowType = errors.New("invalid workflow type")

type SDKWorkflowSpecFactory interface {
Spec(ctx context.Context, rawSpec, config []byte) (sdk.WorkflowSpec, error)
Spec(ctx context.Context, lggr logger.Logger, rawSpec, config []byte) (sdk.WorkflowSpec, error)
RawSpec(ctx context.Context, wf string) ([]byte, error)
}

type WorkflowSpecFactory map[WorkflowSpecType]SDKWorkflowSpecFactory

func (wsf WorkflowSpecFactory) Spec(
ctx context.Context, workflow string, config []byte, tpe WorkflowSpecType) (sdk.WorkflowSpec, string, error) {
ctx context.Context, lggr logger.Logger, workflow string, config []byte, tpe WorkflowSpecType) (sdk.WorkflowSpec, string, error) {
if tpe == "" {
tpe = DefaultSpecType
}
Expand All @@ -34,7 +36,7 @@ func (wsf WorkflowSpecFactory) Spec(
return sdk.WorkflowSpec{}, "", err
}

spec, err := factory.Spec(ctx, rawSpec, config)
spec, err := factory.Spec(ctx, lggr, rawSpec, config)
if err != nil {
return sdk.WorkflowSpec{}, "", err
}
Expand All @@ -48,4 +50,5 @@ func (wsf WorkflowSpecFactory) Spec(

var workflowSpecFactory = WorkflowSpecFactory{
YamlSpec: YAMLSpecFactory{},
WASMFile: WasmFileSpecFactory{},
}
11 changes: 6 additions & 5 deletions core/services/job/workflow_spec_factory_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"github.com/smartcontractkit/chainlink-common/pkg/workflows/sdk"

"github.com/smartcontractkit/chainlink/v2/core/internal/testutils"
"github.com/smartcontractkit/chainlink/v2/core/logger"
"github.com/smartcontractkit/chainlink/v2/core/services/job"
)

Expand All @@ -35,7 +36,7 @@ func TestWorkflowSpecFactory_ToSpec(t *testing.T) {
factory := job.WorkflowSpecFactory{
job.YamlSpec: mockSdkSpecFactory{t: t, noConfig: true, SpecVal: anySpec},
}
results, cid, err := factory.Spec(testutils.Context(t), anyData, nil, job.YamlSpec)
results, cid, err := factory.Spec(testutils.Context(t), logger.NullLogger, anyData, nil, job.YamlSpec)
require.NoError(t, err)

assert.Equal(t, anySpec, results)
Expand All @@ -52,7 +53,7 @@ func TestWorkflowSpecFactory_ToSpec(t *testing.T) {
job.YamlSpec: mockSdkSpecFactory{t: t, Err: anyErr},
}

_, _, err := factory.Spec(testutils.Context(t), anyData, anyConfig, job.YamlSpec)
_, _, err := factory.Spec(testutils.Context(t), logger.NullLogger, anyData, anyConfig, job.YamlSpec)
assert.Equal(t, anyErr, err)
})

Expand All @@ -61,7 +62,7 @@ func TestWorkflowSpecFactory_ToSpec(t *testing.T) {
job.YamlSpec: mockSdkSpecFactory{t: t, SpecVal: anySpec},
}

_, _, err := factory.Spec(testutils.Context(t), anyData, anyConfig, "unsupported")
_, _, err := factory.Spec(testutils.Context(t), logger.NullLogger, anyData, anyConfig, "unsupported")
assert.Error(t, err)
})
}
Expand All @@ -71,7 +72,7 @@ func runYamlSpecTest(t *testing.T, anySpec sdk.WorkflowSpec, anyData string, any
job.YamlSpec: mockSdkSpecFactory{t: t, SpecVal: anySpec},
}

results, cid, err := factory.Spec(testutils.Context(t), anyData, anyConfig, specType)
results, cid, err := factory.Spec(testutils.Context(t), logger.NullLogger, anyData, anyConfig, specType)

require.NoError(t, err)
assert.Equal(t, anySpec, results)
Expand All @@ -94,7 +95,7 @@ func (f mockSdkSpecFactory) RawSpec(_ context.Context, wf string) ([]byte, error
return []byte(wf), nil
}

func (f mockSdkSpecFactory) Spec(_ context.Context, rawSpec, config []byte) (sdk.WorkflowSpec, error) {
func (f mockSdkSpecFactory) Spec(_ context.Context, _ logger.Logger, rawSpec, config []byte) (sdk.WorkflowSpec, error) {
assert.ElementsMatch(f.t, rawSpec, []byte("any data"))
if f.noConfig {
assert.Nil(f.t, config)
Expand Down
4 changes: 3 additions & 1 deletion core/services/job/yaml_spec_factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,15 @@ import (

"github.com/smartcontractkit/chainlink-common/pkg/workflows"
"github.com/smartcontractkit/chainlink-common/pkg/workflows/sdk"

"github.com/smartcontractkit/chainlink/v2/core/logger"
)

type YAMLSpecFactory struct{}

var _ SDKWorkflowSpecFactory = (*YAMLSpecFactory)(nil)

func (y YAMLSpecFactory) Spec(_ context.Context, rawSpec, _ []byte) (sdk.WorkflowSpec, error) {
func (y YAMLSpecFactory) Spec(_ context.Context, _ logger.Logger, rawSpec, _ []byte) (sdk.WorkflowSpec, error) {
return workflows.ParseWorkflowSpecYaml(string(rawSpec))
}

Expand Down
3 changes: 2 additions & 1 deletion core/services/job/yaml_spec_factory_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
commonworkflows "github.com/smartcontractkit/chainlink-common/pkg/workflows"

"github.com/smartcontractkit/chainlink/v2/core/internal/testutils"
"github.com/smartcontractkit/chainlink/v2/core/logger"
"github.com/smartcontractkit/chainlink/v2/core/services/job"
)

Expand Down Expand Up @@ -64,7 +65,7 @@ targets:
func TestYamlSpecFactory_GetSpec(t *testing.T) {
t.Parallel()

actual, err := job.YAMLSpecFactory{}.Spec(testutils.Context(t), []byte(anyYamlSpec), []byte{})
actual, err := job.YAMLSpecFactory{}.Spec(testutils.Context(t), logger.NullLogger, []byte(anyYamlSpec), []byte{})
require.NoError(t, err)

expected, err := commonworkflows.ParseWorkflowSpecYaml(anyYamlSpec)
Expand Down
Loading
Loading