Commit 206347c
[KIP-848]: Add testing changes and describe consumer group changes for KIP-848 (#329)
* Run perf tests for consumer protocol
* Changed docker location for Promisified tests
* Run promisified tests with the new protocol
* Added kafka_jaas.conf
* Remove forbidden properties from KafkaJS configs too
* Skip for partitionAssigners as well
* Increase test timeout
* Increase sleep timeout and expect any strings in protocol in describe groups
* Set session timeout in server properties
* Set session timeout to 10 secs
* Skip oauthbearer tests
* Skip all oauthbearer_cb tests
* Docker logging
* Increase timeout to 60
* Pin versions
* Remove logging and decrease sleep time
* Change coverage file name
* [KIP-848]: Structure change for DescribeConsumerGroups (#327)
* Added new fields in DescribeConsumerGroup
* Error debug
* Test changes
* Test enabled
* Requested changes
* Test fix
* Changelog and test fix
* Changelog change
1 parent a00bd34 commit 206347c

19 files changed: +228 −35 lines

.semaphore/semaphore.yml

Lines changed: 15 additions & 6 deletions

```diff
@@ -102,13 +102,21 @@ blocks:
       commands:
         - make test
         - artifact push workflow coverage/mocha/coverage-final.json --destination "mocha-coverage.json"
-    - name: "Promisified Tests"
+    - name: "Promisified Tests (Classic Protocol)"
       commands:
         - '[[ -z $DOCKERHUB_APIKEY ]] || docker login --username $DOCKERHUB_USER --password $DOCKERHUB_APIKEY'
-        - docker compose up -d && sleep 30
+        - docker compose -f test/docker/docker-compose.yml up -d && sleep 30
         - export NODE_OPTIONS='--max-old-space-size=1536'
         - npx jest --no-colors --ci test/promisified/
-        - artifact push workflow coverage/jest/coverage-final.json --destination "jest-coverage.json"
+        - artifact push workflow coverage/jest/coverage-final.json --destination "jest-classic-coverage.json"
+    - name: "Promisified Tests (Consumer Protocol)"
+      commands:
+        - '[[ -z $DOCKERHUB_APIKEY ]] || docker login --username $DOCKERHUB_USER --password $DOCKERHUB_APIKEY'
+        - docker compose -f test/docker/docker-compose-kraft.yml up -d && sleep 30
+        - export TEST_CONSUMER_GROUP_PROTOCOL=consumer
+        - export NODE_OPTIONS='--max-old-space-size=1536'
+        - npx jest --no-colors --ci test/promisified/
+        - artifact push workflow coverage/jest/coverage-final.json --destination "jest-consumer-coverage.json"
     - name: "Lint"
       commands:
         - make lint
@@ -163,10 +171,10 @@ blocks:
         - export BUILD_LIBRDKAFKA=0
        - npm run install-from-source
       jobs:
-        - name: "Performance Test"
+        - name: "Performance Test (Classic Protocol)"
           commands:
             - '[[ -z $DOCKERHUB_APIKEY ]] || docker login --username $DOCKERHUB_USER --password $DOCKERHUB_APIKEY'
-            - docker compose up -d && sleep 30
+            - docker compose -f test/docker/docker-compose.yml up -d && sleep 30
             - export NODE_OPTIONS='--max-old-space-size=1536'
             - cd examples/performance
             - npm install
@@ -479,7 +487,8 @@ after_pipeline:
       - checkout
       - sem-version java 11
       - artifact pull workflow mocha-coverage.json
-      - artifact pull workflow jest-coverage.json
+      - artifact pull workflow jest-classic-coverage.json
+      - artifact pull workflow jest-consumer-coverage.json
       - artifact pull workflow jest-sr-coverage.json
      - npx --yes istanbul-merge --out merged-output/merged-coverage.json *-coverage.json
       - npx nyc report -t merged-output --report-dir coverage --reporter=text --reporter=lcov
```

CHANGELOG.md

Lines changed: 9 additions & 0 deletions

```diff
@@ -1,3 +1,12 @@
+# confluent-kafka-javascript v1.4.0
+
+v1.4.0 is a feature release. It is supported for all usage.
+
+## Enhancements
+
+1. References librdkafka v2.11.0. Refer to the [librdkafka v2.11.0 release notes](https://github.com/confluentinc/librdkafka/releases/tag/v2.11.0) for more information.
+2. [KIP-848] `describeGroups()` now supports the `consumer` groups introduced by KIP-848. Two new fields have been added: the group type, which indicates whether a group is a `classic` or a `consumer` group, and the target assignment, which is only valid for the `consumer` protocol and defaults to undefined.
+
 # confluent-kafka-javascript v1.3.2
 
 v1.3.2 is a maintenance release. It is supported for all usage.
```

examples/kafkajs/admin/describe-groups.js

Lines changed: 2 additions & 1 deletion

```diff
@@ -1,5 +1,5 @@
 // require('kafkajs') is replaced with require('@confluentinc/kafka-javascript').KafkaJS.
-const { Kafka, ConsumerGroupStates } = require('@confluentinc/kafka-javascript').KafkaJS;
+const { Kafka } = require('@confluentinc/kafka-javascript').KafkaJS;
 const { parseArgs } = require('node:util');
 
 function printNode(node, prefix = '') {
@@ -72,6 +72,7 @@ async function adminStart() {
     console.log(`\tProtocol type: ${group.protocolType}`);
     console.log(`\tPartition assignor: ${group.partitionAssignor}`);
     console.log(`\tState: ${group.state}`);
+    console.log(`\tType: ${group.type}`);
     console.log(`\tCoordinator: ${group.coordinator ? group.coordinator.id : group.coordinator}`);
     printNode(group.coordinator, '\t');
     console.log(`\tAuthorized operations: ${group.authorizedOperations}`);
```
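The updated example prints the new `type` field alongside the existing group fields. A minimal sketch of the same output logic, run against an in-memory object shaped like a `describeGroups()` result (the sample values below are hypothetical, not output from a live cluster):

```javascript
// Sketch: format the fields the example prints, including the new `type`
// field. `group` is hypothetical sample data, not a real broker response.
function formatGroup(group) {
  return [
    `Protocol type: ${group.protocolType}`,
    `Partition assignor: ${group.partitionAssignor}`,
    `State: ${group.state}`,
    `Type: ${group.type}`, // new in this commit
  ].join('\n');
}

const group = {
  protocolType: 'consumer',
  partitionAssignor: '',
  state: 'STABLE',
  type: 'consumer', // hypothetical value; the field distinguishes classic vs consumer groups
};

console.log(formatGroup(group));
```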

lib/admin.js

Lines changed: 7 additions & 0 deletions

```diff
@@ -29,6 +29,12 @@ const ConsumerGroupStates = {
   EMPTY: 5,
 };
 
+const ConsumerGroupTypes = {
+  UNKNOWN: 0,
+  CONSUMER: 1,
+  CLASSIC: 2,
+};
+
 /**
  * A list of ACL operation types.
  * @enum {number}
@@ -95,6 +101,7 @@ module.exports = {
   create: createAdminClient,
   createFrom: createAdminClientFrom,
   ConsumerGroupStates: Object.freeze(ConsumerGroupStates),
+  ConsumerGroupTypes: Object.freeze(ConsumerGroupTypes),
   AclOperationTypes: Object.freeze(AclOperationTypes),
   IsolationLevel: Object.freeze(IsolationLevel),
   OffsetSpec,
```
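The new `ConsumerGroupTypes` enum maps numeric codes to group types. A small sketch of how a caller might turn a numeric `type` code back into a readable name (the enum values are copied from the diff above; `typeName` is a hypothetical helper, not part of the library):

```javascript
// Copied from the diff above: numeric codes for consumer group types.
const ConsumerGroupTypes = Object.freeze({
  UNKNOWN: 0,
  CONSUMER: 1,
  CLASSIC: 2,
});

// Hypothetical helper: reverse-lookup a numeric type code to its name,
// falling back to UNKNOWN for unrecognized codes.
function typeName(code) {
  const entry = Object.entries(ConsumerGroupTypes).find(([, v]) => v === code);
  return entry ? entry[0] : 'UNKNOWN';
}

console.log(typeName(ConsumerGroupTypes.CLASSIC)); // CLASSIC
```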

lib/kafkajs/_admin.js

Lines changed: 8 additions & 0 deletions

```diff
@@ -1009,6 +1009,14 @@ module.exports = {
    * @see RdKafka.ConsumerGroupStates
    */
   ConsumerGroupStates: RdKafka.AdminClient.ConsumerGroupStates,
+  /**
+   * A list of consumer group types.
+   * @enum {number}
+   * @readonly
+   * @memberof KafkaJS
+   * @see RdKafka.ConsumerGroupTypes
+   */
+  ConsumerGroupTypes: RdKafka.AdminClient.ConsumerGroupTypes,
   /**
    * A list of ACL operation types.
    * @enum {number}
```

lib/kafkajs/_consumer.js

Lines changed: 29 additions & 10 deletions

```diff
@@ -474,7 +474,7 @@ class Consumer {
     }
   }
 
-  #kafkaJSToConsumerConfig(kjsConfig) {
+  #kafkaJSToConsumerConfig(kjsConfig, isClassicProtocol = true) {
     if (!kjsConfig || Object.keys(kjsConfig).length === 0) {
       return {};
     }
@@ -498,37 +498,53 @@ class Consumer {
     }
 
     if (Object.hasOwn(kjsConfig, 'partitionAssignors')) {
+      if (!isClassicProtocol) {
+        throw new error.KafkaJSError(
+          "partitionAssignors is not supported when group.protocol is not 'classic'.",
+          { code: error.ErrorCodes.ERR__INVALID_ARG }
+        );
+      }
       if (!Array.isArray(kjsConfig.partitionAssignors)) {
         throw new error.KafkaJSError(CompatibilityErrorMessages.partitionAssignors(), { code: error.ErrorCodes.ERR__INVALID_ARG });
       }
-
       kjsConfig.partitionAssignors.forEach(assignor => {
         if (typeof assignor !== 'string')
           throw new error.KafkaJSError(CompatibilityErrorMessages.partitionAssignors(), { code: error.ErrorCodes.ERR__INVALID_ARG });
       });
-
       rdKafkaConfig['partition.assignment.strategy'] = kjsConfig.partitionAssignors.join(',');
-    } else {
+    } else if (isClassicProtocol) {
       rdKafkaConfig['partition.assignment.strategy'] = PartitionAssigners.roundRobin;
     }
 
     if (Object.hasOwn(kjsConfig, 'sessionTimeout')) {
+      if (!isClassicProtocol) {
+        throw new error.KafkaJSError(
+          "sessionTimeout is not supported when group.protocol is not 'classic'.",
+          { code: error.ErrorCodes.ERR__INVALID_ARG }
+        );
+      }
       rdKafkaConfig['session.timeout.ms'] = kjsConfig.sessionTimeout;
-    } else {
+    } else if (isClassicProtocol) {
       rdKafkaConfig['session.timeout.ms'] = 30000;
     }
 
+    if (Object.hasOwn(kjsConfig, 'heartbeatInterval')) {
+      if (!isClassicProtocol) {
+        throw new error.KafkaJSError(
+          "heartbeatInterval is not supported when group.protocol is not 'classic'.",
+          { code: error.ErrorCodes.ERR__INVALID_ARG }
+        );
+      }
+      rdKafkaConfig['heartbeat.interval.ms'] = kjsConfig.heartbeatInterval;
+    }
+
     if (Object.hasOwn(kjsConfig, 'rebalanceTimeout')) {
       /* In librdkafka, we use the max poll interval as the rebalance timeout as well. */
       rdKafkaConfig['max.poll.interval.ms'] = +kjsConfig.rebalanceTimeout;
     } else if (!rdKafkaConfig['max.poll.interval.ms']) {
       rdKafkaConfig['max.poll.interval.ms'] = 300000; /* librdkafka default */
     }
 
-    if (Object.hasOwn(kjsConfig, 'heartbeatInterval')) {
-      rdKafkaConfig['heartbeat.interval.ms'] = kjsConfig.heartbeatInterval;
-    }
-
     if (Object.hasOwn(kjsConfig, 'metadataMaxAge')) {
       rdKafkaConfig['topic.metadata.refresh.interval.ms'] = kjsConfig.metadataMaxAge;
     }
@@ -605,8 +621,11 @@ class Consumer {
   }
 
   #finalizedConfig() {
+    const protocol = this.#userConfig['group.protocol'];
+    const isClassicProtocol = protocol === undefined ||
+      (typeof protocol === 'string' && protocol.toLowerCase() === 'classic');
     /* Creates an rdkafka config based off the kafkaJS block. Switches to compatibility mode if the block exists. */
-    let compatibleConfig = this.#kafkaJSToConsumerConfig(this.#userConfig.kafkaJS);
+    let compatibleConfig = this.#kafkaJSToConsumerConfig(this.#userConfig.kafkaJS, isClassicProtocol);
 
     /* There can be multiple different and conflicting config directives for setting the log level:
      * 1. If there's a kafkaJS block:
```
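The protocol detection added in `#finalizedConfig()` can be sketched as a standalone function (extracted here for illustration; in the real code the value comes from the user's `group.protocol` config):

```javascript
// Sketch of the protocol check added in this commit: the classic protocol
// is assumed unless group.protocol explicitly says otherwise.
function isClassicProtocol(protocol) {
  return protocol === undefined ||
    (typeof protocol === 'string' && protocol.toLowerCase() === 'classic');
}

// Under the consumer protocol, classic-only options (sessionTimeout,
// heartbeatInterval, partitionAssignors) are rejected rather than ignored,
// since the broker now drives those behaviors per KIP-848.
console.log(isClassicProtocol(undefined)); // true
console.log(isClassicProtocol('CLASSIC')); // true
console.log(isClassicProtocol('consumer')); // false
```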

lib/kafkajs/_kafka.js

Lines changed: 2 additions & 1 deletion

```diff
@@ -1,6 +1,6 @@
 const { Producer, CompressionTypes } = require('./_producer');
 const { Consumer, PartitionAssigners } = require('./_consumer');
-const { Admin, ConsumerGroupStates, AclOperationTypes, IsolationLevel } = require('./_admin');
+const { Admin, ConsumerGroupStates, ConsumerGroupTypes, AclOperationTypes, IsolationLevel } = require('./_admin');
 const error = require('./_error');
 const { logLevel, checkIfKafkaJsKeysPresent, CompatibilityErrorMessages } = require('./_common');
 
@@ -119,6 +119,7 @@ module.exports = {
   PartitionAssigners,
   PartitionAssignors: PartitionAssigners,
   CompressionTypes,
+  ConsumerGroupTypes,
   ConsumerGroupStates,
   AclOperationTypes,
   IsolationLevel};
```

lib/rdkafka.js

Lines changed: 1 addition & 0 deletions

```diff
@@ -39,5 +39,6 @@ module.exports = {
   IsolationLevel: Admin.IsolationLevel,
   OffsetSpec: Admin.OffsetSpec,
   ConsumerGroupStates: Admin.ConsumerGroupStates,
+  ConsumerGroupTypes: Admin.ConsumerGroupTypes,
   AclOperationTypes: Admin.AclOperationTypes,
 };
```

src/common.cc

Lines changed: 24 additions & 0 deletions

```diff
@@ -980,6 +980,9 @@ v8::Local<v8::Object> FromMemberDescription(
   assignment: {
     topicPartitions: TopicPartition[]
   },
+  targetAssignment?: {
+    topicPartitions: TopicPartition[]
+  }
 }
 */
 v8::Local<v8::Object> returnObject = Nan::New<v8::Object>();
@@ -1028,6 +1031,23 @@ v8::Local<v8::Object> FromMemberDescription(
   Nan::Set(returnObject, Nan::New("assignment").ToLocalChecked(),
            assignmentObject);
 
+  // targetAssignment
+  const rd_kafka_MemberAssignment_t* target_assignment =
+      rd_kafka_MemberDescription_target_assignment(member);
+  if (target_assignment) {
+    const rd_kafka_topic_partition_list_t* target_partitions =
+        rd_kafka_MemberAssignment_partitions(target_assignment);
+    v8::Local<v8::Array> targetTopicPartitions =
+        Conversion::TopicPartition::ToTopicPartitionV8Array(
+            target_partitions, false);
+    v8::Local<v8::Object> targetAssignmentObject = Nan::New<v8::Object>();
+    Nan::Set(targetAssignmentObject,
+             Nan::New("topicPartitions").ToLocalChecked(),
+             targetTopicPartitions);
+    Nan::Set(returnObject, Nan::New("targetAssignment").ToLocalChecked(),
+             targetAssignmentObject);
+  }
+
   return returnObject;
 }
 
@@ -1105,6 +1125,10 @@ v8::Local<v8::Object> FromConsumerGroupDescription(
   Nan::Set(returnObject, Nan::New("state").ToLocalChecked(),
            Nan::New<v8::Number>(rd_kafka_ConsumerGroupDescription_state(desc)));
 
+  // type
+  Nan::Set(returnObject, Nan::New("type").ToLocalChecked(),
+           Nan::New<v8::Number>(rd_kafka_ConsumerGroupDescription_type(desc)));
+
   // coordinator
   const rd_kafka_Node_t* coordinator =
       rd_kafka_ConsumerGroupDescription_coordinator(desc);
```
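As the comment block in the diff notes, `targetAssignment` is optional: it is only set when librdkafka reports a target assignment (consumer-protocol members). A sketch of how JavaScript callers might read it defensively; the `member` objects below are hypothetical samples shaped like the structure `FromMemberDescription` builds, not real broker output:

```javascript
// Sketch: targetAssignment is only present for consumer-protocol members,
// so treat it as optional when reading member descriptions.
function targetPartitions(member) {
  return member.targetAssignment
    ? member.targetAssignment.topicPartitions
    : undefined;
}

// Hypothetical sample members mirroring the documented object shape.
const classicMember = { assignment: { topicPartitions: [] } };
const consumerMember = {
  assignment: { topicPartitions: [] },
  targetAssignment: { topicPartitions: [{ topic: 't1', partition: 0 }] },
};

console.log(targetPartitions(classicMember));  // undefined
console.log(targetPartitions(consumerMember).length); // 1
```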

test/docker/docker-compose-kraft.yml

Lines changed: 14 additions & 0 deletions

```diff
@@ -0,0 +1,14 @@
+services:
+  kafka:
+    image: apache/kafka:4.0.0
+    restart: unless-stopped
+    container_name: kafka
+    ports:
+      - 9092:29092
+      - 9093:29093
+    volumes:
+      - ./kafka_jaas.conf:/etc/kafka/kafka_jaas.conf
+      - ./kraft/server.properties:/mnt/shared/config/server.properties
+    environment:
+      KAFKA_OPTS: "-Djava.security.auth.login.config=/etc/kafka/kafka_jaas.conf"
```
