Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add a utility to perform a gradual split #116

Closed
wants to merge 12 commits into from
26 changes: 7 additions & 19 deletions cloudbuild/cloudbuild.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -8,38 +8,25 @@ options:
steps:
# Run unit tests for environment.
- name: gcr.io/$PROJECT_ID/golang-cbif:1.18
env:
- GONOPROXY=github.com/m-lab/go/*
args:
- go version
- go get -v -t ./...
- go vet ./...
- go test ./... -race
- go test -v ./...

# Deployment of APIs in sandbox & staging.
- name: gcr.io/$PROJECT_ID/gcloud-jsonnet-cbif:1.18
# Buil gradual-split command.
- name: gcr.io/$PROJECT_ID/golang-cbif:1.18
env:
# Use cbif condition: only run these steps in one of these projects.
- PROJECT_IN=mlab-sandbox,mlab-staging
- CGO_ENABLED=0
args:
- cp cloudbuild/app.yaml.template app.yaml
- >
sed -i
-e 's/{{PROJECT}}/$PROJECT_ID/g'
-e 's/{{PLATFORM_PROJECT}}/$_PLATFORM_PROJECT/'
-e 's/{{REDIS_ADDRESS}}/$_REDIS_ADDRESS/'
app.yaml
- gcloud --project $PROJECT_ID app deploy --promote app.yaml
# After deploying the new service, deploy the openapi spec.
- sed -i -e 's/{{PROJECT}}/$PROJECT_ID/' openapi.yaml
- gcloud endpoints services deploy openapi.yaml
- go build ./cmd/gradual-split

# Deployment of APIs in mlab-ns.
# Deployment of APIs in sandbox, staging, & mlab-ns.
- name: gcr.io/$PROJECT_ID/gcloud-jsonnet-cbif:1.18
env:
# Use cbif condition: only run these steps in one of these projects.
- PROJECT_IN=mlab-ns
- PROJECT_IN=mlab-sandbox,mlab-staging,mlab-ns
args:
- cp cloudbuild/app.yaml.template app.yaml
- >
Expand All @@ -49,6 +36,7 @@ steps:
-e 's/{{REDIS_ADDRESS}}/$_REDIS_ADDRESS/'
app.yaml
- gcloud --project $PROJECT_ID app deploy --no-promote app.yaml
- ./gradual-split -project $PROJECT_ID
# After deploying the new service, deploy the openapi spec.
- sed -i -e 's/{{PROJECT}}/$PROJECT_ID/' openapi.yaml
- gcloud endpoints services deploy openapi.yaml
121 changes: 121 additions & 0 deletions cmd/gradual-split/internal/split.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,121 @@
package internal

import (
"context"
"errors"
"fmt"
"sort"
"time"

appengine "google.golang.org/api/appengine/v1"
)

// Split represents traffic ratios "split" between two versions.
type Split struct {
From float64
To float64
}

// SplitOptions collects multiple values needed to perform the gradual split.
type SplitOptions struct {
From string
To string
Delay time.Duration
Sequence []Split
}

// AppWrapper provides fakeable access to the App Engine Admin API.
type AppWrapper interface {
VersionPages(ctx context.Context, serviceID string, f func(listVer *appengine.ListVersionsResponse) error) error
ServiceUpdate(ctx context.Context, serviceID string, service *appengine.Service, mask string) (*appengine.Operation, error)
}

// lookupLatestVersion returns the latest serving version string. The return
// value will be empty if verFrom is the latest version.
func lookupLatestVersion(ctx context.Context, api AppWrapper, verFrom string) (string, error) {
latest := ""
err := api.VersionPages(ctx, "locate", func(lv *appengine.ListVersionsResponse) error {
for _, version := range lv.Versions {
if version.ServingStatus != "SERVING" {
// We can only split to versions that are running.
continue
}
// Find the latest version.
if version.Id > latest {
latest = version.Id
}
}
return nil
})
if verFrom >= latest {
// Skip older versions.
return "", err
}
return latest, err
}

// GetVersions returns the current active and latest version. If vfrom and vto
// are provided, they can override the result.
func GetVersions(ctx context.Context, api AppWrapper, service *appengine.Service, vfrom, vto string) (string, string, error) {
switch {
case vfrom != "" && vto != "":
// Assume the source and target versions have been provided on the command line.
break
case len(service.Split.Allocations) == 1:
// Assume the split has not started.
for from := range service.Split.Allocations {
vfrom = from
break
}
if vto == "" {
var err error
vto, err = lookupLatestVersion(ctx, api, vfrom)
if err != nil {
return "", "", err
}
}
if vto == "" {
// There is no later version.
return vfrom, vfrom, nil
}
case len(service.Split.Allocations) == 2:
// Assume the split is already in progress.
versions := []string{}
for from := range service.Split.Allocations {
versions = append(versions, from)
}
sort.Strings(versions)
vfrom, vto = versions[0], versions[1]
default:
return "", "", errors.New("multi split not supported")
}
return vfrom, vto, nil
}

// PerformSplit applies the sequence of split options pausing by Delay after
// each step. PerformSplit can resume a split in progress.
func PerformSplit(ctx context.Context, api AppWrapper, service *appengine.Service, opt *SplitOptions) error {
// Check which split sequence position to start from. We can assume that
// vfrom will always be present in the currnet split Allocation.
for i := range opt.Sequence {
split := opt.Sequence[i]
if split.To <= service.Split.Allocations[opt.To] {
continue
}
fmt.Print("Splitting traffic from:", opt.From, split.From, "-> to:", opt.To, split.To, "")
service.Split.Allocations[opt.From] = split.From
service.Split.Allocations[opt.To] = split.To
if split.From == 0.0 {
// You cannot set a split percentage of zero.
delete(service.Split.Allocations, opt.From)
}
service.Split.ShardBy = "IP" // Make traffic sticky.
op, err := api.ServiceUpdate(ctx, "locate", service, "split")
if err != nil {
return fmt.Errorf("%v: failed to update service traffic split: %#v", err, op)
}
fmt.Println("sleeping", opt.Delay)
time.Sleep(opt.Delay)
}
return nil
}
193 changes: 193 additions & 0 deletions cmd/gradual-split/internal/split_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,193 @@
package internal

import (
"context"
"errors"
"testing"
"time"

appengine "google.golang.org/api/appengine/v1"
)

type fakeAPI struct {
versions []*appengine.Version
versionErr error
serviceErr error
}

func (a *fakeAPI) VersionPages(ctx context.Context, serviceID string,
f func(listVer *appengine.ListVersionsResponse) error) error {
lvs := &appengine.ListVersionsResponse{
Versions: a.versions,
}
if a.versionErr != nil {
return a.versionErr
}
return f(lvs)
}

func (a *fakeAPI) ServiceUpdate(ctx context.Context, serviceID string, service *appengine.Service, mask string) (*appengine.Operation, error) {
if a.serviceErr != nil {
return nil, a.serviceErr
}
return nil, nil
}

func Test_GetVersions(t *testing.T) {
tests := []struct {
name string
versions []*appengine.Version
allocations map[string]float64
from string
to string
wantFrom string
wantTo string
versionErr error
wantErr bool
}{
{
name: "success",
versions: []*appengine.Version{
&appengine.Version{ServingStatus: "SERVING", Id: "a"},
&appengine.Version{ServingStatus: "STOPPED", Id: "b"},
&appengine.Version{ServingStatus: "SERVING", Id: "c"},
},
allocations: map[string]float64{
"a": 1.0,
},
wantFrom: "a",
wantTo: "c",
},
{
name: "success",
allocations: map[string]float64{
"a": 1.0,
},
from: "a",
to: "c",
wantFrom: "a",
wantTo: "c",
},
{
name: "success",
versions: []*appengine.Version{
&appengine.Version{ServingStatus: "SERVING", Id: "a"},
},
allocations: map[string]float64{
"a": 1.0,
},
wantFrom: "a",
wantTo: "a",
},
{
name: "success",
allocations: map[string]float64{
"a": 0.5,
"b": 0.5,
},
wantFrom: "a",
wantTo: "b",
},
{
name: "error",
allocations: map[string]float64{
"a": 0.2,
"b": 0.3,
"c": 0.5,
},
wantErr: true,
},
{
name: "error",
versions: []*appengine.Version{
&appengine.Version{ServingStatus: "SERVING", Id: "a"},
},
allocations: map[string]float64{
"a": 1.0,
},
versionErr: errors.New("fake version error"),
wantErr: true,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
ctx := context.Background()
api := &fakeAPI{versions: tt.versions, versionErr: tt.versionErr}
service := &appengine.Service{
Split: &appengine.TrafficSplit{
Allocations: tt.allocations,
},
}
gotFrom, gotTo, err := GetVersions(ctx, api, service, tt.from, tt.to)
if (err != nil) != tt.wantErr {
t.Errorf("GetVersions() error = %v, wantErr %v", err, tt.wantErr)
return
}
if gotFrom != tt.wantFrom {
t.Errorf("GetVersions() got = %v, want %v", gotFrom, tt.wantFrom)
}
if gotTo != tt.wantTo {
t.Errorf("GetVersions() got1 = %v, want %v", gotTo, tt.wantTo)
}
})
}
}

func Test_PerformSplit(t *testing.T) {
tests := []struct {
name string
vfrom string
vto string
serviceErr error
wantErr bool
}{
{
name: "success",
vfrom: "a",
vto: "c",
},
{
name: "error",
serviceErr: errors.New("fake service error"),
wantErr: true,
},
}
delay := time.Millisecond
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
ctx := context.Background()
api := &fakeAPI{
versions: []*appengine.Version{
&appengine.Version{ServingStatus: "SERVING", Id: "a"},
&appengine.Version{ServingStatus: "SERVING", Id: "c"},
},
serviceErr: tt.serviceErr,
}
service := &appengine.Service{
Split: &appengine.TrafficSplit{
Allocations: map[string]float64{
"a": 0.5,
"c": 0.5,
},
},
}
opt := &SplitOptions{
From: tt.vfrom,
To: tt.vto,
Delay: delay,
Sequence: []Split{
{From: 0.90, To: 0.10}, // the biggest disruption appears to happen in the first step.
{From: 0.75, To: 0.25},
{From: 0.50, To: 0.50},
{From: 0.25, To: 0.75},
{From: 0.00, To: 1.00},
},
}

err := PerformSplit(ctx, api, service, opt)
if (err != nil) != tt.wantErr {
t.Errorf("PerformSplit() error = %v, wantErr %v", err, tt.wantErr)
}
})
}
}
Loading