From fc7c873bf03c1ca7998ec11ec564e85f1cbacba6 Mon Sep 17 00:00:00 2001 From: Masayoshi Mizutani Date: Sun, 28 Jan 2024 10:13:03 +0900 Subject: [PATCH] Implement Action methods for database --- gqlgen.yml | 3 + graphql/schema.graphqls | 4 +- pkg/controller/graphql/generated.go | 85 +++++++++++++++++++- pkg/domain/interfaces/interfaces.go | 7 ++ pkg/domain/model/graphql.go | 3 +- pkg/domain/types/types.go | 1 + pkg/infra/database_test.go | 117 +++++++++++++++++++++++++++- pkg/infra/firestore/client.go | 74 ++++++++++++++++++ pkg/infra/memory/client.go | 41 ++++++++++ 9 files changed, 326 insertions(+), 9 deletions(-) diff --git a/gqlgen.yml b/gqlgen.yml index f30c3ff..4f45531 100644 --- a/gqlgen.yml +++ b/gqlgen.yml @@ -44,6 +44,9 @@ models: AlertID: model: - github.com/m-mizutani/alertchain/pkg/domain/types.AlertID + ActionID: + model: + - github.com/m-mizutani/alertchain/pkg/domain/types.ActionID Timestamp: model: - github.com/99designs/gqlgen/graphql.Time diff --git a/graphql/schema.graphqls b/graphql/schema.graphqls index ea44bd2..d5de66a 100644 --- a/graphql/schema.graphqls +++ b/graphql/schema.graphqls @@ -3,6 +3,7 @@ scalar Timestamp # Represents time.Time scalar WorkflowID # Represents uuid.UUID scalar AlertID # Represents uuid.UUID +scalar ActionID # Represents uuid.UUID type WorkflowRecord { id: WorkflowID! createdAt: Timestamp! @@ -41,7 +42,8 @@ type ReferenceRecord { } type ActionRecord { - id: String! + id: ActionID! + workflow_id: WorkflowID! seq: Int! uses: String! args: [ArgumentRecord!]! diff --git a/pkg/controller/graphql/generated.go b/pkg/controller/graphql/generated.go index 4d198fd..56b74d7 100644 --- a/pkg/controller/graphql/generated.go +++ b/pkg/controller/graphql/generated.go @@ -58,6 +58,7 @@ type ComplexityRoot struct { Seq func(childComplexity int) int StartedAt func(childComplexity int) int Uses func(childComplexity int) int + WorkflowID func(childComplexity int) int } AlertRecord struct { @@ -201,6 +202,13 @@ func (e *executableSchema) Complexity(typeName, field string, childComplexity in return e.complexity.ActionRecord.Uses(childComplexity), true + case "ActionRecord.workflow_id": + if e.complexity.ActionRecord.WorkflowID == nil { + break + } + + return e.complexity.ActionRecord.WorkflowID(childComplexity), true + case "AlertRecord.createdAt": if e.complexity.AlertRecord.CreatedAt == nil { break @@ -508,6 +516,7 @@ var sources = []*ast.Source{ scalar Timestamp # Represents time.Time scalar WorkflowID # Represents uuid.UUID scalar AlertID # Represents uuid.UUID +scalar ActionID # Represents uuid.UUID type WorkflowRecord { id: WorkflowID! createdAt: Timestamp! @@ -546,7 +555,8 @@ type ReferenceRecord { } type ActionRecord { - id: String! + id: ActionID! + workflow_id: WorkflowID! seq: Int! uses: String! args: [ArgumentRecord!]! @@ -698,9 +708,9 @@ func (ec *executionContext) _ActionRecord_id(ctx context.Context, field graphql. } return graphql.Null } - res := resTmp.(string) + res := resTmp.(types.ActionID) fc.Result = res - return ec.marshalNString2string(ctx, field.Selections, res) + return ec.marshalNActionID2githubᚗcomᚋmᚑmizutaniᚋalertchainᚋpkgᚋdomainᚋtypesᚐActionID(ctx, field.Selections, res) } func (ec *executionContext) fieldContext_ActionRecord_id(ctx context.Context, field graphql.CollectedField) (fc *graphql.FieldContext, err error) { @@ -710,7 +720,51 @@ func (ec *executionContext) fieldContext_ActionRecord_id(ctx context.Context, fi IsMethod: false, IsResolver: false, Child: func(ctx context.Context, field graphql.CollectedField) (*graphql.FieldContext, error) { - return nil, errors.New("field of type String does not have child fields") + return nil, errors.New("field of type ActionID does not have child fields") + }, + } + return fc, nil +} + +func (ec *executionContext) _ActionRecord_workflow_id(ctx context.Context, field graphql.CollectedField, obj *model.ActionRecord) (ret graphql.Marshaler) { + fc, err := ec.fieldContext_ActionRecord_workflow_id(ctx, field) + if err != nil { + return graphql.Null + } + ctx = graphql.WithFieldContext(ctx, fc) + defer func() { + if r := recover(); r != nil { + ec.Error(ctx, ec.Recover(ctx, r)) + ret = graphql.Null + } + }() + resTmp, err := ec.ResolverMiddleware(ctx, func(rctx context.Context) (interface{}, error) { + ctx = rctx // use context from middleware stack in children + return obj.WorkflowID, nil + }) + if err != nil { + ec.Error(ctx, err) + return graphql.Null + } + if resTmp == nil { + if !graphql.HasFieldError(ctx, fc) { + ec.Errorf(ctx, "must not be null") + } + return graphql.Null + } + res := resTmp.(types.WorkflowID) + fc.Result = res + return ec.marshalNWorkflowID2githubᚗcomᚋmᚑmizutaniᚋalertchainᚋpkgᚋdomainᚋtypesᚐWorkflowID(ctx, field.Selections, res) +} + +func (ec *executionContext) fieldContext_ActionRecord_workflow_id(ctx context.Context, field graphql.CollectedField) (fc *graphql.FieldContext, err error) { + fc = &graphql.FieldContext{ + Object: "ActionRecord", + Field: field, + IsMethod: false, + IsResolver: false, + Child: func(ctx context.Context, field graphql.CollectedField) (*graphql.FieldContext, error) { + return nil, errors.New("field of type WorkflowID does not have child fields") }, } return fc, nil @@ -2578,6 +2632,8 @@ func (ec *executionContext) fieldContext_WorkflowRecord_actions(ctx context.Cont switch field.Name { case "id": return ec.fieldContext_ActionRecord_id(ctx, field) + case "workflow_id": + return ec.fieldContext_ActionRecord_workflow_id(ctx, field) case "seq": return ec.fieldContext_ActionRecord_seq(ctx, field) case "uses": @@ -4398,6 +4454,11 @@ func (ec *executionContext) _ActionRecord(ctx context.Context, sel ast.Selection if out.Values[i] == graphql.Null { out.Invalids++ } + case "workflow_id": + out.Values[i] = ec._ActionRecord_workflow_id(ctx, field, obj) + if out.Values[i] == graphql.Null { + out.Invalids++ + } case "seq": out.Values[i] = ec._ActionRecord_seq(ctx, field, obj) if out.Values[i] == graphql.Null { @@ -5233,6 +5294,22 @@ func (ec *executionContext) ___Type(ctx context.Context, sel ast.SelectionSet, o // region ***************************** type.gotpl ***************************** +func (ec *executionContext) unmarshalNActionID2githubᚗcomᚋmᚑmizutaniᚋalertchainᚋpkgᚋdomainᚋtypesᚐActionID(ctx context.Context, v interface{}) (types.ActionID, error) { + tmp, err := graphql.UnmarshalString(v) + res := types.ActionID(tmp) + return res, graphql.ErrorOnPath(ctx, err) +} + +func (ec *executionContext) marshalNActionID2githubᚗcomᚋmᚑmizutaniᚋalertchainᚋpkgᚋdomainᚋtypesᚐActionID(ctx context.Context, sel ast.SelectionSet, v types.ActionID) graphql.Marshaler { + res := graphql.MarshalString(string(v)) + if res == graphql.Null { + if !graphql.HasFieldError(ctx, graphql.GetFieldContext(ctx)) { + ec.Errorf(ctx, "the requested element is null which the schema does not allow") + } + } + return res +} + func (ec *executionContext) marshalNActionRecord2ᚕᚖgithubᚗcomᚋmᚑmizutaniᚋalertchainᚋpkgᚋdomainᚋmodelᚐActionRecordᚄ(ctx context.Context, sel ast.SelectionSet, v []*model.ActionRecord) graphql.Marshaler { ret := make(graphql.Array, len(v)) var wg sync.WaitGroup diff --git a/pkg/domain/interfaces/interfaces.go b/pkg/domain/interfaces/interfaces.go index 618b19a..e459517 100644 --- a/pkg/domain/interfaces/interfaces.go +++ b/pkg/domain/interfaces/interfaces.go @@ -44,9 +44,16 @@ type TxProc func(ctx *model.Context, input model.Attributes) (model.Attributes, type Database interface { GetAttrs(ctx *model.Context, ns types.Namespace) (model.Attributes, error) PutAttrs(ctx *model.Context, ns types.Namespace, attrs model.Attributes) error + PutWorkflow(ctx *model.Context, workflow model.WorkflowRecord) error GetWorkflows(ctx *model.Context, offset, limit int) ([]model.WorkflowRecord, error) GetWorkflow(ctx *model.Context, id types.WorkflowID) (*model.WorkflowRecord, error) + + PutAction(ctx *model.Context, action model.ActionRecord) error + GetAction(ctx *model.Context, id types.ActionID) (*model.ActionRecord, error) + GetActions(ctx *model.Context, ids []types.ActionID) ([]model.ActionRecord, error) + GetActionByWorkflowID(ctx *model.Context, workflowID types.WorkflowID) ([]model.ActionRecord, error) + Lock(ctx *model.Context, ns types.Namespace, timeout time.Time) error Unlock(ctx *model.Context, ns types.Namespace) error Close() error diff --git a/pkg/domain/model/graphql.go b/pkg/domain/model/graphql.go index a631145..f901321 100644 --- a/pkg/domain/model/graphql.go +++ b/pkg/domain/model/graphql.go @@ -9,7 +9,8 @@ import ( ) type ActionRecord struct { - ID string `json:"id"` + ID types.ActionID `json:"id"` + WorkflowID types.WorkflowID `json:"workflow_id"` Seq int `json:"seq"` Uses string `json:"uses"` Args []*ArgumentRecord `json:"args"` diff --git a/pkg/domain/types/types.go b/pkg/domain/types/types.go index 8b30190..eb14058 100644 --- a/pkg/domain/types/types.go +++ b/pkg/domain/types/types.go @@ -36,3 +36,4 @@ func NewWorkflowID() WorkflowID { func (x AlertID) String() string { return string(x) } func (x WorkflowID) String() string { return string(x) } +func (x ActionID) String() string { return string(x) } diff --git a/pkg/infra/database_test.go b/pkg/infra/database_test.go index db90fd9..54d7c67 100644 --- a/pkg/infra/database_test.go +++ b/pkg/infra/database_test.go @@ -23,18 +23,18 @@ func TestMemory(t *testing.T) { func TestFirestore(t *testing.T) { var ( projectID string - collection string + databaseID string ) if err := utils.LoadEnv( utils.EnvDef("TEST_FIRESTORE_PROJECT_ID", &projectID), - utils.EnvDef("TEST_FIRESTORE_COLLECTION_PREFIX", &collection), + utils.EnvDef("TEST_FIRESTORE_DATABASE_ID", &databaseID), ); err != nil { t.Skipf("Skip test due to missing env: %v", err) } ctx := model.NewContext() - client := gt.R1(firestore.New(ctx, projectID, collection)).NoError(t) + client := gt.R1(firestore.New(ctx, projectID, databaseID)).NoError(t) testClient(t, client) } @@ -52,6 +52,9 @@ func testClient(t *testing.T, client interfaces.Database) { t.Run("Workflow", func(t *testing.T) { testWorkflow(t, client) }) + t.Run("Action", func(t *testing.T) { + testAction(t, client) + }) } func testPutGet(t *testing.T, client interfaces.Database) { @@ -245,3 +248,111 @@ func testWorkflow(t *testing.T, client interfaces.Database) { gt.V(t, resp.Alert.ID).Equal(workflows[2].Alert.ID) }) } + +func testAction(t *testing.T, db interfaces.Database) { + ctx := model.NewContext() + workflow1 := model.WorkflowRecord{ + ID: types.NewWorkflowID(), + CreatedAt: time.Now(), + } + workflow2 := model.WorkflowRecord{ + ID: types.NewWorkflowID(), + CreatedAt: time.Now(), + } + + gt.NoError(t, db.PutWorkflow(ctx, workflow1)) + + actions := []model.ActionRecord{ + { + ID: types.NewActionID(), + WorkflowID: workflow1.ID, + Seq: 0, + Uses: "test1", + Args: []*model.ArgumentRecord{ + { + Key: "key1", + Value: "value1", + }, + }, + }, + { + ID: types.NewActionID(), + WorkflowID: workflow1.ID, + Seq: 1, + Uses: "test2", + Args: []*model.ArgumentRecord{ + { + Key: "key2", + Value: "value2", + }, + }, + }, + { + ID: types.NewActionID(), + WorkflowID: workflow1.ID, + Seq: 2, + Uses: "test3", + Args: []*model.ArgumentRecord{}, + Next: []*model.NextRecord{}, + StartedAt: time.Now(), + FinishedAt: time.Now(), + }, + { + ID: types.NewActionID(), + WorkflowID: workflow2.ID, + Seq: 0, + Uses: "test4", + Args: []*model.ArgumentRecord{}, + Next: []*model.NextRecord{}, + StartedAt: time.Now(), + FinishedAt: time.Now(), + }, + } + + for _, action := range actions { + gt.NoError(t, db.PutAction(ctx, action)) + } + + t.Run("GetAction", func(t *testing.T) { + resp := gt.R1(db.GetAction(ctx, actions[1].ID)).NoError(t) + gt.V(t, resp).Must().NotNil() + gt.V(t, resp.Uses).Equal("test2") + }) + + t.Run("GetActionByWorkflowID", func(t *testing.T) { + resp := gt.R1(db.GetActionByWorkflowID(ctx, types.WorkflowID(workflow1.ID))).NoError(t) + gt.A(t, resp).Length(3). + MatchThen(func(v model.ActionRecord) bool { + return v.ID == actions[0].ID + }, func(t testing.TB, v model.ActionRecord) { + gt.V(t, v.Uses).Equal("test1") + }). + MatchThen(func(v model.ActionRecord) bool { + return v.ID == actions[1].ID + }, func(t testing.TB, v model.ActionRecord) { + gt.V(t, v.Uses).Equal("test2") + }). + MatchThen(func(v model.ActionRecord) bool { + return v.ID == actions[2].ID + }, func(t testing.TB, v model.ActionRecord) { + gt.V(t, v.Uses).Equal("test3") + }) + }) + + t.Run("GetActions", func(t *testing.T) { + resp := gt.R1(db.GetActions(ctx, []types.ActionID{actions[0].ID, actions[3].ID})).NoError(t) + gt.A(t, resp).Length(2). + MatchThen(func(v model.ActionRecord) bool { + return v.ID == actions[0].ID + }, func(t testing.TB, v model.ActionRecord) { + gt.V(t, v.Uses).Equal("test1") + gt.V(t, v.WorkflowID).Equal(workflow1.ID) + }). + MatchThen(func(v model.ActionRecord) bool { + return v.ID == actions[3].ID + }, func(t testing.TB, v model.ActionRecord) { + gt.V(t, v.Uses).Equal("test4") + gt.V(t, v.WorkflowID).Equal(workflow2.ID) + }) + }) +} diff --git a/pkg/infra/firestore/client.go b/pkg/infra/firestore/client.go index ea98459..550ee0c 100644 --- a/pkg/infra/firestore/client.go +++ b/pkg/infra/firestore/client.go @@ -26,6 +26,79 @@ type Client struct { databaseID string attrCollection string workflowCollection string + actionCollection string +} + +// GetAction implements interfaces.Database. +func (x *Client) GetAction(ctx *model.Context, id types.ActionID) (*model.ActionRecord, error) { + doc, err := x.client.Collection(x.actionCollection).Doc(id.String()).Get(ctx) + if err != nil { + if status.Code(err) == codes.NotFound { + return nil, nil + } + return nil, types.AsRuntimeErr(goerr.Wrap(err, "failed to get attributes from firestore")) + } + + var action model.ActionRecord + if err := doc.DataTo(&action); err != nil { + return nil, types.AsRuntimeErr(goerr.Wrap(err, "failed to unmarshal attribute from firestore")) + } + + return &action, nil +} + +// GetActions implements interfaces.Database. +func (x *Client) GetActions(ctx *model.Context, ids []types.ActionID) ([]model.ActionRecord, error) { + var ret []model.ActionRecord + + iter := x.client.Collection(x.actionCollection).Where("ID", "in", ids).Documents(ctx) + for { + doc, err := iter.Next() + if err != nil { + if errors.Is(err, iterator.Done) { + return ret, nil + } + return nil, types.AsRuntimeErr(goerr.Wrap(err, "failed to get attributes from firestore")) + } + + var action model.ActionRecord + if err := doc.DataTo(&action); err != nil { + return nil, types.AsRuntimeErr(goerr.Wrap(err, "failed to unmarshal attribute from firestore")) + } + ret = append(ret, action) + } +} + +// GetActionByWorkflowID implements interfaces.Database. +func (x *Client) GetActionByWorkflowID(ctx *model.Context, workflowID types.WorkflowID) ([]model.ActionRecord, error) { + var actions []model.ActionRecord + iter := x.client.Collection(x.actionCollection). + Where("WorkflowID", "==", workflowID). + Documents(ctx) + + for { + doc, err := iter.Next() + if err != nil { + if errors.Is(err, iterator.Done) { + return actions, nil + } + return nil, types.AsRuntimeErr(goerr.Wrap(err, "failed to get workflow")) + } + + var action model.ActionRecord + if err := doc.DataTo(&action); err != nil { + return nil, types.AsRuntimeErr(goerr.Wrap(err, "failed to unmarshal workflow")) + } + actions = append(actions, action) + } +} + +// PutAction implements interfaces.Database. +func (x *Client) PutAction(ctx *model.Context, action model.ActionRecord) error { + if _, err := x.client.Collection(x.actionCollection).Doc(action.ID.String()).Set(ctx, action); err != nil { + return types.AsRuntimeErr(goerr.Wrap(err, "failed to put action")) + } + return nil } const ( @@ -309,6 +382,7 @@ func New(ctx *model.Context, projectID string, databaseID string) (*Client, erro databaseID: databaseID, attrCollection: "attrs", workflowCollection: "workflows", + actionCollection: "actions", }, nil } diff --git a/pkg/infra/memory/client.go b/pkg/infra/memory/client.go index a10c485..a2f97a9 100644 --- a/pkg/infra/memory/client.go +++ b/pkg/infra/memory/client.go @@ -19,17 +19,58 @@ type Client struct { attrs map[types.Namespace]map[types.AttrID]*model.Attribute locks map[types.Namespace]*lock workflows map[types.WorkflowID]model.WorkflowRecord + actions map[types.ActionID]model.ActionRecord attrMutex sync.RWMutex lockMutex sync.Mutex workflowMutex sync.RWMutex } +// GetAction implements interfaces.Database. +func (x *Client) GetAction(ctx *model.Context, id types.ActionID) (*model.ActionRecord, error) { + action, ok := x.actions[id] + if !ok { + return nil, nil + } + return &action, nil +} + +// GetActions implements interfaces.Database. +func (x *Client) GetActions(ctx *model.Context, ids []types.ActionID) ([]model.ActionRecord, error) { + var ret []model.ActionRecord + + for _, id := range ids { + if action, ok := x.actions[id]; ok { + ret = append(ret, action) + } + } + + return ret, nil +} + +// GetActionByWorkflowID implements interfaces.Database. +func (x *Client) GetActionByWorkflowID(ctx *model.Context, workflowID types.WorkflowID) ([]model.ActionRecord, error) { + var ret []model.ActionRecord + for i, action := range x.actions { + if action.WorkflowID == workflowID { + ret = append(ret, x.actions[i]) + } + } + return ret, nil +} + +// PutAction implements interfaces.Database. +func (x *Client) PutAction(ctx *model.Context, action model.ActionRecord) error { + x.actions[action.ID] = action + return nil +} + func New() *Client { return &Client{ attrs: map[types.Namespace]map[types.AttrID]*model.Attribute{}, locks: map[types.Namespace]*lock{}, workflows: map[types.WorkflowID]model.WorkflowRecord{}, + actions: map[types.ActionID]model.ActionRecord{}, } }