From 90834e8c94aadbe432cea22f0992d23c2f1bc79a Mon Sep 17 00:00:00 2001 From: GurtejSohi Date: Mon, 29 Dec 2025 10:20:41 +0530 Subject: [PATCH 1/5] feat: add support for callback in send method --- .../core/eventstore/EventProducer.java | 19 +++++++ .../core/eventstore/SendCallback.java | 18 ++++++ .../core/eventstore/SendResult.java | 57 +++++++++++++++++++ .../eventstore/kafka/KafkaEventProducer.java | 34 +++++++++++ 4 files changed, 128 insertions(+) create mode 100644 event-store/src/main/java/org/hypertrace/core/eventstore/SendCallback.java create mode 100644 event-store/src/main/java/org/hypertrace/core/eventstore/SendResult.java diff --git a/event-store/src/main/java/org/hypertrace/core/eventstore/EventProducer.java b/event-store/src/main/java/org/hypertrace/core/eventstore/EventProducer.java index b6f2598..b65f499 100644 --- a/event-store/src/main/java/org/hypertrace/core/eventstore/EventProducer.java +++ b/event-store/src/main/java/org/hypertrace/core/eventstore/EventProducer.java @@ -29,6 +29,25 @@ public interface EventProducer { */ void send(K key, V value, long timestamp); + /** + * Sends data to underlying sink asynchronously with a callback to handle completion or errors. + * + * @param key message key + * @param value message value/payload + * @param callback callback to be invoked when the send completes or fails + */ + void send(K key, V value, SendCallback callback); + + /** + * Sends data to underlying sink asynchronously with a callback to handle completion or errors. + * + * @param key message key + * @param value message value/payload + * @param timestamp time stamp to be used for the event + * @param callback callback to be invoked when the send completes or fails + */ + void send(K key, V value, long timestamp, SendCallback callback); + /** * Sends data to underlying sink, async by default unless sync=true in the init configs * diff --git a/event-store/src/main/java/org/hypertrace/core/eventstore/SendCallback.java b/event-store/src/main/java/org/hypertrace/core/eventstore/SendCallback.java new file mode 100644 index 0000000..048a5e6 --- /dev/null +++ b/event-store/src/main/java/org/hypertrace/core/eventstore/SendCallback.java @@ -0,0 +1,18 @@ +package org.hypertrace.core.eventstore; + +/** + * A callback interface that allows clients to receive notifications when a send operation + * completes. This can be used to handle both successful sends and errors asynchronously. + */ +public interface SendCallback { + + /** + * Called when a send operation completes. + * + * @param result The result of the send operation if successful, null if an exception occurred + * @param exception The exception thrown during the send operation, null if the send was + * successful + */ + void onCompletion(SendResult result, Exception exception); +} + diff --git a/event-store/src/main/java/org/hypertrace/core/eventstore/SendResult.java b/event-store/src/main/java/org/hypertrace/core/eventstore/SendResult.java new file mode 100644 index 0000000..81af969 --- /dev/null +++ b/event-store/src/main/java/org/hypertrace/core/eventstore/SendResult.java @@ -0,0 +1,57 @@ +package org.hypertrace.core.eventstore; + +/** + * Represents the result of a successful send operation. This class abstracts away the underlying + * implementation details (e.g., Kafka's RecordMetadata) and provides a common interface for + * clients. + */ +public class SendResult { + + private final String topic; + private final int partition; + private final long offset; + private final long timestamp; + + public SendResult(String topic, int partition, long offset, long timestamp) { + this.topic = topic; + this.partition = partition; + this.offset = offset; + this.timestamp = timestamp; + } + + /** Returns the topic the record was appended to. */ + public String getTopic() { + return topic; + } + + /** Returns the partition the record was sent to. */ + public int getPartition() { + return partition; + } + + /** Returns the offset of the record in the topic/partition. */ + public long getOffset() { + return offset; + } + + /** Returns the timestamp of the record in the topic/partition. */ + public long getTimestamp() { + return timestamp; + } + + @Override + public String toString() { + return "SendResult{" + + "topic='" + + topic + + '\'' + + ", partition=" + + partition + + ", offset=" + + offset + + ", timestamp=" + + timestamp + + '}'; + } +} + diff --git a/event-store/src/main/java/org/hypertrace/core/eventstore/kafka/KafkaEventProducer.java b/event-store/src/main/java/org/hypertrace/core/eventstore/kafka/KafkaEventProducer.java index 3b5d1e9..192976d 100644 --- a/event-store/src/main/java/org/hypertrace/core/eventstore/kafka/KafkaEventProducer.java +++ b/event-store/src/main/java/org/hypertrace/core/eventstore/kafka/KafkaEventProducer.java @@ -13,6 +13,8 @@ import org.hypertrace.core.eventstore.EventProducer; import org.hypertrace.core.eventstore.EventProducerConfig; import org.hypertrace.core.eventstore.KeyValuePair; +import org.hypertrace.core.eventstore.SendCallback; +import org.hypertrace.core.eventstore.SendResult; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -48,6 +50,38 @@ public void send(K key, V value, long timestamp) { producer.send(new ProducerRecord<>(this.topic, null, timestamp, key, value)); } + @Override + public void send(K key, V value, SendCallback callback) { + producer.send( + new ProducerRecord<>(this.topic, key, value), + (metadata, exception) -> + callback.onCompletion( + exception == null + ? new SendResult( + metadata.topic(), + metadata.partition(), + metadata.offset(), + metadata.timestamp()) + : null, + exception)); + } + + @Override + public void send(K key, V value, long timestamp, SendCallback callback) { + producer.send( + new ProducerRecord<>(this.topic, null, timestamp, key, value), + (metadata, exception) -> + callback.onCompletion( + exception == null + ? new SendResult( + metadata.topic(), + metadata.partition(), + metadata.offset(), + metadata.timestamp()) + : null, + exception)); + } + @Override public void batchSend(List> events) { events.forEach(entry -> this.send(entry.getKey(), entry.getValue())); From 374572c91cee71b346e9e7c91a74b28a5cd80543 Mon Sep 17 00:00:00 2001 From: GurtejSohi Date: Mon, 29 Dec 2025 15:32:37 +0530 Subject: [PATCH 2/5] chore: dummy commit From b2c099cddd6b3826a9facbe9ce1d18e9bbc7ce1a Mon Sep 17 00:00:00 2001 From: Avinash <52672355+avinashkolluru@users.noreply.github.com> Date: Mon, 29 Dec 2025 18:56:16 +0530 Subject: [PATCH 3/5] Apply suggestion from @avinashkolluru --- .../src/main/java/org/hypertrace/core/eventstore/SendResult.java | 1 + 1 file changed, 1 insertion(+) diff --git a/event-store/src/main/java/org/hypertrace/core/eventstore/SendResult.java b/event-store/src/main/java/org/hypertrace/core/eventstore/SendResult.java index 81af969..0b0187d 100644 --- a/event-store/src/main/java/org/hypertrace/core/eventstore/SendResult.java +++ b/event-store/src/main/java/org/hypertrace/core/eventstore/SendResult.java @@ -39,6 +39,7 @@ public long getTimestamp() { return timestamp; } + @Override public String toString() { return "SendResult{" From b8bdba63ff16f34dade1499fb3152a0c3e9b4ee4 Mon Sep 17 00:00:00 2001 From: Avinash <52672355+avinashkolluru@users.noreply.github.com> Date: Mon, 29 Dec 2025 18:59:01 +0530 Subject: [PATCH 4/5] Apply suggestion from @avinashkolluru --- .../src/main/java/org/hypertrace/core/eventstore/SendResult.java | 1 - 1 file changed, 1 deletion(-) diff --git a/event-store/src/main/java/org/hypertrace/core/eventstore/SendResult.java b/event-store/src/main/java/org/hypertrace/core/eventstore/SendResult.java index 0b0187d..81af969 100644 --- a/event-store/src/main/java/org/hypertrace/core/eventstore/SendResult.java +++ b/event-store/src/main/java/org/hypertrace/core/eventstore/SendResult.java @@ -39,7 +39,6 @@ public long getTimestamp() { return timestamp; } - @Override public String toString() { return "SendResult{" From 2fecdfd91be4445cf5bff92c61d713c62dcbf188 Mon Sep 17 00:00:00 2001 From: avinash Date: Mon, 29 Dec 2025 19:08:27 +0530 Subject: [PATCH 5/5] spotless apply --- .../main/java/org/hypertrace/core/eventstore/SendCallback.java | 1 - .../src/main/java/org/hypertrace/core/eventstore/SendResult.java | 1 - 2 files changed, 2 deletions(-) diff --git a/event-store/src/main/java/org/hypertrace/core/eventstore/SendCallback.java b/event-store/src/main/java/org/hypertrace/core/eventstore/SendCallback.java index 048a5e6..d3f1d7a 100644 --- a/event-store/src/main/java/org/hypertrace/core/eventstore/SendCallback.java +++ b/event-store/src/main/java/org/hypertrace/core/eventstore/SendCallback.java @@ -15,4 +15,3 @@ public interface SendCallback { */ void onCompletion(SendResult result, Exception exception); } - diff --git a/event-store/src/main/java/org/hypertrace/core/eventstore/SendResult.java b/event-store/src/main/java/org/hypertrace/core/eventstore/SendResult.java index 81af969..fc9b60c 100644 --- a/event-store/src/main/java/org/hypertrace/core/eventstore/SendResult.java +++ b/event-store/src/main/java/org/hypertrace/core/eventstore/SendResult.java @@ -54,4 +54,3 @@ public String toString() { + '}'; } } -