Skip to content

Commit 5862a9e

Browse files
ishanarya0Ishan Arya
and
Ishan Arya
authored
feat: add metrics for res status change (#126)
* poc * feat: add metrics for res status change * chore: remove unused variables * refactor: otel setup * chore: clear opencensus * fix: add runtime metrics * test: fix * feat: add mod file --------- Co-authored-by: Ishan Arya <[email protected]>
1 parent 953741b commit 5862a9e

13 files changed

+288
-378
lines changed

cli/serve.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -64,7 +64,7 @@ func StartServer(ctx context.Context, cfg Config, migrate, spawnWorker bool) err
6464

6565
store := setupStorage(cfg.PGConnStr, cfg.Syncer, cfg.Service)
6666
moduleService := module.NewService(setupRegistry(), store)
67-
resourceService := core.New(store, moduleService, time.Now, cfg.Syncer.SyncBackoffInterval, cfg.Syncer.MaxRetries)
67+
resourceService := core.New(store, moduleService, time.Now, cfg.Syncer.SyncBackoffInterval, cfg.Syncer.MaxRetries, cfg.Telemetry.ServiceName)
6868

6969
if migrate {
7070
if migrateErr := runMigrations(ctx, cfg); migrateErr != nil {

cli/worker.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -58,7 +58,7 @@ func cmdWorker() *cobra.Command {
5858
func StartWorkers(ctx context.Context, cfg Config) error {
5959
store := setupStorage(cfg.PGConnStr, cfg.Syncer, cfg.Service)
6060
moduleService := module.NewService(setupRegistry(), store)
61-
resourceService := core.New(store, moduleService, time.Now, cfg.Syncer.SyncBackoffInterval, cfg.Syncer.MaxRetries)
61+
resourceService := core.New(store, moduleService, time.Now, cfg.Syncer.SyncBackoffInterval, cfg.Syncer.MaxRetries, cfg.Telemetry.ServiceName)
6262

6363
eg := &errgroup.Group{}
6464
spawnWorkers(ctx, resourceService, cfg.Syncer.Workers, cfg.Syncer.SyncInterval, eg)

core/core.go

+3-1
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ type Service struct {
1818
moduleSvc ModuleService
1919
syncBackoff time.Duration
2020
maxSyncRetries int
21+
serviceName string
2122
}
2223

2324
type ModuleService interface {
@@ -27,7 +28,7 @@ type ModuleService interface {
2728
GetOutput(ctx context.Context, res module.ExpandedResource) (json.RawMessage, error)
2829
}
2930

30-
func New(repo resource.Store, moduleSvc ModuleService, clockFn func() time.Time, syncBackoffInterval time.Duration, maxRetries int) *Service {
31+
func New(repo resource.Store, moduleSvc ModuleService, clockFn func() time.Time, syncBackoffInterval time.Duration, maxRetries int, serviceName string) *Service {
3132
if clockFn == nil {
3233
clockFn = time.Now
3334
}
@@ -38,6 +39,7 @@ func New(repo resource.Store, moduleSvc ModuleService, clockFn func() time.Time,
3839
syncBackoff: syncBackoffInterval,
3940
maxSyncRetries: maxRetries,
4041
moduleSvc: moduleSvc,
42+
serviceName: serviceName,
4143
}
4244
}
4345

core/core_test.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,6 @@ var (
2525

2626
func TestNew(t *testing.T) {
2727
t.Parallel()
28-
s := core.New(&mocks.ResourceStore{}, &mocks.ModuleService{}, deadClock, defaultSyncBackoff, defaultMaxRetries)
28+
s := core.New(&mocks.ResourceStore{}, &mocks.ModuleService{}, deadClock, defaultSyncBackoff, defaultMaxRetries, serviceName)
2929
assert.NotNil(t, s)
3030
}

core/read_test.go

+6-5
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ import (
1717
const (
1818
defaultMaxRetries = 5
1919
defaultSyncBackoff = 5 * time.Second
20+
serviceName = "test-service"
2021
)
2122

2223
func TestService_GetResource(t *testing.T) {
@@ -38,7 +39,7 @@ func TestService_GetResource(t *testing.T) {
3839
GetByURN(mock.Anything, mock.Anything).
3940
Return(nil, errors.ErrNotFound).
4041
Once()
41-
return core.New(repo, nil, nil, defaultSyncBackoff, defaultMaxRetries)
42+
return core.New(repo, nil, nil, defaultSyncBackoff, defaultMaxRetries, serviceName)
4243
},
4344
urn: "foo:bar:baz",
4445
wantErr: errors.ErrNotFound,
@@ -58,7 +59,7 @@ func TestService_GetResource(t *testing.T) {
5859
Return(nil, nil).
5960
Once()
6061

61-
return core.New(repo, mod, deadClock, defaultSyncBackoff, defaultMaxRetries)
62+
return core.New(repo, mod, deadClock, defaultSyncBackoff, defaultMaxRetries, serviceName)
6263
},
6364
urn: "foo:bar:baz",
6465
want: &sampleResource,
@@ -105,7 +106,7 @@ func TestService_ListResources(t *testing.T) {
105106
List(mock.Anything, mock.Anything, false).
106107
Return(nil, nil).
107108
Once()
108-
return core.New(repo, nil, deadClock, defaultSyncBackoff, defaultMaxRetries)
109+
return core.New(repo, nil, deadClock, defaultSyncBackoff, defaultMaxRetries, serviceName)
109110
},
110111
want: resource.PagedResource{},
111112
wantErr: nil,
@@ -119,7 +120,7 @@ func TestService_ListResources(t *testing.T) {
119120
List(mock.Anything, mock.Anything, false).
120121
Return(nil, errStoreFailure).
121122
Once()
122-
return core.New(repo, nil, deadClock, defaultSyncBackoff, defaultMaxRetries)
123+
return core.New(repo, nil, deadClock, defaultSyncBackoff, defaultMaxRetries, serviceName)
123124
},
124125
want: resource.PagedResource{},
125126
wantErr: errors.ErrInternal,
@@ -133,7 +134,7 @@ func TestService_ListResources(t *testing.T) {
133134
List(mock.Anything, mock.Anything, false).
134135
Return([]resource.Resource{sampleResource}, nil).
135136
Once()
136-
return core.New(repo, nil, deadClock, defaultSyncBackoff, defaultMaxRetries)
137+
return core.New(repo, nil, deadClock, defaultSyncBackoff, defaultMaxRetries, serviceName)
137138
},
138139
want: resource.PagedResource{
139140
Count: 1,

core/sync.go

+48
Original file line numberDiff line numberDiff line change
@@ -2,13 +2,26 @@ package core
22

33
import (
44
"context"
5+
"fmt"
56
"time"
67

8+
"go.opentelemetry.io/otel/metric"
79
"go.uber.org/zap"
810
"golang.org/x/sync/errgroup"
911

1012
"github.com/goto/entropy/core/resource"
1113
"github.com/goto/entropy/pkg/errors"
14+
"github.com/goto/entropy/pkg/telemetry"
15+
"go.opentelemetry.io/otel/attribute"
16+
)
17+
18+
// SyncStatus names a resource state transition that is tracked as a metric.
// setupCounter uses the value as the prefix of the instrument name
// ("<status>_counter") and inside the instrument description.
type SyncStatus string

// Statuses for which a counter instrument is recorded.
const (
	// PendingCounter is incremented by execAction once an action has been planned.
	PendingCounter = SyncStatus("pending")

	// CompletedCounter is incremented by handleSync after the new resource
	// state has been applied.
	CompletedCounter = SyncStatus("completed")

	// ErrorCounter is incremented by handleSync when a resource is moved to
	// the error status and will not be retried.
	ErrorCounter = SyncStatus("error")

	// RetryCounter is incremented by handleSync each time a sync attempt fails
	// and the resource's retry count is bumped.
	RetryCounter = SyncStatus("retry")
)
1326

1427
// RunSyncer runs the syncer thread that keeps performing resource-sync at
@@ -44,6 +57,20 @@ func (svc *Service) handleSync(ctx context.Context, res resource.Resource) (*res
4457
zap.String("last_err", res.State.SyncResult.LastError),
4558
)
4659

60+
meter := telemetry.GetMeter(svc.serviceName)
61+
retryCounter, err := setupCounter(meter, RetryCounter)
62+
if err != nil {
63+
return nil, err
64+
}
65+
errorCounter, err := setupCounter(meter, ErrorCounter)
66+
if err != nil {
67+
return nil, err
68+
}
69+
completedCounter, err := setupCounter(meter, CompletedCounter)
70+
if err != nil {
71+
return nil, err
72+
}
73+
4774
modSpec, err := svc.generateModuleSpec(ctx, res)
4875
if err != nil {
4976
logEntry.Error("SyncOne() failed", zap.Error(err))
@@ -56,16 +83,26 @@ func (svc *Service) handleSync(ctx context.Context, res resource.Resource) (*res
5683

5784
res.State.SyncResult.LastError = err.Error()
5885
res.State.SyncResult.Retries++
86+
87+
// Increment the retry counter.
88+
retryCounter.Add(context.Background(), 1, metric.WithAttributes(attribute.String("resource", res.URN)))
89+
5990
if errors.Is(err, errors.ErrInvalid) {
6091
// ErrInvalid is expected to be returned when config is invalid.
6192
// There is no point in retrying in this case.
6293
res.State.Status = resource.StatusError
6394
res.State.NextSyncAt = nil
95+
96+
// Increment the error counter.
97+
errorCounter.Add(context.Background(), 1)
6498
} else if svc.maxSyncRetries > 0 && res.State.SyncResult.Retries >= svc.maxSyncRetries {
6599
// Some other error occurred and no more retries remaining.
66100
// move the resource to failure state.
67101
res.State.Status = resource.StatusError
68102
res.State.NextSyncAt = nil
103+
104+
// Increment the error counter.
105+
errorCounter.Add(context.Background(), 1)
69106
} else {
70107
// Some other error occurred and we still have remaining retries.
71108
// need to backoff and retry in some time.
@@ -78,6 +115,9 @@ func (svc *Service) handleSync(ctx context.Context, res resource.Resource) (*res
78115
res.UpdatedAt = svc.clock()
79116
res.State = *newState
80117

118+
// Increment the completed counter.
119+
completedCounter.Add(context.Background(), 1)
120+
81121
logEntry.Info("SyncOne() finished",
82122
zap.String("final_status", res.State.Status),
83123
zap.Timep("next_sync", res.State.NextSyncAt),
@@ -86,3 +126,11 @@ func (svc *Service) handleSync(ctx context.Context, res resource.Resource) (*res
86126

87127
return &res, nil
88128
}
129+
130+
func setupCounter(meter metric.Meter, countername SyncStatus) (metric.Int64Counter, error) {
131+
return meter.Int64Counter(
132+
fmt.Sprintf("%s_counter", countername),
133+
metric.WithDescription(fmt.Sprintf("Total number of %s performed", countername)),
134+
metric.WithUnit("1"),
135+
)
136+
}

core/write.go

+11
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ import (
77
"github.com/goto/entropy/core/module"
88
"github.com/goto/entropy/core/resource"
99
"github.com/goto/entropy/pkg/errors"
10+
"github.com/goto/entropy/pkg/telemetry"
1011
)
1112

1213
type Options struct {
@@ -101,6 +102,16 @@ func (svc *Service) execAction(ctx context.Context, res resource.Resource, act m
101102
return nil, err
102103
}
103104
}
105+
106+
meter := telemetry.GetMeter(svc.serviceName)
107+
pendingCounter, err := setupCounter(meter, PendingCounter)
108+
if err != nil {
109+
return nil, err
110+
}
111+
112+
// Increment the pending counter.
113+
pendingCounter.Add(context.Background(), 1)
114+
104115
return planned, nil
105116
}
106117

0 commit comments

Comments
 (0)