Skip to content

Commit 807a3cd

Browse files
committed
refactor: all network merge controllers
They had lots of common (and broken) code, so use generics to implement the core logic once, and use that in all kinds of merge controllers. This removes lots of code duplication. Fixes flaky unit-tests, and also improves controller performance on conflicts. Signed-off-by: Andrey Smirnov <[email protected]>
1 parent ec8c466 commit 807a3cd

16 files changed

+328
-793
lines changed

internal/app/machined/pkg/controllers/network/address_merge.go

Lines changed: 21 additions & 118 deletions
Original file line numberDiff line numberDiff line change
@@ -2,138 +2,41 @@
22
// License, v. 2.0. If a copy of the MPL was not distributed with this
33
// file, You can obtain one at http://mozilla.org/MPL/2.0/.
44

5-
// Package network provides controllers which manage network resources.
6-
//
7-
//nolint:dupl
85
package network
96

107
import (
11-
"context"
12-
"fmt"
13-
148
"github.com/cosi-project/runtime/pkg/controller"
159
"github.com/cosi-project/runtime/pkg/resource"
1610
"github.com/cosi-project/runtime/pkg/safe"
17-
"github.com/cosi-project/runtime/pkg/state"
1811
"go.uber.org/zap"
1912

2013
"github.com/siderolabs/talos/pkg/machinery/resources/network"
2114
)
2215

23-
// AddressMergeController merges network.AddressSpec in network.ConfigNamespace and produces final network.AddressSpec in network.Namespace.
24-
type AddressMergeController struct{}
25-
26-
// Name implements controller.Controller interface.
27-
func (ctrl *AddressMergeController) Name() string {
28-
return "network.AddressMergeController"
29-
}
30-
31-
// Inputs implements controller.Controller interface.
32-
func (ctrl *AddressMergeController) Inputs() []controller.Input {
33-
return []controller.Input{
34-
{
35-
Namespace: network.ConfigNamespaceName,
36-
Type: network.AddressSpecType,
37-
Kind: controller.InputWeak,
38-
},
39-
{
40-
Namespace: network.NamespaceName,
41-
Type: network.AddressSpecType,
42-
Kind: controller.InputDestroyReady,
43-
},
44-
}
45-
}
46-
47-
// Outputs implements controller.Controller interface.
48-
func (ctrl *AddressMergeController) Outputs() []controller.Output {
49-
return []controller.Output{
50-
{
51-
Type: network.AddressSpecType,
52-
Kind: controller.OutputShared,
53-
},
54-
}
55-
}
56-
57-
// Run implements controller.Controller interface.
16+
// NewAddressMergeController initializes a AddressMergeController.
5817
//
59-
//nolint:gocyclo
60-
func (ctrl *AddressMergeController) Run(ctx context.Context, r controller.Runtime, _ *zap.Logger) error {
61-
for {
62-
select {
63-
case <-ctx.Done():
64-
return nil
65-
case <-r.EventCh():
66-
}
67-
68-
// list source network configuration resources
69-
list, err := r.List(ctx, resource.NewMetadata(network.ConfigNamespaceName, network.AddressSpecType, "", resource.VersionUndefined))
70-
if err != nil {
71-
return fmt.Errorf("error listing source network addresses: %w", err)
72-
}
73-
74-
// address is allowed as long as it's not duplicate, for duplicate higher layer takes precedence
75-
addresses := map[string]*network.AddressSpec{}
76-
77-
for _, res := range list.Items {
78-
address := res.(*network.AddressSpec) //nolint:forcetypeassert
79-
id := network.AddressID(address.TypedSpec().LinkName, address.TypedSpec().Address)
80-
81-
existing, ok := addresses[id]
82-
if ok && existing.TypedSpec().ConfigLayer > address.TypedSpec().ConfigLayer {
83-
// skip this address, as existing one is higher layer
84-
continue
85-
}
86-
87-
addresses[id] = address
88-
}
89-
90-
conflictsDetected := 0
91-
92-
for id, address := range addresses {
93-
if err = safe.WriterModify(ctx, r, network.NewAddressSpec(network.NamespaceName, id), func(addr *network.AddressSpec) error {
94-
*addr.TypedSpec() = *address.TypedSpec()
95-
96-
return nil
97-
}); err != nil {
98-
if state.IsPhaseConflictError(err) {
99-
// phase conflict, resource is being torn down, skip updating it and trigger reconcile
100-
// later by failing the
101-
conflictsDetected++
102-
103-
delete(addresses, id)
104-
} else {
105-
return fmt.Errorf("error updating resource: %w", err)
106-
}
107-
}
108-
}
109-
110-
// list addresses for cleanup
111-
list, err = r.List(ctx, resource.NewMetadata(network.NamespaceName, network.AddressSpecType, "", resource.VersionUndefined))
112-
if err != nil {
113-
return fmt.Errorf("error listing resources: %w", err)
114-
}
115-
116-
for _, res := range list.Items {
117-
if _, ok := addresses[res.Metadata().ID()]; !ok {
118-
var okToDestroy bool
119-
120-
okToDestroy, err = r.Teardown(ctx, res.Metadata())
121-
if err != nil {
122-
return fmt.Errorf("error cleaning up addresses: %w", err)
18+
// AddressMergeController merges network.AddressSpec in network.ConfigNamespace and produces final network.AddressSpec in network.Namespace.
19+
func NewAddressMergeController() controller.Controller {
20+
return GenericMergeController(
21+
network.ConfigNamespaceName,
22+
network.NamespaceName,
23+
func(logger *zap.Logger, list safe.List[*network.AddressSpec]) map[resource.ID]*network.AddressSpecSpec {
24+
// address is allowed as long as it's not duplicate, for duplicate higher layer takes precedence
25+
addresses := map[resource.ID]*network.AddressSpecSpec{}
26+
27+
for address := range list.All() {
28+
id := network.AddressID(address.TypedSpec().LinkName, address.TypedSpec().Address)
29+
30+
existing, ok := addresses[id]
31+
if ok && existing.ConfigLayer > address.TypedSpec().ConfigLayer {
32+
// skip this address, as existing one is higher layer
33+
continue
12334
}
12435

125-
if okToDestroy {
126-
if err = r.Destroy(ctx, res.Metadata()); err != nil {
127-
return fmt.Errorf("error cleaning up addresses: %w", err)
128-
}
129-
}
36+
addresses[id] = address.TypedSpec()
13037
}
131-
}
13238

133-
if conflictsDetected > 0 {
134-
return fmt.Errorf("%d conflict(s) detected", conflictsDetected)
135-
}
136-
137-
r.ResetRestartBackoff()
138-
}
39+
return addresses
40+
},
41+
)
13942
}

internal/app/machined/pkg/controllers/network/address_merge_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -52,7 +52,7 @@ func (suite *AddressMergeSuite) SetupTest() {
5252
suite.runtime, err = runtime.NewRuntime(suite.state, zaptest.NewLogger(suite.T()))
5353
suite.Require().NoError(err)
5454

55-
suite.Require().NoError(suite.runtime.RegisterController(&netctrl.AddressMergeController{}))
55+
suite.Require().NoError(suite.runtime.RegisterController(netctrl.NewAddressMergeController()))
5656

5757
suite.startRuntime()
5858
}
@@ -152,7 +152,7 @@ func (suite *AddressMergeSuite) TestMerge() {
152152
suite.assertAddresses(
153153
[]string{
154154
"lo/127.0.0.1/8",
155-
"eth0/10.0.0.35/32",
155+
"eth0/10.0.0.1/8",
156156
}, func(*network.AddressSpec, *assert.Assertions) {},
157157
)
158158
suite.Assert().NoError(
Lines changed: 142 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,142 @@
1+
// This Source Code Form is subject to the terms of the Mozilla Public
2+
// License, v. 2.0. If a copy of the MPL was not distributed with this
3+
// file, You can obtain one at http://mozilla.org/MPL/2.0/.
4+
5+
package network
6+
7+
import (
8+
"context"
9+
"fmt"
10+
"strings"
11+
12+
"github.com/cosi-project/runtime/pkg/controller"
13+
"github.com/cosi-project/runtime/pkg/resource"
14+
"github.com/cosi-project/runtime/pkg/resource/typed"
15+
"github.com/cosi-project/runtime/pkg/safe"
16+
"go.uber.org/zap"
17+
)
18+
19+
type genericMergeFunc[T typed.DeepCopyable[T], E typed.Extension, R typed.Resource[T, E]] func(logger *zap.Logger, in safe.List[*R]) map[resource.ID]*T
20+
21+
// GenericMergeController initializes a generic merge controller for network resources.
22+
func GenericMergeController[T typed.DeepCopyable[T], E typed.Extension](namespaceIn, namespaceOut resource.Namespace, mergeFunc genericMergeFunc[T, E, typed.Resource[T, E]]) controller.Controller {
23+
var zeroE E
24+
25+
controllerName := strings.ReplaceAll(zeroE.ResourceDefinition().Type, "Spec", "MergeController")
26+
27+
return &genericMergeController[T, E]{
28+
controllerName: controllerName,
29+
resourceType: zeroE.ResourceDefinition().Type,
30+
namespaceIn: namespaceIn,
31+
namespaceOut: namespaceOut,
32+
mergeFunc: mergeFunc,
33+
}
34+
}
35+
36+
type genericMergeController[T typed.DeepCopyable[T], E typed.Extension] struct {
37+
controllerName string
38+
resourceType resource.Type
39+
namespaceIn resource.Namespace
40+
namespaceOut resource.Namespace
41+
mergeFunc genericMergeFunc[T, E, typed.Resource[T, E]]
42+
}
43+
44+
func (ctrl *genericMergeController[T, E]) Name() string {
45+
return ctrl.controllerName
46+
}
47+
48+
func (ctrl *genericMergeController[T, E]) Inputs() []controller.Input {
49+
return []controller.Input{
50+
{
51+
Namespace: ctrl.namespaceIn,
52+
Type: ctrl.resourceType,
53+
Kind: controller.InputWeak,
54+
},
55+
{
56+
Namespace: ctrl.namespaceOut,
57+
Type: ctrl.resourceType,
58+
Kind: controller.InputDestroyReady,
59+
},
60+
}
61+
}
62+
63+
func (ctrl *genericMergeController[T, E]) Outputs() []controller.Output {
64+
return []controller.Output{
65+
{
66+
Type: ctrl.resourceType,
67+
Kind: controller.OutputShared,
68+
},
69+
}
70+
}
71+
72+
//nolint:gocyclo
73+
func (ctrl *genericMergeController[T, E]) Run(ctx context.Context, r controller.Runtime, logger *zap.Logger) error {
74+
for {
75+
select {
76+
case <-ctx.Done():
77+
return nil
78+
case <-r.EventCh():
79+
}
80+
81+
type R = typed.Resource[T, E]
82+
83+
// list source network configuration resources
84+
in, err := safe.ReaderList[*R](ctx, r, resource.NewMetadata(ctrl.namespaceIn, ctrl.resourceType, "", resource.VersionUndefined))
85+
if err != nil {
86+
return fmt.Errorf("error listing source network resources: %w", err)
87+
}
88+
89+
merged := ctrl.mergeFunc(logger, in)
90+
91+
// cleanup resources, detecting conflicts on the way
92+
out, err := safe.ReaderList[*R](ctx, r, resource.NewMetadata(ctrl.namespaceOut, ctrl.resourceType, "", resource.VersionUndefined))
93+
if err != nil {
94+
return fmt.Errorf("error listing output resources: %w", err)
95+
}
96+
97+
for res := range out.All() {
98+
shouldBeDestroyed := false
99+
if _, ok := merged[res.Metadata().ID()]; !ok {
100+
shouldBeDestroyed = true
101+
}
102+
103+
isTearingDown := res.Metadata().Phase() == resource.PhaseTearingDown
104+
105+
if shouldBeDestroyed || isTearingDown {
106+
var okToDestroy bool
107+
108+
okToDestroy, err = r.Teardown(ctx, res.Metadata())
109+
if err != nil {
110+
return fmt.Errorf("error cleaning up addresses: %w", err)
111+
}
112+
113+
if okToDestroy {
114+
if err = r.Destroy(ctx, res.Metadata()); err != nil {
115+
return fmt.Errorf("error cleaning up addresses: %w", err)
116+
}
117+
} else if !shouldBeDestroyed {
118+
// resource is not ready to be destroyed yet, skip it
119+
delete(merged, res.Metadata().ID())
120+
}
121+
}
122+
}
123+
124+
var zeroT T
125+
126+
for id, spec := range merged {
127+
if err = safe.WriterModify(ctx, r,
128+
typed.NewResource[T, E](resource.NewMetadata(ctrl.namespaceOut, ctrl.resourceType, id, resource.VersionUndefined), zeroT),
129+
func(r *R) error {
130+
*r.TypedSpec() = *spec
131+
132+
return nil
133+
}); err != nil {
134+
return fmt.Errorf("error updating resource: %w", err)
135+
}
136+
137+
logger.Debug("merged spec", zap.String("id", id), zap.Any("spec", spec))
138+
}
139+
140+
r.ResetRestartBackoff()
141+
}
142+
}

0 commit comments

Comments
 (0)