diff --git a/README.md b/README.md index a82e3c54..550b28a1 100644 --- a/README.md +++ b/README.md @@ -8,8 +8,7 @@ The Kafka Proxy is based on idea of [Cloud SQL Proxy](https://github.com/GoogleC It allows a service to connect to Kafka brokers without having to deal with SASL/PLAIN authentication and SSL certificates. It works by opening tcp sockets on the local machine and proxying connections to the associated Kafka brokers -when the sockets are used. The host and port in [Metadata](http://kafka.apache.org/protocol.html#The_Messages_Metadata) -and [FindCoordinator](http://kafka.apache.org/protocol.html#The_Messages_FindCoordinator) +when the sockets are used. The host and port in [Metadata](http://kafka.apache.org/protocol.html#The_Messages_Metadata), [FindCoordinator](http://kafka.apache.org/protocol.html#The_Messages_FindCoordinator) & [DescribeCluster](https://github.com/apache/kafka/blob/trunk/clients/src/main/resources/common/message/DescribeClusterResponse.json) responses received from the brokers are replaced by local counterparts. For discovered brokers (not configured as the boostrap servers), local listeners are started on random ports. The dynamic local listeners feature can be disabled and an additional list of external server mappings can be provided. diff --git a/proxy/protocol/real_decoder.go b/proxy/protocol/real_decoder.go index d87b8dd4..8e7479e3 100644 --- a/proxy/protocol/real_decoder.go +++ b/proxy/protocol/real_decoder.go @@ -2,8 +2,9 @@ package protocol import ( "encoding/binary" - "github.com/google/uuid" "math" + + "github.com/google/uuid" ) var errInvalidArrayLength = PacketDecodingError{"invalid array length"} diff --git a/proxy/protocol/responses.go b/proxy/protocol/responses.go index 774ff90d..34929b74 100644 --- a/proxy/protocol/responses.go +++ b/proxy/protocol/responses.go @@ -10,8 +10,10 @@ import ( const ( apiKeyMetadata = 3 apiKeyFindCoordinator = 10 + apiKeyDescribeCluster = 60 brokersKeyName = "brokers" + brokerKeyName = "broker_id" hostKeyName = "host" portKeyName = "port" nodeKeyName = "node_id" @@ -23,6 +25,7 @@ const ( var ( metadataResponseSchemaVersions = createMetadataResponseSchemaVersions() findCoordinatorResponseSchemaVersions = createFindCoordinatorResponseSchemaVersions() + describeClusterResponseSchemaVersions = createDescribeClusterResponseSchemaVersions() ) func createMetadataResponseSchemaVersions() []Schema { @@ -325,6 +328,58 @@ func createFindCoordinatorResponseSchemaVersions() []Schema { return []Schema{findCoordinatorResponseV0, findCoordinatorResponseV1, findCoordinatorResponseV2, findCoordinatorResponseV3, findCoordinatorResponseV4, findCoordinatorResponseV5, findCoordinatorResponseV6} } +func createDescribeClusterResponseSchemaVersions() []Schema { + describeClusterBrokerV0 := NewSchema("describe_cluster_broker_v0", + &Mfield{Name: brokerKeyName, Ty: TypeInt32}, + &Mfield{Name: hostKeyName, Ty: TypeCompactStr}, + &Mfield{Name: portKeyName, Ty: TypeInt32}, + &Mfield{Name: "rack", Ty: TypeCompactNullableStr}, + &SchemaTaggedFields{Name: "response_tagged_fields"}, + ) + describeClusterBrokerV2 := NewSchema("describe_cluster_broker_v2", + &Mfield{Name: brokerKeyName, Ty: TypeInt32}, + &Mfield{Name: hostKeyName, Ty: TypeCompactStr}, + &Mfield{Name: portKeyName, Ty: TypeInt32}, + &Mfield{Name: "rack", Ty: TypeCompactNullableStr}, + &Mfield{Name: "is_fenced", Ty: TypeBool}, + &SchemaTaggedFields{Name: "response_tagged_fields"}, + ) + + describeClusterV0 := NewSchema("describe_cluster_response_v0", + &Mfield{Name: "throttle_time_ms", Ty: TypeInt32}, + &Mfield{Name: "error_code", Ty: TypeInt16}, + &Mfield{Name: "error_message", Ty: TypeCompactNullableStr}, + &Mfield{Name: "cluster_id", Ty: TypeCompactStr}, + &Mfield{Name: "controller_id", Ty: TypeInt32}, + &CompactArray{Name: brokersKeyName, Ty: describeClusterBrokerV0}, + &Mfield{Name: "cluster_authorized_operations", Ty: TypeInt32}, + &SchemaTaggedFields{Name: "response_tagged_fields"}, + ) + describeClusterV1 := NewSchema("describe_cluster_response_v1", + &Mfield{Name: "throttle_time_ms", Ty: TypeInt32}, + &Mfield{Name: "error_code", Ty: TypeInt16}, + &Mfield{Name: "error_message", Ty: TypeCompactNullableStr}, + &Mfield{Name: "endpoint_type", Ty: TypeInt8}, + &Mfield{Name: "cluster_id", Ty: TypeCompactStr}, + &Mfield{Name: "controller_id", Ty: TypeInt32}, + &CompactArray{Name: brokersKeyName, Ty: describeClusterBrokerV0}, + &Mfield{Name: "cluster_authorized_operations", Ty: TypeInt32}, + &SchemaTaggedFields{Name: "response_tagged_fields"}, + ) + describeClusterV2 := NewSchema("describe_cluster_response_v2", + &Mfield{Name: "throttle_time_ms", Ty: TypeInt32}, + &Mfield{Name: "error_code", Ty: TypeInt16}, + &Mfield{Name: "error_message", Ty: TypeCompactNullableStr}, + &Mfield{Name: "endpoint_type", Ty: TypeInt8}, + &Mfield{Name: "cluster_id", Ty: TypeCompactStr}, + &Mfield{Name: "controller_id", Ty: TypeInt32}, + &CompactArray{Name: brokersKeyName, Ty: describeClusterBrokerV2}, + &Mfield{Name: "cluster_authorized_operations", Ty: TypeInt32}, + &SchemaTaggedFields{Name: "response_tagged_fields"}, + ) + return []Schema{describeClusterV0, describeClusterV1, describeClusterV2} +} + func modifyMetadataResponse(decodedStruct *Struct, fn config.NetAddressMappingFunc) error { if decodedStruct == nil { return errors.New("decoded struct must not be nil") @@ -441,6 +496,56 @@ func modifyCoordinator(decodedStruct *Struct, fn config.NetAddressMappingFunc) e return nil } +func modifyDescribeClusterResponse(decodedStruct *Struct, fn config.NetAddressMappingFunc) error { + if decodedStruct == nil { + return errors.New("decoded struct must not be nil") + } + if fn == nil { + return errors.New("net address mapper must not be nil") + } + brokersArray, ok := decodedStruct.Get(brokersKeyName).([]interface{}) + if !ok { + return errors.New("brokers list not found") + } + for _, brokerElement := range brokersArray { + broker := brokerElement.(*Struct) + host, ok := broker.Get(hostKeyName).(string) + if !ok { + return errors.New("broker.host not found") + } + port, ok := broker.Get(portKeyName).(int32) + if !ok { + return errors.New("broker.port not found") + } + brokerId, ok := broker.Get(brokerKeyName).(int32) + if !ok { + return errors.New("broker.broker_id not found") + } + + if host == "" && port <= 0 { + continue + } + + newHost, newPort, err := fn(host, port, brokerId) + if err != nil { + return err + } + if host != newHost { + err := broker.Replace(hostKeyName, newHost) + if err != nil { + return err + } + } + if port != newPort { + err = broker.Replace(portKeyName, newPort) + if err != nil { + return err + } + } + } + return nil +} + type ResponseModifier interface { Apply(resp []byte) ([]byte, error) } @@ -471,6 +576,8 @@ func GetResponseModifier(apiKey int16, apiVersion int16, addressMappingFunc conf return newResponseModifier(apiKey, apiVersion, addressMappingFunc, metadataResponseSchemaVersions, modifyMetadataResponse) case apiKeyFindCoordinator: return newResponseModifier(apiKey, apiVersion, addressMappingFunc, findCoordinatorResponseSchemaVersions, modifyFindCoordinatorResponse) + case apiKeyDescribeCluster: + return newResponseModifier(apiKey, apiVersion, addressMappingFunc, describeClusterResponseSchemaVersions, modifyDescribeClusterResponse) default: return nil, nil } diff --git a/proxy/protocol/responses_test.go b/proxy/protocol/responses_test.go index 48e16474..146c6438 100644 --- a/proxy/protocol/responses_test.go +++ b/proxy/protocol/responses_test.go @@ -9,9 +9,10 @@ import ( "github.com/google/uuid" - "github.com/grepplabs/kafka-proxy/config" "github.com/pkg/errors" "github.com/stretchr/testify/assert" + + "github.com/grepplabs/kafka-proxy/config" ) var ( @@ -2600,6 +2601,206 @@ func testMetadataResponse(t *testing.T, apiVersion int16, payload string, expect a.Equal(expectedModified, dc.AttrValues()) } +func TestDescribeClusterResponseV0(t *testing.T) { + payload := "000000000000000b6d795f636c75737465720000000003000000010a6c6f63616c686f737400004a940000000000020a6c6f63616c686f7374000071a400000000000000" + + expectedInput := []string{ + "throttle_time_ms int32 0", + "error_code int16 0", + "error_message *string ", + "cluster_id string my_cluster", + "controller_id int32 0", + "[brokers]", + "brokers struct", + "broker_id int32 1", + "host string localhost", + "port int32 19092", + "rack *string ", + "[response_tagged_fields]", + "brokers struct", + "broker_id int32 2", + "host string localhost", + "port int32 29092", + "rack *string ", + "[response_tagged_fields]", + "cluster_authorized_operations int32 0", + "[response_tagged_fields]", + } + expectedModified := []string{ + "throttle_time_ms int32 0", + "error_code int16 0", + "error_message *string ", + "cluster_id string my_cluster", + "controller_id int32 0", + "[brokers]", + "brokers struct", + "broker_id int32 1", + "host string myhost1", // replaced + "port int32 34001", // replaced + "rack *string ", + "[response_tagged_fields]", + "brokers struct", + "broker_id int32 2", + "host string myhost2", // replaced + "port int32 34002", // replaced + "rack *string ", + "[response_tagged_fields]", + "cluster_authorized_operations int32 0", + "[response_tagged_fields]", + } + + testDescribeClusterResponse(t, 0, payload, expectedInput, expectedModified) +} + +func TestDescribeClusterResponseV1(t *testing.T) { + payload := "00000000000000010b6d795f636c75737465720000000003000000010a6c6f63616c686f737400004a940000000000020a6c6f63616c686f7374000071a400000000000000" + + expectedInput := []string{ + "throttle_time_ms int32 0", + "error_code int16 0", + "error_message *string ", + "endpoint_type int8 1", + "cluster_id string my_cluster", + "controller_id int32 0", + "[brokers]", + "brokers struct", + "broker_id int32 1", + "host string localhost", + "port int32 19092", + "rack *string ", + "[response_tagged_fields]", + "brokers struct", + "broker_id int32 2", + "host string localhost", + "port int32 29092", + "rack *string ", + "[response_tagged_fields]", + "cluster_authorized_operations int32 0", + "[response_tagged_fields]", + } + expectedModified := []string{ + "throttle_time_ms int32 0", + "error_code int16 0", + "error_message *string ", + "endpoint_type int8 1", + "cluster_id string my_cluster", + "controller_id int32 0", + "[brokers]", + "brokers struct", + "broker_id int32 1", + "host string myhost1", // replaced + "port int32 34001", // replaced + "rack *string ", + "[response_tagged_fields]", + "brokers struct", + "broker_id int32 2", + "host string myhost2", // replaced + "port int32 34002", // replaced + "rack *string ", + "[response_tagged_fields]", + "cluster_authorized_operations int32 0", + "[response_tagged_fields]", + } + + testDescribeClusterResponse(t, 1, payload, expectedInput, expectedModified) +} + +func TestDescribeClusterResponseV2(t *testing.T) { + payload := "00000000000000010b6d795f636c75737465720000000003000000010a6c6f63616c686f737400004a94000000000000020a6c6f63616c686f7374000071a40000000000000000" + + expectedInput := []string{ + "throttle_time_ms int32 0", + "error_code int16 0", + "error_message *string ", + "endpoint_type int8 1", + "cluster_id string my_cluster", + "controller_id int32 0", + "[brokers]", + "brokers struct", + "broker_id int32 1", + "host string localhost", + "port int32 19092", + "rack *string ", + "is_fenced bool false", + "[response_tagged_fields]", + "brokers struct", + "broker_id int32 2", + "host string localhost", + "port int32 29092", + "rack *string ", + "is_fenced bool false", + "[response_tagged_fields]", + "cluster_authorized_operations int32 0", + "[response_tagged_fields]", + } + expectedModified := []string{ + "throttle_time_ms int32 0", + "error_code int16 0", + "error_message *string ", + "endpoint_type int8 1", + "cluster_id string my_cluster", + "controller_id int32 0", + "[brokers]", + "brokers struct", + "broker_id int32 1", + "host string myhost1", // replaced + "port int32 34001", // replaced + "rack *string ", + "is_fenced bool false", + "[response_tagged_fields]", + "brokers struct", + "broker_id int32 2", + "host string myhost2", // replaced + "port int32 34002", // replaced + "rack *string ", + "is_fenced bool false", + "[response_tagged_fields]", + "cluster_authorized_operations int32 0", + "[response_tagged_fields]", + } + + testDescribeClusterResponse(t, 2, payload, expectedInput, expectedModified) +} + +func testDescribeClusterResponse(t *testing.T, apiVersion int16, payload string, expectedInput, expectedModified []string) { + bytes, err := hex.DecodeString(payload) + if err != nil { + t.Fatal(err) + } + a := assert.New(t) + + schema := describeClusterResponseSchemaVersions[apiVersion] + + s, err := DecodeSchema(bytes, schema) + a.Nil(err) + + dc := newDecodeCheck() + err = dc.Traverse(s) + if err != nil { + t.Fatal(err) + } + a.Equal(expectedInput, dc.AttrValues()) + resp, err := EncodeSchema(s, schema) + a.Nil(err) + a.Equal(bytes, resp) + + modifier, err := GetResponseModifier(apiKeyDescribeCluster, apiVersion, testResponseModifier2) + if err != nil { + t.Fatal(err) + } + a.Nil(err) + resp, err = modifier.Apply(resp) + a.Nil(err) + s, err = DecodeSchema(resp, schema) + a.Nil(err) + dc = newDecodeCheck() + err = dc.Traverse(s) + if err != nil { + t.Fatal(err) + } + a.Equal(expectedModified, dc.AttrValues()) +} + func TestFindCoordinatorResponseV0(t *testing.T) { /* FindCoordinator Response (Version: 0) => error_code coordinator @@ -3162,6 +3363,8 @@ func (t *decodeCheck) value(s *Struct, arg interface{}, sindex int) error { switch v := arg.(type) { case bool: t.append(name, "bool", v) + case int8: + t.append(name, "int8", v) case int16: t.append(name, "int16", v) case int32: diff --git a/proxy/protocol/schema.go b/proxy/protocol/schema.go index 6810916c..8b9e8c5d 100644 --- a/proxy/protocol/schema.go +++ b/proxy/protocol/schema.go @@ -3,14 +3,16 @@ package protocol import ( "bytes" "fmt" - "github.com/google/uuid" "reflect" + "github.com/google/uuid" + "github.com/pkg/errors" ) var ( TypeBool = &Bool{} + TypeInt8 = &Int8{} TypeInt16 = &Int16{} TypeInt32 = &Int32{} TypeStr = &Str{} @@ -103,6 +105,35 @@ func (f *Bool) GetName() string { return "bool" } +// Field int8 + +type Int8 struct{} + +func (f *Int8) decode(pd packetDecoder) (interface{}, error) { + return pd.getInt8() +} + +func (f *Int8) encode(pe packetEncoder, value interface{}) error { + in, ok := value.(int8) + if !ok { + return SchemaEncodingError{fmt.Sprintf("value %T not a int8", value)} + } + pe.putInt8(in) + return nil +} + +func (f *Int8) GetFields() []boundField { + return nil +} + +func (f *Int8) GetFieldsByName() map[string]*boundField { + return nil +} + +func (f *Int8) GetName() string { + return "int8" +} + // Field int16 type Int16 struct{}