Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 19 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
# Changelog

## [Unreleased]

### Removed

- `Worker.Config` field removed in favor of [`config.LoadFromContext`](https://pkg.go.dev/github.com/go-nacelle/config/v3#LoadFromContext).
- `Worker.Services` field removed in favor of [`service.FromContext`](https://pkg.go.dev/github.com/go-nacelle/service/v2#FromContext).
- `Worker.Health` field removed in favor of [`process.HealthFromContext`](https://pkg.go.dev/github.com/go-nacelle/process/v2#HealthFromContext).

### Changed

- `WorkerSpec.Init` parameter changed to `context.Context` to match Nacelle. [#3](https://github.com/go-nacelle/workerbase/pull/3)
- Update dependency [go-nacelle/nacelle@v1.0.2] -> [go-nacelle/nacelle@v2.1.0]

[unreleased]: https://github.com/go-nacelle/workerbase/compare/v1.2.0...HEAD
[go-nacelle/nacelle@v1.0.2]: https://github.com/go-nacelle/nacelle/releases/tag/v1.0.2
[go-nacelle/nacelle@v2.1.0]: https://github.com/go-nacelle/nacelle/releases/tag/v2.1.0

12 changes: 6 additions & 6 deletions worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"time"

"github.com/derision-test/glock"
"github.com/go-nacelle/config/v3"
"github.com/go-nacelle/nacelle/v2"
"github.com/go-nacelle/process/v2"
"github.com/go-nacelle/service/v2"
Expand All @@ -14,9 +15,6 @@ import (

type (
Worker struct {
Config *nacelle.Config `service:"config"`
Services *nacelle.ServiceContainer `service:"services"`
Health *nacelle.Health `service:"health"`
tagModifiers []nacelle.TagModifier
spec WorkerSpec
clock glock.Clock
Expand Down Expand Up @@ -59,21 +57,23 @@ func newWorker(spec WorkerSpec, clock glock.Clock, configs ...ConfigFunc) *Worke
}

func (w *Worker) Init(ctx context.Context) error {
healthStatus, err := w.Health.Register(w.healthToken)
health := process.HealthFromContext(ctx)
healthStatus, err := health.Register(w.healthToken)
if err != nil {
return err
}
w.healthStatus = healthStatus

workerConfig := &Config{}
if err := w.Config.Load(workerConfig, w.tagModifiers...); err != nil {
if err := config.LoadFromContext(ctx, workerConfig, w.tagModifiers...); err != nil {
return err
}

w.strictClock = workerConfig.StrictClock
w.tickInterval = workerConfig.WorkerTickInterval

if err := service.Inject(ctx, w.Services, w.spec); err != nil {
svc := service.FromContext(ctx)
if err := service.Inject(ctx, svc, w.spec); err != nil {
return err
}

Expand Down
128 changes: 74 additions & 54 deletions worker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,10 @@ import (

"github.com/derision-test/glock"
mockassert "github.com/derision-test/go-mockgen/testutil/assert"
"github.com/go-nacelle/config/v3"
"github.com/go-nacelle/nacelle/v2"
"github.com/go-nacelle/process/v2"
"github.com/go-nacelle/service/v2"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
Expand All @@ -25,7 +28,7 @@ func TestRunAndStop(t *testing.T) {
var (
spec = NewMockWorkerSpecFinalizer()
clock = glock.NewMockClock()
worker = makeWorker(spec, clock)
worker = newWorker(spec, clock)
tickChan = make(chan struct{})
errChan = make(chan error)
)
Expand All @@ -36,9 +39,10 @@ func TestRunAndStop(t *testing.T) {
tickChan <- struct{}{}
return nil
})
worker.Config = testConfig

ctx := context.Background()
ctx := newTestContext()
ctx = config.WithConfig(ctx, testConfig)

err := worker.Init(ctx)
assert.Nil(t, err)

Expand All @@ -63,7 +67,7 @@ func TestNonStrict(t *testing.T) {
var (
spec = NewMockWorkerSpecFinalizer()
clock = glock.NewMockClock()
worker = makeWorker(spec, clock)
worker = newWorker(spec, clock)
errChan = make(chan error)
)

Expand All @@ -87,11 +91,14 @@ func TestNonStrict(t *testing.T) {
clock.Advance(time.Second * 30)
return nil
})
worker.Config = nacelle.NewConfig(nacelle.NewTestEnvSourcer(map[string]string{
"worker_tick_interval": "60",
}))

ctx := context.Background()
ctx := newTestContext()
ctx = config.WithConfig(ctx,
nacelle.NewConfig(nacelle.NewTestEnvSourcer(map[string]string{
"worker_tick_interval": "60",
})),
)

err := worker.Init(ctx)

assert.Nil(t, err)
Expand Down Expand Up @@ -127,7 +134,7 @@ func TestStrict(t *testing.T) {
var (
spec = NewMockWorkerSpecFinalizer()
clock = glock.NewMockClock()
worker = makeWorker(spec, clock)
worker = newWorker(spec, clock)
errChan = make(chan error)
)

Expand Down Expand Up @@ -167,14 +174,16 @@ func TestStrict(t *testing.T) {
clock.Advance(d)
return nil
})
worker.Config = nacelle.NewConfig(nacelle.NewTestEnvSourcer(map[string]string{
"worker_tick_interval": "60",
"worker_strict_clock": "true",
}))

ctx := context.Background()
err := worker.Init(ctx)
ctx := newTestContext()
ctx = config.WithConfig(ctx,
nacelle.NewConfig(nacelle.NewTestEnvSourcer(map[string]string{
"worker_tick_interval": "60",
"worker_strict_clock": "true",
})),
)

err := worker.Init(ctx)
assert.Nil(t, err)

go func() {
Expand Down Expand Up @@ -204,27 +213,30 @@ func TestStrict(t *testing.T) {

func TestBadInject(t *testing.T) {
worker := NewWorker(&badInjectWorkerSpec{})
worker.Services = makeBadContainer()
worker.Health = nacelle.NewHealth()
worker.Config = testConfig

ctx := context.Background()
ctx = service.WithContainer(ctx, makeBadContainer())
ctx = process.ContextWithHealth(ctx, nacelle.NewHealth())
ctx = config.WithConfig(ctx, testConfig)

err := worker.Init(ctx)
assert.NotNil(t, err)
assert.Contains(t, err.Error(), "ServiceA")
}

func TestTagModifiers(t *testing.T) {
worker := NewWorker(NewMockWorkerSpecFinalizer(), WithTagModifiers(nacelle.NewEnvTagPrefixer("prefix")))
worker.Services = nacelle.NewServiceContainer()
worker.Health = nacelle.NewHealth()
worker.Config = nacelle.NewConfig(nacelle.NewTestEnvSourcer(map[string]string{
"prefix_worker_tick_interval": "3600",
}))

ctx := context.Background()
err := worker.Init(ctx)
ctx := newTestContext()
ctx = service.WithContainer(ctx, nacelle.NewServiceContainer())
ctx = process.ContextWithHealth(ctx, nacelle.NewHealth())
ctx = config.WithConfig(ctx,
nacelle.NewConfig(nacelle.NewTestEnvSourcer(map[string]string{
"prefix_worker_tick_interval": "3600",
})),
)

err := worker.Init(ctx)
assert.Nil(t, err)
assert.Equal(t, time.Hour, worker.tickInterval)
}
Expand All @@ -233,15 +245,17 @@ func TestInitConfig(t *testing.T) {
var (
spec = NewMockWorkerSpecFinalizer()
clock = glock.NewMockClock()
worker = makeWorker(spec, clock)
worker = newWorker(spec, clock)
)

worker.Config = nacelle.NewConfig(nacelle.NewTestEnvSourcer(map[string]string{
"worker_tick_interval": "60",
"worker_strict_clock": "true",
}))
ctx := newTestContext()
ctx = config.WithConfig(ctx,
nacelle.NewConfig(nacelle.NewTestEnvSourcer(map[string]string{
"worker_tick_interval": "60",
"worker_strict_clock": "true",
})),
)

ctx := context.Background()
err := worker.Init(ctx)
require.Nil(t, err)

Expand All @@ -252,15 +266,16 @@ func TestInitConfig(t *testing.T) {
func TestInitError(t *testing.T) {
var (
spec = NewMockWorkerSpecFinalizer()
worker = makeWorker(spec, glock.NewRealClock())
worker = newWorker(spec, glock.NewRealClock())
)

spec.InitFunc.SetDefaultHook(func(ctx context.Context) error {
return fmt.Errorf("oops")
})
worker.Config = testConfig

ctx := context.Background()
ctx := newTestContext()
ctx = config.WithConfig(ctx, testConfig)

err := worker.Init(ctx)
assert.EqualError(t, err, "oops")
}
Expand All @@ -269,12 +284,13 @@ func TestFinalize(t *testing.T) {
var (
spec = NewMockWorkerSpecFinalizer()
clock = glock.NewMockClock()
worker = makeWorker(spec, clock)
worker = newWorker(spec, clock)
errChan = make(chan error)
)
worker.Config = testConfig

ctx := context.Background()
ctx := newTestContext()
ctx = config.WithConfig(ctx, testConfig)

err := worker.Init(ctx)
assert.Nil(t, err)

Expand All @@ -292,16 +308,17 @@ func TestFinalizeError(t *testing.T) {
var (
spec = NewMockWorkerSpecFinalizer()
clock = glock.NewMockClock()
worker = makeWorker(spec, clock)
worker = newWorker(spec, clock)
errChan = make(chan error)
)

spec.FinalizeFunc.SetDefaultHook(func(ctx context.Context) error {
return fmt.Errorf("oops")
})
worker.Config = testConfig

ctx := context.Background()
ctx := newTestContext()
ctx = config.WithConfig(ctx, testConfig)

err := worker.Init(ctx)
assert.Nil(t, err)

Expand All @@ -319,7 +336,7 @@ func TestFinalizeErrorDoesNotOverwrite(t *testing.T) {
var (
spec = NewMockWorkerSpecFinalizer()
clock = glock.NewMockClock()
worker = makeWorker(spec, clock)
worker = newWorker(spec, clock)
errChan = make(chan error)
)

Expand All @@ -330,9 +347,10 @@ func TestFinalizeErrorDoesNotOverwrite(t *testing.T) {
spec.FinalizeFunc.SetDefaultHook(func(ctx context.Context) error {
return fmt.Errorf("unheard oops")
})
worker.Config = testConfig

ctx := context.Background()
ctx := newTestContext()
ctx = config.WithConfig(ctx, testConfig)

err := worker.Init(ctx)
assert.Nil(t, err)

Expand All @@ -350,16 +368,17 @@ func TestTickError(t *testing.T) {
var (
spec = NewMockWorkerSpecFinalizer()
clock = glock.NewMockClock()
worker = makeWorker(spec, clock)
worker = newWorker(spec, clock)
errChan = make(chan error)
)

spec.TickFunc.SetDefaultHook(func(ctx context.Context) error {
return fmt.Errorf("oops")
})
worker.Config = testConfig

ctx := context.Background()
ctx := newTestContext()
ctx = config.WithConfig(ctx, testConfig)

err := worker.Init(ctx)
assert.Nil(t, err)

Expand All @@ -375,17 +394,18 @@ func TestTickContext(t *testing.T) {
var (
spec = NewMockWorkerSpecFinalizer()
clock = glock.NewMockClock()
worker = makeWorker(spec, clock)
worker = newWorker(spec, clock)
errChan = make(chan error)
)

spec.TickFunc.SetDefaultHook(func(ctx context.Context) error {
<-ctx.Done()
return nil
})
worker.Config = testConfig

ctx := context.Background()
ctx := newTestContext()
ctx = config.WithConfig(ctx, testConfig)

err := worker.Init(ctx)
assert.Nil(t, err)

Expand All @@ -398,11 +418,11 @@ func TestTickContext(t *testing.T) {
assert.Nil(t, value)
}

func makeWorker(spec WorkerSpec, clock glock.Clock) *Worker {
worker := newWorker(spec, clock)
worker.Services = nacelle.NewServiceContainer()
worker.Health = nacelle.NewHealth()
return worker
func newTestContext() context.Context {
ctx := context.Background()
ctx = process.ContextWithHealth(ctx, process.NewHealth())

return ctx
}

//
Expand Down