Skip to content

Commit

Permalink
Merge pull request #76 from vimeo/delay_verify_mode
Browse files Browse the repository at this point in the history
dials: add DelayInitialVerification mode
  • Loading branch information
dfinkel authored Mar 3, 2023
2 parents 55c97d4 + 914ed46 commit 87bcd97
Show file tree
Hide file tree
Showing 5 changed files with 364 additions and 13 deletions.
3 changes: 2 additions & 1 deletion cb_mgr.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ type userCallbackEvent interface {
type newConfigEvent[T any] struct {
oldConfig, newConfig *T
serial uint64
globalCBsSuppressed bool
}

func (*newConfigEvent[T]) isUserCallbackEvent() {}
Expand Down Expand Up @@ -75,7 +76,7 @@ func (cbm *callbackMgr[T]) runCBs(ctx context.Context) {
case *newConfigEvent[T]:
lastSerial = e.serial
lastVersion = e.newConfig
if cbm.p.OnNewConfig != nil {
if cbm.p.OnNewConfig != nil && !e.globalCBsSuppressed {
cbm.p.OnNewConfig(ctx, e.oldConfig, e.newConfig)
}
for _, cbh := range newCfgCBs {
Expand Down
153 changes: 141 additions & 12 deletions dials.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,11 +32,15 @@ type Params[T any] struct {
OnWatchedError WatchedErrorHandler[T]

// SkipInitialVerification skips the initial call to `Verify()` on any
// configurations that implement the `VerifiedConfig` interface.
// configurations that implement the [VerifiedConfig] interface.
//
// In cases where later updates from Watching sources are depended upon to
// provide a configuration that will be allowed by Verify(), one should set
// this to true. See `sourcewrap.Blank` for more details.
//
// Unlike DelayInitialVerification, this field only skips the initial Verify()
// call, so all watching sources (including Blank) trigger configuration
// verification.
SkipInitialVerification bool

// OnNewConfig is called when a new (valid) configuration is installed.
Expand All @@ -46,6 +50,26 @@ type Params[T any] struct {
// In the event that a call to OnNewConfig blocks too long, some calls
// may be dropped.
OnNewConfig NewConfigHandler[T]

// DelayInitialVerification skips calls to Verify() until the EnableVerification()
// method is called.
//
// Some systems require coalescing the data from multiple Sources, which require
// initialization parameters from other sources (e.g. filenames).
//
// Notably, many use-cases involving sourcewrap.Blank may require multiple steps
// to initialize, during which time the configuration will be incomplete and may
// not validate.
DelayInitialVerification bool

// CallGlobalCallbacksAfterVerificationEnabled suppresses calling any registered
// global while in the delayed-verification mode.
//
// In particular, global callbacks (those registered in this struct) are
// suppressed under two conditions:
// - DelayInitialVerification was set to true when Config was called
// - EnableVerification has not been called (without it returning an error)
CallGlobalCallbacksAfterVerificationEnabled bool
}

// Config populates the passed in config struct by reading the values from the
Expand All @@ -55,7 +79,7 @@ type Params[T any] struct {
//
// If present, a Verify() method will be called after each stacking attempt.
// Blocking/expensive work should not be done in this method. (see the comment
// on Verify()) in VerifiedConfig for details)
// on Verify()) in [VerifiedConfig] for details)
//
// If complicated/blocking initialization/verification is necessary, one can either:
// - If not using any watching sources, do any verification with the returned
Expand Down Expand Up @@ -121,7 +145,7 @@ func (p Params[T]) Config(ctx context.Context, t *T, sources ...Source) (*Dials[
d.value.Store(&versionedConfig[T]{serial: 0, cfg: nv})

// Verify that the configuration is valid if a Verify() method is present.
if vf, ok := newValue.(VerifiedConfig); ok && !p.SkipInitialVerification {
if vf, ok := newValue.(VerifiedConfig); ok && !p.SkipInitialVerification && !p.DelayInitialVerification {
if vfErr := vf.Verify(); vfErr != nil {
return nil, fmt.Errorf("initial configuration verification failed: %w", vfErr)
}
Expand All @@ -139,7 +163,10 @@ func (p Params[T]) Config(ctx context.Context, t *T, sources ...Source) (*Dials[
ch: cbch,
}
go cbmgr.runCBs(ctx)
go d.monitor(ctx, tVal.Interface().(*T), computed, watcherChan)

monCtl := make(chan verifyEnable[T], 3)
d.monCtl = monCtl
go d.monitor(ctx, tVal.Interface().(*T), computed, watcherChan, monCtl)
}
return d, nil
}
Expand Down Expand Up @@ -403,6 +430,7 @@ func (d *Dials[T]) RegisterCallback(ctx context.Context, serial CfgSerial[T], cb
func (d *Dials[T]) updateSourceValue(
ctx context.Context,
t *T,
skipVerify bool,
sourceValues []sourceValue,
watchTab *valueUpdate,
) *T {
Expand All @@ -426,7 +454,7 @@ func (d *Dials[T]) updateSourceValue(
}

// Verify that the configuration is valid if a Verify() method is present.
if vf, ok := newInterface.(VerifiedConfig); ok {
if vf, ok := newInterface.(VerifiedConfig); ok && !skipVerify {
if vfErr := vf.Verify(); vfErr != nil {
oldVal := d.View()

Expand Down Expand Up @@ -513,36 +541,137 @@ func (d *Dials[T]) submitEvent(ctx context.Context, ev userCallbackEvent) {
}
}

type verifyEnableResp[T any] struct {
// only one of error or cfgTok will be returned
err error
v *T
tok CfgSerial[T]
}

// verifyEnable is the payload type for the monCtl channel used to signal that
// verification should be enabled.
type verifyEnable[T any] struct {
// resp must have capacity 1
resp chan<- verifyEnableResp[T]
}

// EnableVerification enables verification on dials if DelayInitialVerification was set on
// the [Params] struct. Returns the config that was verified and a [CfgSerial] or the
// error from calling Verify() (if the config type implements [VerifiedConfig].
//
// If DelayInitialVerification is not set, returns successfully without verifying the
// config.
//
// If verification succeeds on the currently installed configuration, all subsequent
// configuration versions will be verified. (based on re-stacking versions from watching
// sources)
//
// When there are watching sources (including Blank) the global callbacks may
// be suppressed with the [Params].CallGlobalCallbacksAfterVerificationEnabled option.
// This suppression expires after verification is re-enabled by this method.
//
// Note: if the context expires while this call is awaiting a response from the background
// "monitor" goroutine, verification may still happen, but whether it transitions out of
// the delayed verification state is indeterminate.
func (d *Dials[T]) EnableVerification(ctx context.Context) (*T, CfgSerial[T], error) {
if !d.params.DelayInitialVerification {
// this is a noop, since we never disabled verification
cfg, tok := d.ViewVersion()
return cfg, tok, nil
} else if d.monCtl == nil {
cfg, tok := d.ViewVersion()
if vc, ok := any(cfg).(VerifiedConfig); ok {
return nil, CfgSerial[T]{}, vc.Verify()
}
return cfg, tok, nil
}
// must have capacity 1
resp := make(chan verifyEnableResp[T], 1)
select {
case d.monCtl <- verifyEnable[T]{resp: resp}:
case <-ctx.Done():
return nil, CfgSerial[T]{}, fmt.Errorf("context expired while signaling: %w", ctx.Err())
}

select {
case r := <-resp:
return r.v, r.tok, r.err
case <-ctx.Done():
return nil, CfgSerial[T]{}, fmt.Errorf("context expired while awaiting response: %w", ctx.Err())
}

}

func (d *Dials[T]) monitorEnableVerify(ve verifyEnable[T]) bool {
vt, serial := d.ViewVersion()
if vf, ok := any(vt).(VerifiedConfig); ok {
if vfErr := vf.Verify(); vfErr != nil {
ve.resp <- verifyEnableResp[T]{
err: vfErr,
v: nil,
tok: CfgSerial[T]{},
}

return false
}
}
ve.resp <- verifyEnableResp[T]{
err: nil,
v: vt,
tok: serial,
}
return true
}

func (d *Dials[T]) monitor(
ctx context.Context,
t *T,
sourceValues []sourceValue,
watcherChan chan watchStatusUpdate,
monCtl <-chan verifyEnable[T],
) {
defer close(d.cbch)
skipVerify := d.params.DelayInitialVerification
for {
select {
case <-ctx.Done():
return
case v := <-monCtl:
if !skipVerify {
// we're not in skipVerify mode, so just send back
// a success and continue
cfg, serial := d.ViewVersion()
v.resp <- verifyEnableResp[T]{
err: nil,
v: cfg,
tok: serial,
}
continue
}
skipVerify = !d.monitorEnableVerify(v)
case watchTab := <-watcherChan:
switch v := watchTab.(type) {
case *valueUpdate:
oldConfig, oldSerial := d.ViewVersion()
newConfig := d.updateSourceValue(ctx, t, sourceValues, v)
newConfig := d.updateSourceValue(ctx, t, skipVerify, sourceValues, v)
if newConfig != nil {
d.submitEvent(ctx, &newConfigEvent[T]{
oldConfig: oldConfig,
newConfig: newConfig,
serial: oldSerial.s + 1,
globalCBsSuppressed: skipVerify &&
d.params.CallGlobalCallbacksAfterVerificationEnabled,
})
}
case *watchErrorReport:
d.submitEvent(ctx, &watchErrorEvent[T]{
err: fmt.Errorf("error reported by source of type %T: %w",
v.source, v.err),
oldConfig: d.View(),
newConfig: nil,
})
if !skipVerify && !d.params.CallGlobalCallbacksAfterVerificationEnabled {
d.submitEvent(ctx, &watchErrorEvent[T]{
err: fmt.Errorf("error reported by source of type %T: %w",
v.source, v.err),
oldConfig: d.View(),
newConfig: nil,
})
}
case *watcherDone:
if !d.markSourceDone(ctx, sourceValues, v) {
// if there are no watching sources, just exit.
Expand Down
1 change: 1 addition & 0 deletions dials_118.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ type Dials[T any] struct {
updatesChan chan *T
params Params[T]
cbch chan<- userCallbackEvent
monCtl chan<- verifyEnable[T]
}

// View returns the configuration struct populated.
Expand Down
1 change: 1 addition & 0 deletions dials_119.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ type Dials[T any] struct {
updatesChan chan *T
params Params[T]
cbch chan<- userCallbackEvent
monCtl chan<- verifyEnable[T]
}

// View returns the configuration struct populated.
Expand Down
Loading

0 comments on commit 87bcd97

Please sign in to comment.