diff --git a/CHANGELOG.md b/CHANGELOG.md index 54d84fa3..0922b8bd 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,10 +2,16 @@ v1.5.0 is a feature release. It is supported for all usage. +## Enhancements + +1. Adds support for `highWatermark`, `offsetLag()`, and `offsetLagLow()` in the `eachBatch` callback (#317). + + ## Fixes 1. Fix issue of delay of up to 5s in receiving messages after pause and resume, or seek (#285, #363). + # confluent-kafka-javascript v1.4.1 v1.4.1 is a maintenance release. It is supported for all usage. diff --git a/MIGRATION.md b/MIGRATION.md index cf179d9a..27ae6438 100644 --- a/MIGRATION.md +++ b/MIGRATION.md @@ -308,7 +308,7 @@ producerRun().then(consumerRun).catch(console.error); to change. The property `eachBatchAutoResolve` is supported. Within the `eachBatch` callback, use of `uncommittedOffsets` is unsupported, - and within the returned batch, `offsetLag` and `offsetLagLow` are unsupported. + but within the returned batch, `offsetLag` and `offsetLagLow` are supported. * `commitOffsets`: - Does not yet support sending metadata for topic partitions being committed. - If called with no arguments, it commits all offsets passed to the user (or the stored offsets, if manually handling offset storage using `consumer.storeOffsets`). diff --git a/lib/kafkajs/_consumer.js b/lib/kafkajs/_consumer.js index ee649b5e..ada184f8 100644 --- a/lib/kafkajs/_consumer.js +++ b/lib/kafkajs/_consumer.js @@ -880,6 +880,25 @@ class Consumer { #createBatchPayload(messages) { const topic = messages[0].topic; const partition = messages[0].partition; + let watermarkOffsets = {}; + let highWatermark = '-1001'; + let offsetLag_ = -1; + let offsetLagLow_ = -1; + + try { + watermarkOffsets = this.#internalClient.getWatermarkOffsets(topic, partition); + } catch (e) { + /* Only warn. The batch as a whole remains valid but for the fact that the high watermark won't be there. 
*/ + this.#logger.warn(`Could not get watermark offsets for batch: ${e}`, this.#createConsumerBindingMessageMetadata()); + } + + if (Number.isInteger(watermarkOffsets.highOffset)) { + highWatermark = watermarkOffsets.highOffset.toString(); + /* While calculating lag, we subtract 1 from the high offset + * for compatibility reasons with KafkaJS's API */ + offsetLag_ = (watermarkOffsets.highOffset - 1) - messages[messages.length - 1].offset; + offsetLagLow_ = (watermarkOffsets.highOffset - 1) - messages[0].offset; + } const messagesConverted = []; for (let i = 0; i < messages.length; i++) { @@ -909,13 +928,13 @@ class Consumer { const batch = { topic, partition, - highWatermark: '-1001', /* We don't fetch it yet. We can call committed() to fetch it but that might incur network calls. */ + highWatermark, messages: messagesConverted, isEmpty: () => false, firstOffset: () => (messagesConverted[0].offset).toString(), lastOffset: () => (messagesConverted[messagesConverted.length - 1].offset).toString(), - offsetLag: () => notImplemented(), - offsetLagLow: () => notImplemented(), + offsetLag: () => offsetLag_.toString(), + offsetLagLow: () => offsetLagLow_.toString(), }; const returnPayload = { diff --git a/test/promisified/consumer/consumeMessages.spec.js b/test/promisified/consumer/consumeMessages.spec.js index c34a04db..ca9204c6 100644 --- a/test/promisified/consumer/consumeMessages.spec.js +++ b/test/promisified/consumer/consumeMessages.spec.js @@ -845,4 +845,66 @@ describe.each(cases)('Consumer - partitionsConsumedConcurrently = %s -', (partit await waitFor(() => (messagesConsumed === messages.length || batchesCountExceeds1), () => { }, 100); expect(batchesCountExceeds1).toBe(false); }); + + it('shows the correct high watermark and lag for partition', async () => { + await producer.connect(); + + let messages0 = Array(10).fill().map(() => { + const value = secureRandom(); + return { value: `value-${value}`, partition: 0 }; + }); + let partition0ProducedMessages = 
messages0.length; + + const messages1 = Array(5).fill().map(() => { + const value = secureRandom(); + return { value: `value-${value}`, partition: 1 }; + }); + + const messages2 = Array(2).fill().map(() => { + const value = secureRandom(); + return { value: `value-${value}`, partition: 2 }; + }); + + for (const messages of [messages0, messages1, messages2]) { + await producer.send({ + topic: topicName, + messages: messages, + }); + } + + await consumer.connect(); + await consumer.subscribe({ topic: topicName }); + + let messagesConsumed = 0; + consumer.run({ + partitionsConsumedConcurrently, + eachBatch: async ({ batch }) => { + if (batch.partition === 0) { + expect(batch.highWatermark).toEqual(String(partition0ProducedMessages)); + } else if (batch.partition === 1) { + expect(batch.highWatermark).toEqual(String(messages1.length)); + } else if (batch.partition === 2) { + expect(batch.highWatermark).toEqual(String(messages2.length)); + } + expect(batch.offsetLag()).toEqual(String(+batch.highWatermark - 1 - +batch.lastOffset())); + expect(batch.offsetLagLow()).toEqual(String(+batch.highWatermark - 1 - +batch.firstOffset())); + messagesConsumed += batch.messages.length; + } + }); + await waitFor(() => (messagesConsumed === (partition0ProducedMessages + messages1.length + messages2.length)), () => { }, 100); + + /* Add some more messages to partition 0 to make sure high watermark is updated. 
*/ + messages0 = Array(15).fill().map(() => { + const value = secureRandom(); + return { value: `value-${value}`, partition: 0 }; + }); + partition0ProducedMessages += messages0.length; + await producer.send({ + topic: topicName, + messages: messages0, + }); + + await waitFor(() => (messagesConsumed === (partition0ProducedMessages + messages1.length + messages2.length)), () => { }, 100); + }); + }); diff --git a/types/kafkajs.d.ts b/types/kafkajs.d.ts index 56832092..6045a77e 100644 --- a/types/kafkajs.d.ts +++ b/types/kafkajs.d.ts @@ -280,6 +280,8 @@ export type Batch = { isEmpty(): boolean firstOffset(): string | null lastOffset(): string + offsetLag(): string + offsetLagLow(): string } export type KafkaMessage = MessageSetEntry | RecordBatchEntry