Skip to content

Commit d8a159e

Browse files
FxKuePaul
andauthored
create CDC event stream CRD (zalando#1570)
* provide event stream API * check manifest settings for logical decoding before creating streams * operator updates Postgres config and creates replication user * name FES like the Postgres cluster * add delete case and fix updating streams + update unit test * check if fes CRD exists before syncing * existing slot must use the same plugin * make id and payload columns configurable * sync streams only when they are defined in manifest * introduce applicationId for separate stream CRDs * add FES to RBAC in chart * disable streams in chart * switch to pgoutput plugin and let operator create publications * reflect code review and additional refactoring Co-authored-by: Paŭlo Ebermann <[email protected]>
1 parent 8b404fd commit d8a159e

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

42 files changed

+2330
-31
lines changed

charts/postgres-operator/crds/postgresqls.yaml

+33
Original file line numberDiff line numberDiff line change
@@ -474,6 +474,39 @@ spec:
474474
type: string
475475
gs_wal_path:
476476
type: string
477+
streams:
478+
type: array
479+
nullable: true
480+
items:
481+
type: object
482+
required:
483+
- applicationId
484+
- database
485+
- tables
486+
properties:
487+
applicationId:
488+
type: string
489+
batchSize:
490+
type: integer
491+
database:
492+
type: string
493+
filter:
494+
type: object
495+
additionalProperties:
496+
type: string
497+
tables:
498+
type: object
499+
additionalProperties:
500+
type: object
501+
required:
502+
- eventType
503+
properties:
504+
eventType:
505+
type: string
506+
idColumn:
507+
type: string
508+
payloadColumn:
509+
type: string
477510
teamId:
478511
type: string
479512
tls:

charts/postgres-operator/templates/clusterrole.yaml

+16
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,22 @@ rules:
3434
- get
3535
- list
3636
- watch
37+
# all verbs allowed for event streams
38+
{{- if .Values.enableStreams }}
39+
- apiGroups:
40+
- zalando.org
41+
resources:
42+
- fabriceventstreams
43+
verbs:
44+
- create
45+
- delete
46+
- deletecollection
47+
- get
48+
- list
49+
- patch
50+
- update
51+
- watch
52+
{{- end }}
3753
# to create or get/update CRDs when starting up
3854
- apiGroups:
3955
- apiextensions.k8s.io

charts/postgres-operator/values.yaml

+3
Original file line numberDiff line numberDiff line change
@@ -368,6 +368,9 @@ configConnectionPooler:
368368
connection_pooler_default_cpu_limit: "1"
369369
connection_pooler_default_memory_limit: 100Mi
370370

371+
# Zalando's internal CDC stream feature
372+
enableStreams: false
373+
371374
rbac:
372375
# Specifies whether RBAC resources should be created
373376
create: true

docs/reference/cluster_manifest.md

+51
Original file line numberDiff line numberDiff line change
@@ -536,3 +536,54 @@ Those parameters are grouped under the `tls` top-level key.
536536
relative to the "/tls/", which is mount path of the tls secret.
537537
If `caSecretName` is defined, the ca.crt path is relative to "/tlsca/",
538538
otherwise to the same "/tls/".
539+
540+
## Change data capture streams
541+
542+
This sections enables change data capture (CDC) streams via Postgres'
543+
[logical decoding](https://www.postgresql.org/docs/14/logicaldecoding.html)
544+
feature and `pgoutput` plugin. While the Postgres operator takes responsibility
545+
for providing the setup to publish change events, it relies on external tools
546+
to consume them. At Zalando, we are using a workflow based on
547+
[Debezium Connector](https://debezium.io/documentation/reference/stable/connectors/postgresql.html)
548+
which can feed streams into Zalando’s distributed event broker [Nakadi](https://nakadi.io/)
549+
among others.
550+
551+
The Postgres Operator creates custom resources for Zalando's internal CDC
552+
operator which will be used to set up the consumer part. Each stream object
553+
can have the following properties:
554+
555+
* **applicationId**
556+
The application name to which the database and CDC belongs to. For each
557+
set of streams with a distinct `applicationId` a separate stream CR as well
558+
as a separate logical replication slot will be created. This means there can
559+
be different streams in the same database and streams with the same
560+
`applicationId` are bundled in one stream CR. The stream CR will be called
561+
like the Postgres cluster plus "-<applicationId>" suffix. Required.
562+
563+
* **database**
564+
Name of the database from where events will be published via Postgres'
565+
logical decoding feature. The operator will take care of updating the
566+
database configuration (setting `wal_level: logical`, creating logical
567+
replication slots, using output plugin `pgoutput` and creating a dedicated
568+
replication user). Required.
569+
570+
* **tables**
571+
Defines a map of table names and their properties (`eventType`, `idColumn`
572+
and `payloadColumn`). The CDC operator is following the [outbox pattern](https://debezium.io/blog/2019/02/19/reliable-microservices-data-exchange-with-the-outbox-pattern/).
573+
The application is responsible for putting events into a (JSON/B or VARCHAR)
574+
payload column of the outbox table in the structure of the specified target
575+
event type. The operator will create a [PUBLICATION](https://www.postgresql.org/docs/14/logical-replication-publication.html)
576+
in Postgres for all tables specified for one `database` and `applicationId`.
577+
The CDC operator will consume from it shortly after transactions are
578+
committed to the outbox table. The `idColumn` will be used in telemetry for
579+
the CDC operator. The names for `idColumn` and `payloadColumn` can be
580+
configured. Defaults are `id` and `payload`. The target `eventType` has to
581+
be defined. Required.
582+
583+
* **filter**
584+
Streamed events can be filtered by a jsonpath expression for each table.
585+
Optional.
586+
587+
* **batchSize**
588+
Defines the size of batches in which events are consumed. Optional.
589+
Defaults to 1.

hack/update-codegen.sh

+1-1
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@ trap "cleanup" EXIT SIGINT
1717

1818
bash "${CODEGEN_PKG}/generate-groups.sh" all \
1919
"${OPERATOR_PACKAGE_ROOT}/pkg/generated" "${OPERATOR_PACKAGE_ROOT}/pkg/apis" \
20-
"acid.zalan.do:v1" \
20+
"acid.zalan.do:v1 zalando.org:v1" \
2121
--go-header-file "${SCRIPT_ROOT}"/hack/custom-boilerplate.go.txt
2222

2323
cp -r "${OPERATOR_PACKAGE_ROOT}"/pkg/* "${TARGET_CODE_DIR}"

manifests/complete-postgres-manifest.yaml

+20-1
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ kind: postgresql
33
metadata:
44
name: acid-test-cluster
55
# labels:
6+
# application: test-app
67
# environment: demo
78
# annotations:
89
# "acid.zalan.do/controller": "second-operator"
@@ -17,7 +18,7 @@ spec:
1718
- superuser
1819
- createdb
1920
foo_user: []
20-
# flyway: []
21+
# flyway: []
2122
# usersWithSecretRotation:
2223
# - foo_user
2324
# usersWithInPlaceSecretRotation:
@@ -203,3 +204,21 @@ spec:
203204
# operator: In
204205
# values:
205206
# - enabled
207+
208+
# Enables change data capture streams for defined database tables
209+
# streams:
210+
# - applicationId: test-app
211+
# database: foo
212+
# tables:
213+
# data.state_pending_outbox:
214+
# eventType: test-app.status-pending
215+
# data.state_approved_outbox:
216+
# eventType: test-app.status-approved
217+
# data.orders_outbox:
218+
# eventType: test-app.order-completed
219+
# idColumn: o_id
220+
# payloadColumn: o_payload
221+
# # Optional. Filter ignores events before a certain txnId and lsn. Can be used to skip bad events
222+
# filter:
223+
# data.orders_outbox: "[?(@.source.txId > 500 && @.source.lsn > 123456)]"
224+
# batchSize: 1000

manifests/operator-service-account-rbac.yaml

+14
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,20 @@ rules:
3535
- get
3636
- list
3737
- watch
38+
# all verbs allowed for event streams (Zalando-internal feature)
39+
# - apiGroups:
40+
# - zalando.org
41+
# resources:
42+
# - fabriceventstreams
43+
# verbs:
44+
# - create
45+
# - delete
46+
# - deletecollection
47+
# - get
48+
# - list
49+
# - patch
50+
# - update
51+
# - watch
3852
# to create or get/update CRDs when starting up
3953
- apiGroups:
4054
- apiextensions.k8s.io

manifests/postgresql.crd.yaml

+33
Original file line numberDiff line numberDiff line change
@@ -472,6 +472,39 @@ spec:
472472
type: string
473473
gs_wal_path:
474474
type: string
475+
streams:
476+
type: array
477+
nullable: true
478+
items:
479+
type: object
480+
required:
481+
- applicationId
482+
- database
483+
- tables
484+
properties:
485+
applicationId:
486+
type: string
487+
batchSize:
488+
type: integer
489+
database:
490+
type: string
491+
filter:
492+
type: object
493+
additionalProperties:
494+
type: string
495+
tables:
496+
type: object
497+
additionalProperties:
498+
type: object
499+
required:
500+
- eventType
501+
properties:
502+
eventType:
503+
type: string
504+
idColumn:
505+
type: string
506+
payloadColumn:
507+
type: string
475508
teamId:
476509
type: string
477510
tls:

pkg/apis/acid.zalan.do/v1/crds.go

+48
Original file line numberDiff line numberDiff line change
@@ -716,6 +716,54 @@ var PostgresCRDResourceValidation = apiextv1.CustomResourceValidation{
716716
},
717717
},
718718
},
719+
"streams": {
720+
Type: "array",
721+
Items: &apiextv1.JSONSchemaPropsOrArray{
722+
Schema: &apiextv1.JSONSchemaProps{
723+
Type: "object",
724+
Required: []string{"applicationId", "database", "tables"},
725+
Properties: map[string]apiextv1.JSONSchemaProps{
726+
"applicationId": {
727+
Type: "string",
728+
},
729+
"batchSize": {
730+
Type: "integer",
731+
},
732+
"database": {
733+
Type: "string",
734+
},
735+
"filter": {
736+
Type: "object",
737+
AdditionalProperties: &apiextv1.JSONSchemaPropsOrBool{
738+
Schema: &apiextv1.JSONSchemaProps{
739+
Type: "string",
740+
},
741+
},
742+
},
743+
"tables": {
744+
Type: "object",
745+
AdditionalProperties: &apiextv1.JSONSchemaPropsOrBool{
746+
Schema: &apiextv1.JSONSchemaProps{
747+
Type: "object",
748+
Required: []string{"eventType"},
749+
Properties: map[string]apiextv1.JSONSchemaProps{
750+
"eventType": {
751+
Type: "string",
752+
},
753+
"idColumn": {
754+
Type: "string",
755+
},
756+
"payloadColumn": {
757+
Type: "string",
758+
},
759+
},
760+
},
761+
},
762+
},
763+
},
764+
},
765+
},
766+
},
719767
"teamId": {
720768
Type: "string",
721769
},

pkg/apis/acid.zalan.do/v1/postgresql_type.go

+15
Original file line numberDiff line numberDiff line change
@@ -77,6 +77,7 @@ type PostgresSpec struct {
7777
ServiceAnnotations map[string]string `json:"serviceAnnotations,omitempty"`
7878
TLS *TLSDescription `json:"tls,omitempty"`
7979
AdditionalVolumes []AdditionalVolume `json:"additionalVolumes,omitempty"`
80+
Streams []Stream `json:"streams,omitempty"`
8081

8182
// deprecated json tags
8283
InitContainersOld []v1.Container `json:"init_containers,omitempty"`
@@ -231,3 +232,17 @@ type ConnectionPooler struct {
231232

232233
Resources `json:"resources,omitempty"`
233234
}
235+
236+
type Stream struct {
237+
ApplicationId string `json:"applicationId"`
238+
Database string `json:"database"`
239+
Tables map[string]StreamTable `json:"tables"`
240+
Filter map[string]string `json:"filter,omitempty"`
241+
BatchSize uint32 `json:"batchSize,omitempty"`
242+
}
243+
244+
type StreamTable struct {
245+
EventType string `json:"eventType"`
246+
IdColumn string `json:"idColumn,omitempty" defaults:"id"`
247+
PayloadColumn string `json:"payloadColumn,omitempty" defaults:"payload"`
248+
}

pkg/apis/acid.zalan.do/v1/zz_generated.deepcopy.go

+53
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

pkg/apis/zalando.org/register.go

+6
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
package zalando
2+
3+
const (
4+
// GroupName is the group name for the operator CRDs
5+
GroupName = "zalando.org"
6+
)

0 commit comments

Comments
 (0)