Skip to content
This repository was archived by the owner on Aug 15, 2021. It is now read-only.
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 1 addition & 12 deletions compose.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -17,17 +17,6 @@ services:
ports:
- 6007:6007

redis:
container_name: redis
image: redis:6.2.2
command: ["redis-server", "--appendonly", "yes"]
networks:
- default
ports:
- 6379:6379
volumes:
- redis-data:/data

#########################################################################################
# A leaf service that does work.
#########################################################################################
Expand Down Expand Up @@ -116,4 +105,4 @@ networks:
name: temporal-network

volumes:
redis-data:
redis-data:
55 changes: 41 additions & 14 deletions workflow/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,13 @@ import (

"github.com/go-zoo/bone"
"github.com/gofrs/uuid"
"go.temporal.io/api/enums/v1"
"go.temporal.io/api/serviceerror"
"go.temporal.io/sdk/client"
"google.golang.org/grpc/codes"
)

func Api(c client.Client, r *Repository) http.Handler {
func Api(c client.Client) http.Handler {
mux := bone.New()

// API to launch an workflow. could be an incoming webhook,
Expand All @@ -34,7 +37,7 @@ func Api(c client.Client, r *Repository) http.Handler {
TaskQueue: PRCheckTaskQueue,
}

work, err := c.ExecuteWorkflow(context.Background(), options, (&CheckPR{}).CheckPR, details)
work, err := c.ExecuteWorkflow(context.Background(), options, CheckPR, details)
if err != nil {
rw.WriteHeader(http.StatusInternalServerError)
rw.Write([]byte("couldn't enqueue"))
Expand All @@ -50,26 +53,50 @@ func Api(c client.Client, r *Repository) http.Handler {

// Get the status of a given job
mux.Get("/jobs/:id", http.HandlerFunc(func(rw http.ResponseWriter, req *http.Request) {
ctx := req.Context()

id := bone.GetValue(req, "id")

rw.Header().Add("Content-Type", "application/json")
enc := json.NewEncoder(rw)

var s PRStatus
err := r.Get(req.Context(), id, &s)
switch err {
case nil:
rw.WriteHeader(http.StatusOK)
enc.Encode(&s)
case ErrNotFound:
// TODO: ask temporal here, in case it is PENDING.
// Does the requested workflow execution exist at all?
desc, err := c.DescribeWorkflowExecution(ctx, id, "")
switch {
case err == nil:
case serviceerror.ToStatus(err).Code() == codes.NotFound:
rw.WriteHeader(http.StatusNotFound)
enc.Encode("not found")
fmt.Fprint(rw, "{}")
return
default:
rw.WriteHeader(http.StatusInternalServerError)
enc.Encode("error")
fmt.Println(err)
fmt.Fprint(rw, "{}")
return
}

// You could check the execution "memo" here for
// thinks like an org id for AuthZ.

var out struct {
Status string `json:"status"`
}

// Figure out what our status is.
// As soon as the work is added to the queue, its status
// will be running (so "pending" won't work here).
switch desc.WorkflowExecutionInfo.Status {
case enums.WORKFLOW_EXECUTION_STATUS_COMPLETED:
out.Status = "completed"
case enums.WORKFLOW_EXECUTION_STATUS_RUNNING, enums.WORKFLOW_EXECUTION_STATUS_CONTINUED_AS_NEW:
out.Status = "running"
case enums.WORKFLOW_EXECUTION_STATUS_TIMED_OUT:
out.Status = "timed_out"
default: // cancelled, errored, etc
out.Status = "errored"
}

rw.WriteHeader(http.StatusOK)
enc := json.NewEncoder(rw)
enc.Encode(out)
}))

// API to complete an activity.
Expand Down
2 changes: 2 additions & 0 deletions workflow/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -6,5 +6,7 @@ require (
github.com/go-redis/redis/v8 v8.8.2
github.com/go-zoo/bone v1.3.0
github.com/gofrs/uuid v4.0.0+incompatible
go.temporal.io/api v1.4.1-0.20210420220407-6f00f7f98373
go.temporal.io/sdk v1.8.0
google.golang.org/grpc v1.37.0
)
12 changes: 5 additions & 7 deletions workflow/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,16 +3,13 @@ package main
import (
"log"
"net/http"
"time"

"go.temporal.io/sdk/client"
"go.temporal.io/sdk/worker"
)

func main() {

r := NewRepository()
log.Println(r.redis)

// Create the client object just once per process
c, err := client.NewClient(client.Options{
HostPort: "temporal:7233",
Expand All @@ -25,15 +22,16 @@ func main() {
// run our web server. this isn't clean for startup or shutdown but
// that's ok for now.
go func() {
err := http.ListenAndServe("0.0.0.0:6007", Api(c, r))
err := http.ListenAndServe("0.0.0.0:6007", Api(c))
log.Fatalln(err)
}()

time.Sleep(2 * time.Minute)

// This worker hosts both Worker and Activity functions
w := worker.New(c, PRCheckTaskQueue, worker.Options{})

cpr := &CheckPR{r: r}
w.RegisterWorkflow(cpr.CheckPR)
w.RegisterWorkflow(CheckPR)

w.RegisterActivity(Test)
w.RegisterActivity(DiffResults)
Expand Down
37 changes: 1 addition & 36 deletions workflow/workflow.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,31 +22,13 @@ type CheckDetails struct {
New string
}

type CheckPR struct {
r *Repository
}

func (cpr *CheckPR) CheckPR(ctx workflow.Context, details CheckDetails) error {
func CheckPR(ctx workflow.Context, details CheckDetails) error {
ctx = workflow.WithActivityOptions(ctx, workflow.ActivityOptions{
StartToCloseTimeout: time.Minute, // Max 1 minute before quitting
})

ctx = workflow.WithLocalActivityOptions(ctx, workflow.LocalActivityOptions{
StartToCloseTimeout: time.Minute, // Max 1 minute before quitting
})

i := workflow.GetInfo(ctx)

// TODO: put status in error on failure.
var status PRStatus

// Start both tests. We get back futures. They transparently handle
// retries and persisting the results.
status.Status = append([]PRStatusItem{{State: "testing", TimeStamp: workflow.Now(ctx), Description: "tests are running"}}, status.Status...)
if err := workflow.ExecuteLocalActivity(ctx, cpr.r.Put, i.WorkflowExecution.ID, status).Get(ctx, nil); err != nil {
return err
}

old := workflow.ExecuteActivity(ctx, Test, details.Repo, details.Old)
new := workflow.ExecuteActivity(ctx, Test, details.Repo, details.New)

Expand All @@ -63,33 +45,16 @@ func (cpr *CheckPR) CheckPR(ctx workflow.Context, details CheckDetails) error {

// fast to do and deterministic. run inside the workflow.
var diff string
status.Status = append([]PRStatusItem{{State: "diffing", TimeStamp: workflow.Now(ctx), Description: "test results are diffing"}}, status.Status...)
if err := workflow.ExecuteLocalActivity(ctx, cpr.r.Put, i.WorkflowExecution.ID, status).Get(ctx, nil); err != nil {
return err
}

err := workflow.ExecuteActivity(ctx, DiffResults, oldRes, newRes).Get(ctx, &diff)
if err != nil {
return err
}

// Resolve the final task and finish.
status.Status = append([]PRStatusItem{{State: "reporting", TimeStamp: workflow.Now(ctx), Description: "PR check results are being posted"}}, status.Status...)
if err := workflow.ExecuteLocalActivity(ctx, cpr.r.Put, i.WorkflowExecution.ID, status).Get(ctx, nil); err != nil {
return err
}

if err := workflow.ExecuteActivity(ctx, SetCommitStatus, details.Repo, details.PR, diff).Get(ctx, nil); err != nil {
return err
}

// XXX: Does setting this final status make sense? how will query interact with the actual workflow completion status?
// is there a race condition? probably doesn't matter.
status.Status = append([]PRStatusItem{{State: "complete", TimeStamp: workflow.Now(ctx), Description: "All done"}}, status.Status...)
if err := workflow.ExecuteLocalActivity(ctx, cpr.r.Put, i.WorkflowExecution.ID, status).Get(ctx, nil); err != nil {
return err
}

return nil
}

Expand Down