Skip to content

update gateway-services table with endpoints #13217

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 26 commits into from
May 31, 2022
Merged
Show file tree
Hide file tree
Changes from 14 commits
Commits
Show all changes
26 commits
Select commit Hold shift + click to select a range
9c2cd37
update gateway-services table with endpoints
dhiaayachi May 25, 2022
98c4358
fix failing test
dhiaayachi May 25, 2022
1734104
remove unneeded config in test
dhiaayachi May 25, 2022
cdd5543
rename "endpoint" to "destination"
dhiaayachi May 25, 2022
aad5214
more endpoint renaming to destination in tests
dhiaayachi May 25, 2022
a312202
update isDestination based on service-defaults config entry creation
dhiaayachi May 25, 2022
2fd75dc
use a 3 state kind to be able to set the kind to unknown (when neithe…
dhiaayachi May 26, 2022
4e361d7
set unknown state to empty to avoid modifying alot of tests
dhiaayachi May 26, 2022
f6aafd8
fix logic to set the kind correctly on CRUD
dhiaayachi May 26, 2022
db9529f
fix failing tests
dhiaayachi May 26, 2022
3c5b85c
Merge pull request #13226 from hashicorp/egress-gtw/rename-destination
dhiaayachi May 26, 2022
2147d77
Merge branch 'main' into egress-gtw/tgtw-gateway-services
dhiaayachi May 26, 2022
9949f4d
add missing tests and fix service delete
dhiaayachi May 26, 2022
35a80c5
fix failing test
dhiaayachi May 27, 2022
1e6803f
Apply suggestions from code review
dhiaayachi May 27, 2022
711bdc5
fix a bug with kind and add relevant test
dhiaayachi May 27, 2022
fcd4a2e
fix compile error
dhiaayachi May 27, 2022
8a9ff79
fix failing tests
dhiaayachi May 27, 2022
5d66a08
add kind to clone
dhiaayachi May 27, 2022
36d52fa
fix failing tests
dhiaayachi May 30, 2022
9ed9345
fix failing tests in catalog endpoint
dhiaayachi May 30, 2022
042b874
fix service dump test
dhiaayachi May 30, 2022
ac6c9b4
Apply suggestions from code review
dhiaayachi May 31, 2022
2440051
remove duplicate tests
dhiaayachi May 31, 2022
f900906
rename consts and fix kind when no destination is defined in the serv…
dhiaayachi May 31, 2022
f964e09
rename Kind to ServiceKind and change switch to use .(type)
dhiaayachi May 31, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
119 changes: 108 additions & 11 deletions agent/consul/state/catalog.go
Original file line number Diff line number Diff line change
Expand Up @@ -846,9 +846,16 @@ func ensureServiceTxn(tx WriteTxn, idx uint64, node string, preserveIndexes bool
}

if svc.PeerName == "" {
// Check if this service is covered by a gateway's wildcard specifier
if err = checkGatewayWildcardsAndUpdate(tx, idx, svc); err != nil {
return fmt.Errorf("failed updating gateway mapping: %s", err)
// Do not associate non-typical services with gateways or consul services
if svc.Kind == structs.ServiceKindTypical && svc.Service != "consul" {
// Check if this service is covered by a gateway's wildcard specifier, we force the service kind to a gateway-service here as that take precedence
sn := structs.ServiceName{Name: svc.Service, EnterpriseMeta: svc.EnterpriseMeta}
if err = checkGatewayWildcardsAndUpdate(tx, idx, &sn, structs.GatewayservicekindService); err != nil {
return fmt.Errorf("failed updating gateway mapping: %s", err)
}
if err = checkGatewayAndUpdate(tx, idx, &sn, structs.GatewayservicekindService); err != nil {
return fmt.Errorf("failed updating gateway mapping: %s", err)
}
}
}
if err := upsertKindServiceName(tx, idx, svc.Kind, svc.CompoundServiceName()); err != nil {
Expand Down Expand Up @@ -3469,6 +3476,7 @@ func terminatingConfigGatewayServices(
return false, nil, fmt.Errorf("unexpected config entry type: %T", conf)
}

// TODO(egress-tproxy): Check if it's an endpoint by quering the service and the service defaut config entry.
// Check if service list matches the last known list for the config entry, if it does, skip the update
_, c, err := configEntryTxn(tx, nil, conf.GetKind(), conf.GetName(), entMeta)
if err != nil {
Expand All @@ -3483,6 +3491,10 @@ func terminatingConfigGatewayServices(

var gatewayServices structs.GatewayServices
for _, svc := range entry.Services {
kind, err := GatewayServiceKind(tx, svc.Name, &svc.EnterpriseMeta)
if err != nil {
return false, nil, fmt.Errorf("failed to query endpoints: %v", err)
}
mapping := &structs.GatewayService{
Gateway: gateway,
Service: structs.NewServiceName(svc.Name, &svc.EnterpriseMeta),
Expand All @@ -3491,13 +3503,36 @@ func terminatingConfigGatewayServices(
CertFile: svc.CertFile,
CAFile: svc.CAFile,
SNI: svc.SNI,
Kind: kind,
}

gatewayServices = append(gatewayServices, mapping)
}
return false, gatewayServices, nil
}

func GatewayServiceKind(tx ReadTxn, name string, entMeta *acl.EnterpriseMeta) (structs.GatewayServiceKind, error) {
serviceIter, err := tx.Get(tableServices, indexService, Query{
Value: name,
EnterpriseMeta: *entMeta,
})
if err != nil {
return structs.GatewayservicekindService, err
}
for service := serviceIter.Next(); service != nil; service = serviceIter.Next() {
return structs.GatewayservicekindUnknown, nil
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
if err != nil {
return structs.GatewayservicekindService, err
}
for service := serviceIter.Next(); service != nil; service = serviceIter.Next() {
return structs.GatewayservicekindUnknown, nil
}
if err != nil {
return structs.GatewayservicekindUnknown, err
}
for service := serviceIter.Next(); service != nil; service = serviceIter.Next() {
return structs.GatewayservicekindService, nil
}

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This should have failed some tests, I will check if we can cover it.


_, entry, err := configEntryTxn(tx, nil, structs.ServiceDefaults, name, entMeta)
if err != nil {
return structs.GatewayservicekindUnknown, err
}
if entry != nil {
return structs.GatewayservicekindDestination, nil
}
return structs.GatewayservicekindUnknown, nil
}

// updateGatewayNamespace is used to target all services within a namespace
func updateGatewayNamespace(tx WriteTxn, idx uint64, service *structs.GatewayService, entMeta *acl.EnterpriseMeta) error {
if entMeta == nil {
Expand Down Expand Up @@ -3538,6 +3573,41 @@ func updateGatewayNamespace(tx WriteTxn, idx uint64, service *structs.GatewaySer
return err
}
}
endpoints, err := tx.Get(tableConfigEntries, indexID+"_prefix", ConfigEntryKindQuery{Kind: structs.ServiceDefaults, EnterpriseMeta: *entMeta})
if err != nil {
return fmt.Errorf("failed querying endpoints: %s", err)
}
for endpoint := endpoints.Next(); endpoint != nil; endpoint = endpoints.Next() {
e := endpoint.(*structs.ServiceConfigEntry)
if e.Destination == nil {
continue
}

sn := structs.ServiceName{
Name: e.Name,
EnterpriseMeta: e.EnterpriseMeta,
}
existing, err := tx.First(tableGatewayServices, indexID, service.Gateway, sn, service.Port)
if err != nil {
return fmt.Errorf("gateway service lookup failed: %s", err)
}
if existing != nil {
// If there's an existing service associated with this gateway then we skip it.
// This means the service was specified on its own, and the service entry overrides the wildcard entry.
continue
}

mapping := service.Clone()

mapping.Service = structs.NewServiceName(e.Name, &service.Service.EnterpriseMeta)
mapping.Kind = structs.GatewayservicekindDestination
mapping.FromWildcard = true

err = updateGatewayService(tx, idx, mapping)
if err != nil {
return err
}
}

// Also store a mapping for the wildcard so that the TLS creds can be pulled
// for new services registered in its namespace
Expand Down Expand Up @@ -3585,25 +3655,21 @@ func updateGatewayService(tx WriteTxn, idx uint64, mapping *structs.GatewayServi
// checkWildcardForGatewaysAndUpdate checks whether a service matches a
// wildcard definition in gateway config entries and if so adds it the the
// gateway-services table.
func checkGatewayWildcardsAndUpdate(tx WriteTxn, idx uint64, svc *structs.NodeService) error {
// Do not associate non-typical services with gateways or consul services
if svc.Kind != structs.ServiceKindTypical || svc.Service == "consul" {
return nil
}

func checkGatewayWildcardsAndUpdate(tx WriteTxn, idx uint64, svc *structs.ServiceName, kind structs.GatewayServiceKind) error {
sn := structs.ServiceName{Name: structs.WildcardSpecifier, EnterpriseMeta: svc.EnterpriseMeta}
svcGateways, err := tx.Get(tableGatewayServices, indexService, sn)
if err != nil {
return fmt.Errorf("failed gateway lookup for %q: %s", svc.Service, err)
return fmt.Errorf("failed gateway lookup for %q: %s", svc.Name, err)
}
for service := svcGateways.Next(); service != nil; service = svcGateways.Next() {
if wildcardSvc, ok := service.(*structs.GatewayService); ok && wildcardSvc != nil {

// Copy the wildcard mapping and modify it
gatewaySvc := wildcardSvc.Clone()

gatewaySvc.Service = structs.NewServiceName(svc.Service, &svc.EnterpriseMeta)
gatewaySvc.Service = structs.NewServiceName(svc.Name, &svc.EnterpriseMeta)
gatewaySvc.FromWildcard = true
gatewaySvc.Kind = kind

if err = updateGatewayService(tx, idx, gatewaySvc); err != nil {
return fmt.Errorf("Failed to associate service %q with gateway %q", gatewaySvc.Service.String(), gatewaySvc.Gateway.String())
Expand All @@ -3613,6 +3679,31 @@ func checkGatewayWildcardsAndUpdate(tx WriteTxn, idx uint64, svc *structs.NodeSe
return nil
}

// checkGatewayAndUpdate checks whether a service matches a
// wildcard definition in gateway config entries and if so adds it the the
// gateway-services table.
func checkGatewayAndUpdate(tx WriteTxn, idx uint64, svc *structs.ServiceName, kind structs.GatewayServiceKind) error {
sn := structs.ServiceName{Name: svc.Name, EnterpriseMeta: svc.EnterpriseMeta}
svcGateways, err := tx.First(tableGatewayServices, indexService, sn)
if err != nil {
return fmt.Errorf("failed gateway lookup for %q: %s", svc.Name, err)
}

if service, ok := svcGateways.(*structs.GatewayService); ok && service != nil {
// Copy the wildcard mapping and modify it
gatewaySvc := service.Clone()

gatewaySvc.Service = structs.NewServiceName(svc.Name, &svc.EnterpriseMeta)
gatewaySvc.Kind = kind

if err = updateGatewayService(tx, idx, gatewaySvc); err != nil {
return fmt.Errorf("Failed to associate service %q with gateway %q", gatewaySvc.Service.String(), gatewaySvc.Gateway.String())
}
}

return nil
}

func cleanupGatewayWildcards(tx WriteTxn, idx uint64, svc *structs.ServiceNode) error {
// Clean up association between service name and gateways if needed
sn := structs.ServiceName{Name: svc.ServiceName, EnterpriseMeta: svc.EnterpriseMeta}
Expand Down Expand Up @@ -3643,6 +3734,12 @@ func cleanupGatewayWildcards(tx WriteTxn, idx uint64, svc *structs.ServiceNode)
if err := deleteGatewayServiceTopologyMapping(tx, idx, m); err != nil {
return fmt.Errorf("failed to reconcile mesh topology for gateway: %v", err)
}
} else {
kind, err := GatewayServiceKind(tx, m.Service.Name, &m.Service.EnterpriseMeta)
if err != nil {
return fmt.Errorf("failed to reconcile mesh topology for gateway: %v", err)
}
checkGatewayAndUpdate(tx, idx, &structs.ServiceName{Name: m.Service.Name, EnterpriseMeta: m.Service.EnterpriseMeta}, kind)
}
}
return nil
Expand Down
110 changes: 110 additions & 0 deletions agent/consul/state/catalog_events_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1473,6 +1473,18 @@ func TestServiceHealthEventsFromChanges(t *testing.T) {
},
WantEvents: []stream.Event{
testServiceHealthEvent(t, "srv1", evNodeUnchanged),
testServiceHealthDeregistrationEvent(t,
"tgate1",
evConnectTopic,
evServiceTermingGateway("srv1"),
evTerminatingGatewayVirtualIPs("srv1"),
),
testServiceHealthEvent(t,
"tgate1",
evConnectTopic,
evNodeUnchanged,
evServiceUnchanged,
evServiceTermingGateway("srv1")),
},
})
run(t, eventsTestCase{
Expand Down Expand Up @@ -1505,6 +1517,18 @@ func TestServiceHealthEventsFromChanges(t *testing.T) {
},
WantEvents: []stream.Event{
testServiceHealthDeregistrationEvent(t, "srv1"),
testServiceHealthDeregistrationEvent(t,
"tgate1",
evConnectTopic,
evServiceTermingGateway("srv1"),
evTerminatingGatewayVirtualIPs("srv1"),
),
testServiceHealthEvent(t,
"tgate1",
evConnectTopic,
evNodeUnchanged,
evServiceUnchanged,
evServiceTermingGateway("srv1")),
},
})
run(t, eventsTestCase{
Expand Down Expand Up @@ -1625,6 +1649,92 @@ func TestServiceHealthEventsFromChanges(t *testing.T) {
evTerminatingGatewayVirtualIPs("srv1", "srv2")),
},
})
run(t, eventsTestCase{
Name: "terminating gateway destination service-defaults",
Setup: func(s *Store, tx *txn) error {
configEntry := &structs.TerminatingGatewayConfigEntry{
Kind: structs.TerminatingGateway,
Name: "tgate1",
Services: []structs.LinkedService{
{
Name: "destination1",
EnterpriseMeta: *structs.DefaultEnterpriseMetaInDefaultPartition(),
},
},
EnterpriseMeta: *structs.DefaultEnterpriseMetaInDefaultPartition(),
}
err := ensureConfigEntryTxn(tx, tx.Index, configEntry)
if err != nil {
return err
}
return s.ensureRegistrationTxn(tx, tx.Index, false,
testServiceRegistration(t, "tgate1", regTerminatingGateway), false)
},
Mutate: func(s *Store, tx *txn) error {
configEntryDest := &structs.ServiceConfigEntry{
Kind: structs.ServiceDefaults,
Name: "destination1",
Destination: &structs.DestinationConfig{Port: 9000, Address: "kafka.test.com"},
}
return ensureConfigEntryTxn(tx, tx.Index, configEntryDest)
},
WantEvents: []stream.Event{
testServiceHealthDeregistrationEvent(t,
"tgate1",
evConnectTopic,
evServiceTermingGateway("destination1"),
evTerminatingGatewayVirtualIPs("destination1")),
testServiceHealthEvent(t,
"tgate1",
evConnectTopic,
evNodeUnchanged,
evServiceUnchanged,
evServiceTermingGateway("destination1"),
evTerminatingGatewayVirtualIPs("destination1"),
),
},
})

run(t, eventsTestCase{
Name: "terminating gateway destination service-defaults wildcard",
Setup: func(s *Store, tx *txn) error {
configEntry := &structs.TerminatingGatewayConfigEntry{
Kind: structs.TerminatingGateway,
Name: "tgate1",
Services: []structs.LinkedService{
{
Name: "*",
EnterpriseMeta: *structs.DefaultEnterpriseMetaInDefaultPartition(),
},
},
EnterpriseMeta: *structs.DefaultEnterpriseMetaInDefaultPartition(),
}
err := ensureConfigEntryTxn(tx, tx.Index, configEntry)
if err != nil {
return err
}
return s.ensureRegistrationTxn(tx, tx.Index, false,
testServiceRegistration(t, "tgate1", regTerminatingGateway), false)
},
Mutate: func(s *Store, tx *txn) error {
configEntryDest := &structs.ServiceConfigEntry{
Kind: structs.ServiceDefaults,
Name: "destination1",
Destination: &structs.DestinationConfig{Port: 9000, Address: "kafka.test.com"},
}
return ensureConfigEntryTxn(tx, tx.Index, configEntryDest)
},
WantEvents: []stream.Event{
testServiceHealthEvent(t,
"tgate1",
evConnectTopic,
evNodeUnchanged,
evServiceUnchanged,
evServiceTermingGateway("destination1"),
evTerminatingGatewayVirtualIPs("*"),
),
},
})
}

func (tc eventsTestCase) run(t *testing.T) {
Expand Down
2 changes: 2 additions & 0 deletions agent/consul/state/catalog_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5170,6 +5170,7 @@ func TestStateStore_GatewayServices_Terminating(t *testing.T) {
CreateIndex: 23,
ModifyIndex: 23,
},
Kind: structs.GatewayservicekindService,
},
}
assert.Equal(t, expect, out)
Expand Down Expand Up @@ -6421,6 +6422,7 @@ func TestStateStore_DumpGatewayServices(t *testing.T) {
CreateIndex: 22,
ModifyIndex: 22,
},
Kind: structs.GatewayservicekindService,
},
}
assert.Equal(t, expect, out)
Expand Down
Loading