diff --git a/apis/fluentbit/v1alpha2/plugins/output/kafka_types.go b/apis/fluentbit/v1alpha2/plugins/output/kafka_types.go index 764e6e470..d1b0a5703 100644 --- a/apis/fluentbit/v1alpha2/plugins/output/kafka_types.go +++ b/apis/fluentbit/v1alpha2/plugins/output/kafka_types.go @@ -9,6 +9,27 @@ import ( // +kubebuilder:object:generate:=true +type mapvalue struct { + StringVal string `json:"format,omitempty"` + Secret *plugins.Secret `json:"format,omitempty"` +} + +// Implement the GetStringVal method to satisfy the SecretProvider interface +func (m mapvalue) GetStringVal() string { + return m.StringVal +} + +// mapvalue implicitly implements params.SecretProvider because SecretProvider is an empty interface +var _ params.SecretProvider = mapvalue{} + +func convertMap(input map[string]mapvalue) map[string]params.SecretProvider { + result := make(map[string]params.SecretProvider) + for k, v := range input { + result[k] = v + } + return result +} + // Kafka output plugin allows to ingest your records into an Apache Kafka service.
// **For full documentation, refer to https://docs.fluentbit.io/manual/pipeline/outputs/kafka** type Kafka struct { @@ -35,8 +56,8 @@ type Kafka struct { // then by default the first topic in the Topics list will indicate the topic to be used. TopicKey string `json:"topicKey,omitempty"` // {property} can be any librdkafka properties - Rdkafka map[string]string `json:"rdkafka,omitempty"` //adds unknown topics (found in Topic_Key) to Topics. So in Topics only a default topic needs to be configured + Rdkafka map[string]mapvalue `json:"rdkafka,omitempty"` DynamicTopic *bool `json:"dynamicTopic,omitempty"` //Fluent Bit queues data into rdkafka library, //if for some reason the underlying library cannot flush the records the queue might fills up blocking new addition of records. @@ -84,8 +105,8 @@ func (k *Kafka) Params(_ plugins.SecretLoader) (*params.KVs, error) { kvs.Insert("queue_full_retries", fmt.Sprint(*k.QueueFullRetries)) } - kvs.InsertStringMap(k.Rdkafka, func(k, v string) (string, string) { - return fmt.Sprintf("rdkafka.%s", k), v + kvs.InsertMapValMap(convertMap(k.Rdkafka), func(k, v string) (string, params.SecretProvider) { + return fmt.Sprintf("rdkafka.%s", k), mapvalue{StringVal: v} }) return kvs, nil diff --git a/apis/fluentbit/v1alpha2/plugins/output/zz_generated.deepcopy.go b/apis/fluentbit/v1alpha2/plugins/output/zz_generated.deepcopy.go index d4ad45b19..d01aa5a11 100644 --- a/apis/fluentbit/v1alpha2/plugins/output/zz_generated.deepcopy.go +++ b/apis/fluentbit/v1alpha2/plugins/output/zz_generated.deepcopy.go @@ -506,38 +506,6 @@ func (in *InfluxDB) DeepCopy() *InfluxDB { return out } -// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. -func (in *Kafka) DeepCopyInto(out *Kafka) { - *out = *in - if in.Rdkafka != nil { - in, out := &in.Rdkafka, &out.Rdkafka - *out = make(map[string]string, len(*in)) - for key, val := range *in { - (*out)[key] = val - } - } - if in.DynamicTopic != nil { - in, out := &in.DynamicTopic, &out.DynamicTopic - *out = new(bool) - **out = **in - } - if in.QueueFullRetries != nil { - in, out := &in.QueueFullRetries, &out.QueueFullRetries - *out = new(int64) - **out = **in - } -} - -// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new Kafka. -func (in *Kafka) DeepCopy() *Kafka { - if in == nil { - return nil - } - out := new(Kafka) - in.DeepCopyInto(out) - return out -} - // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *Kinesis) DeepCopyInto(out *Kinesis) { *out = *in diff --git a/apis/fluentbit/v1alpha2/plugins/params/kvs.go b/apis/fluentbit/v1alpha2/plugins/params/kvs.go index 65c99812a..50af56659 100644 --- a/apis/fluentbit/v1alpha2/plugins/params/kvs.go +++ b/apis/fluentbit/v1alpha2/plugins/params/kvs.go @@ -11,6 +11,12 @@ import ( type kvTransformFunc func(string, string) (string, string) +type SecretProvider interface { + GetStringVal() string +} + +type kvTransformFunc1 func(string, string) (string, SecretProvider) + type KVs struct { keys []string values []string @@ -25,6 +31,36 @@ func NewKVs() *KVs { } } +func (kvs *KVs) InsertMapValMap(m map[string]SecretProvider, f kvTransformFunc1) { + if len(m) > 0 { + keys := make([]string, 0, len(m)) + for k := range m { + keys = append(keys, k) + } + + sort.Strings(keys) + + for _, k := range keys { + v := m[k] + strval := v.GetStringVal() + if f != nil { + transformedKey, transformedVal := f(k, strval) + + if transformedVal != nil { + strval = transformedVal.GetStringVal() + } else { + strval = "" // Default to an empty string if transformation returns nil + } + + k = transformedKey + } + + kvs.Insert(k, strval) + + } + } +} + func (kvs *KVs) Insert(key, value string) { kvs.keys = append(kvs.keys, key) kvs.values = append(kvs.values, value) diff --git a/charts/fluent-operator/charts/fluent-bit-crds/crds/fluentbit.fluent.io_clusteroutputs.yaml b/charts/fluent-operator/charts/fluent-bit-crds/crds/fluentbit.fluent.io_clusteroutputs.yaml index 1d3a3cacd..cb67871ec 100644 --- a/charts/fluent-operator/charts/fluent-bit-crds/crds/fluentbit.fluent.io_clusteroutputs.yaml +++ b/charts/fluent-operator/charts/fluent-bit-crds/crds/fluentbit.fluent.io_clusteroutputs.yaml @@ -1953,7 +1953,8 @@ spec: So in Topics only a default topic needs to be configured type: boolean format: - description: 'Specify data format, options available: json, msgpack.' + description: 'Specipvafy data format, options available: json, + msgpack.' type: string messageKey: description: Optional key to store the message @@ -1974,7 +1975,44 @@ spec: type: integer rdkafka: additionalProperties: - type: string + properties: + format: + description: Secret defines the key of a value. + properties: + valueFrom: + description: ValueSource defines how to find a value's + key. + properties: + secretKeyRef: + description: Selects a key of a secret in the pod's + namespace + properties: + key: + description: The key of the secret to select + from. Must be a valid secret key. + type: string + name: + default: "" + description: |- + Name of the referent. + This field is effectively required, but due to backwards compatibility is + allowed to be empty. Instances of this type with an empty value here are + almost certainly wrong. + TODO: Add other useful fields. apiVersion, kind, uid? + More info: https://kubernetes.io/docs/concepts/overview/working-with-objects/names/#names + TODO: Drop `kubebuilder:default` when controller-gen doesn't need it https://github.com/kubernetes-sigs/kubebuilder/issues/3896. + type: string + optional: + description: Specify whether the Secret or its + key must be defined + type: boolean + required: + - key + type: object + x-kubernetes-map-type: atomic + type: object + type: object + type: object description: '{property} can be any librdkafka properties' type: object timestampFormat: diff --git a/charts/fluent-operator/charts/fluent-bit-crds/crds/fluentbit.fluent.io_outputs.yaml b/charts/fluent-operator/charts/fluent-bit-crds/crds/fluentbit.fluent.io_outputs.yaml index 3a03ec407..0f4e42073 100644 --- a/charts/fluent-operator/charts/fluent-bit-crds/crds/fluentbit.fluent.io_outputs.yaml +++ b/charts/fluent-operator/charts/fluent-bit-crds/crds/fluentbit.fluent.io_outputs.yaml @@ -1953,7 +1953,8 @@ spec: So in Topics only a default topic needs to be configured type: boolean format: - description: 'Specify data format, options available: json, msgpack.' + description: 'Specipvafy data format, options available: json, + msgpack.' type: string messageKey: description: Optional key to store the message @@ -1974,7 +1975,44 @@ spec: type: integer rdkafka: additionalProperties: - type: string + properties: + format: + description: Secret defines the key of a value. + properties: + valueFrom: + description: ValueSource defines how to find a value's + key. + properties: + secretKeyRef: + description: Selects a key of a secret in the pod's + namespace + properties: + key: + description: The key of the secret to select + from. Must be a valid secret key. + type: string + name: + default: "" + description: |- + Name of the referent. + This field is effectively required, but due to backwards compatibility is + allowed to be empty. Instances of this type with an empty value here are + almost certainly wrong. + TODO: Add other useful fields. apiVersion, kind, uid? + More info: https://kubernetes.io/docs/concepts/overview/working-with-objects/names/#names + TODO: Drop `kubebuilder:default` when controller-gen doesn't need it https://github.com/kubernetes-sigs/kubebuilder/issues/3896. + type: string + optional: + description: Specify whether the Secret or its + key must be defined + type: boolean + required: + - key + type: object + x-kubernetes-map-type: atomic + type: object + type: object + type: object description: '{property} can be any librdkafka properties' type: object timestampFormat: diff --git a/config/crd/bases/fluentbit.fluent.io_clusteroutputs.yaml b/config/crd/bases/fluentbit.fluent.io_clusteroutputs.yaml index 1d3a3cacd..cb67871ec 100644 --- a/config/crd/bases/fluentbit.fluent.io_clusteroutputs.yaml +++ b/config/crd/bases/fluentbit.fluent.io_clusteroutputs.yaml @@ -1953,7 +1953,8 @@ spec: So in Topics only a default topic needs to be configured type: boolean format: - description: 'Specify data format, options available: json, msgpack.' + description: 'Specipvafy data format, options available: json, + msgpack.' type: string messageKey: description: Optional key to store the message @@ -1974,7 +1975,44 @@ spec: type: integer rdkafka: additionalProperties: - type: string + properties: + format: + description: Secret defines the key of a value. + properties: + valueFrom: + description: ValueSource defines how to find a value's + key. + properties: + secretKeyRef: + description: Selects a key of a secret in the pod's + namespace + properties: + key: + description: The key of the secret to select + from. Must be a valid secret key. + type: string + name: + default: "" + description: |- + Name of the referent. + This field is effectively required, but due to backwards compatibility is + allowed to be empty. Instances of this type with an empty value here are + almost certainly wrong. + TODO: Add other useful fields. apiVersion, kind, uid? + More info: https://kubernetes.io/docs/concepts/overview/working-with-objects/names/#names + TODO: Drop `kubebuilder:default` when controller-gen doesn't need it https://github.com/kubernetes-sigs/kubebuilder/issues/3896. + type: string + optional: + description: Specify whether the Secret or its + key must be defined + type: boolean + required: + - key + type: object + x-kubernetes-map-type: atomic + type: object + type: object + type: object description: '{property} can be any librdkafka properties' type: object timestampFormat: diff --git a/config/crd/bases/fluentbit.fluent.io_outputs.yaml b/config/crd/bases/fluentbit.fluent.io_outputs.yaml index 3a03ec407..0f4e42073 100644 --- a/config/crd/bases/fluentbit.fluent.io_outputs.yaml +++ b/config/crd/bases/fluentbit.fluent.io_outputs.yaml @@ -1953,7 +1953,8 @@ spec: So in Topics only a default topic needs to be configured type: boolean format: - description: 'Specify data format, options available: json, msgpack.' + description: 'Specipvafy data format, options available: json, + msgpack.' type: string messageKey: description: Optional key to store the message @@ -1974,7 +1975,44 @@ spec: type: integer rdkafka: additionalProperties: - type: string + properties: + format: + description: Secret defines the key of a value. + properties: + valueFrom: + description: ValueSource defines how to find a value's + key. + properties: + secretKeyRef: + description: Selects a key of a secret in the pod's + namespace + properties: + key: + description: The key of the secret to select + from. Must be a valid secret key. + type: string + name: + default: "" + description: |- + Name of the referent. + This field is effectively required, but due to backwards compatibility is + allowed to be empty. Instances of this type with an empty value here are + almost certainly wrong. + TODO: Add other useful fields. apiVersion, kind, uid? + More info: https://kubernetes.io/docs/concepts/overview/working-with-objects/names/#names + TODO: Drop `kubebuilder:default` when controller-gen doesn't need it https://github.com/kubernetes-sigs/kubebuilder/issues/3896. + type: string + optional: + description: Specify whether the Secret or its + key must be defined + type: boolean + required: + - key + type: object + x-kubernetes-map-type: atomic + type: object + type: object + type: object description: '{property} can be any librdkafka properties' type: object timestampFormat: diff --git a/manifests/setup/fluent-operator-crd.yaml b/manifests/setup/fluent-operator-crd.yaml index fc8d3b0be..a5fa9cbd1 100644 --- a/manifests/setup/fluent-operator-crd.yaml +++ b/manifests/setup/fluent-operator-crd.yaml @@ -5944,7 +5944,8 @@ spec: So in Topics only a default topic needs to be configured type: boolean format: - description: 'Specify data format, options available: json, msgpack.' + description: 'Specipvafy data format, options available: json, + msgpack.' type: string messageKey: description: Optional key to store the message @@ -5965,7 +5966,44 @@ spec: type: integer rdkafka: additionalProperties: - type: string + properties: + format: + description: Secret defines the key of a value. + properties: + valueFrom: + description: ValueSource defines how to find a value's + key. + properties: + secretKeyRef: + description: Selects a key of a secret in the pod's + namespace + properties: + key: + description: The key of the secret to select + from. Must be a valid secret key. + type: string + name: + default: "" + description: |- + Name of the referent. + This field is effectively required, but due to backwards compatibility is + allowed to be empty. Instances of this type with an empty value here are + almost certainly wrong. + TODO: Add other useful fields. apiVersion, kind, uid? + More info: https://kubernetes.io/docs/concepts/overview/working-with-objects/names/#names + TODO: Drop `kubebuilder:default` when controller-gen doesn't need it https://github.com/kubernetes-sigs/kubebuilder/issues/3896. + type: string + optional: + description: Specify whether the Secret or its + key must be defined + type: boolean + required: + - key + type: object + x-kubernetes-map-type: atomic + type: object + type: object + type: object description: '{property} can be any librdkafka properties' type: object timestampFormat: @@ -34561,7 +34599,8 @@ spec: So in Topics only a default topic needs to be configured type: boolean format: - description: 'Specify data format, options available: json, msgpack.' + description: 'Specipvafy data format, options available: json, + msgpack.' type: string messageKey: description: Optional key to store the message @@ -34582,7 +34621,44 @@ spec: type: integer rdkafka: additionalProperties: - type: string + properties: + format: + description: Secret defines the key of a value. + properties: + valueFrom: + description: ValueSource defines how to find a value's + key. + properties: + secretKeyRef: + description: Selects a key of a secret in the pod's + namespace + properties: + key: + description: The key of the secret to select + from. Must be a valid secret key. + type: string + name: + default: "" + description: |- + Name of the referent. + This field is effectively required, but due to backwards compatibility is + allowed to be empty. Instances of this type with an empty value here are + almost certainly wrong. + TODO: Add other useful fields. apiVersion, kind, uid? + More info: https://kubernetes.io/docs/concepts/overview/working-with-objects/names/#names + TODO: Drop `kubebuilder:default` when controller-gen doesn't need it https://github.com/kubernetes-sigs/kubebuilder/issues/3896. + type: string + optional: + description: Specify whether the Secret or its + key must be defined + type: boolean + required: + - key + type: object + x-kubernetes-map-type: atomic + type: object + type: object + type: object description: '{property} can be any librdkafka properties' type: object timestampFormat: diff --git a/manifests/setup/setup.yaml b/manifests/setup/setup.yaml index e3cd82eca..93753b4ed 100644 --- a/manifests/setup/setup.yaml +++ b/manifests/setup/setup.yaml @@ -5944,7 +5944,8 @@ spec: So in Topics only a default topic needs to be configured type: boolean format: - description: 'Specify data format, options available: json, msgpack.' + description: 'Specipvafy data format, options available: json, + msgpack.' type: string messageKey: description: Optional key to store the message @@ -5965,7 +5966,44 @@ spec: type: integer rdkafka: additionalProperties: - type: string + properties: + format: + description: Secret defines the key of a value. + properties: + valueFrom: + description: ValueSource defines how to find a value's + key. + properties: + secretKeyRef: + description: Selects a key of a secret in the pod's + namespace + properties: + key: + description: The key of the secret to select + from. Must be a valid secret key. + type: string + name: + default: "" + description: |- + Name of the referent. + This field is effectively required, but due to backwards compatibility is + allowed to be empty. Instances of this type with an empty value here are + almost certainly wrong. + TODO: Add other useful fields. apiVersion, kind, uid? + More info: https://kubernetes.io/docs/concepts/overview/working-with-objects/names/#names + TODO: Drop `kubebuilder:default` when controller-gen doesn't need it https://github.com/kubernetes-sigs/kubebuilder/issues/3896. + type: string + optional: + description: Specify whether the Secret or its + key must be defined + type: boolean + required: + - key + type: object + x-kubernetes-map-type: atomic + type: object + type: object + type: object description: '{property} can be any librdkafka properties' type: object timestampFormat: @@ -34561,7 +34599,8 @@ spec: So in Topics only a default topic needs to be configured type: boolean format: - description: 'Specify data format, options available: json, msgpack.' + description: 'Specipvafy data format, options available: json, + msgpack.' type: string messageKey: description: Optional key to store the message @@ -34582,7 +34621,44 @@ spec: type: integer rdkafka: additionalProperties: - type: string + properties: + format: + description: Secret defines the key of a value. + properties: + valueFrom: + description: ValueSource defines how to find a value's + key. + properties: + secretKeyRef: + description: Selects a key of a secret in the pod's + namespace + properties: + key: + description: The key of the secret to select + from. Must be a valid secret key. + type: string + name: + default: "" + description: |- + Name of the referent. + This field is effectively required, but due to backwards compatibility is + allowed to be empty. Instances of this type with an empty value here are + almost certainly wrong. + TODO: Add other useful fields. apiVersion, kind, uid? + More info: https://kubernetes.io/docs/concepts/overview/working-with-objects/names/#names + TODO: Drop `kubebuilder:default` when controller-gen doesn't need it https://github.com/kubernetes-sigs/kubebuilder/issues/3896. + type: string + optional: + description: Specify whether the Secret or its + key must be defined + type: boolean + required: + - key + type: object + x-kubernetes-map-type: atomic + type: object + type: object + type: object description: '{property} can be any librdkafka properties' type: object timestampFormat: