diff --git a/event-store/src/main/java/org/hypertrace/core/eventstore/EventProducer.java b/event-store/src/main/java/org/hypertrace/core/eventstore/EventProducer.java index b6f2598..b65f499 100644 --- a/event-store/src/main/java/org/hypertrace/core/eventstore/EventProducer.java +++ b/event-store/src/main/java/org/hypertrace/core/eventstore/EventProducer.java @@ -29,6 +29,25 @@ public interface EventProducer { */ void send(K key, V value, long timestamp); + /** + * Sends data to underlying sink asynchronously with a callback to handle completion or errors. + * + * @param key message key + * @param value message value/payload + * @param callback callback to be invoked when the send completes or fails + */ + void send(K key, V value, SendCallback callback); + + /** + * Sends data to underlying sink asynchronously with a callback to handle completion or errors. + * + * @param key message key + * @param value message value/payload + * @param timestamp time stamp to be used for the event + * @param callback callback to be invoked when the send completes or fails + */ + void send(K key, V value, long timestamp, SendCallback callback); + /** * Sends data to underlying sink, async by default unless sync=true in the init configs * diff --git a/event-store/src/main/java/org/hypertrace/core/eventstore/SendCallback.java b/event-store/src/main/java/org/hypertrace/core/eventstore/SendCallback.java new file mode 100644 index 0000000..d3f1d7a --- /dev/null +++ b/event-store/src/main/java/org/hypertrace/core/eventstore/SendCallback.java @@ -0,0 +1,17 @@ +package org.hypertrace.core.eventstore; + +/** + * A callback interface that allows clients to receive notifications when a send operation + * completes. This can be used to handle both successful sends and errors asynchronously. + */ +public interface SendCallback { + + /** + * Called when a send operation completes. + * + * @param result The result of the send operation if successful, null if an exception occurred + * @param exception The exception thrown during the send operation, null if the send was + * successful + */ + void onCompletion(SendResult result, Exception exception); +} diff --git a/event-store/src/main/java/org/hypertrace/core/eventstore/SendResult.java b/event-store/src/main/java/org/hypertrace/core/eventstore/SendResult.java new file mode 100644 index 0000000..fc9b60c --- /dev/null +++ b/event-store/src/main/java/org/hypertrace/core/eventstore/SendResult.java @@ -0,0 +1,56 @@ +package org.hypertrace.core.eventstore; + +/** + * Represents the result of a successful send operation. This class abstracts away the underlying + * implementation details (e.g., Kafka's RecordMetadata) and provides a common interface for + * clients. + */ +public class SendResult { + + private final String topic; + private final int partition; + private final long offset; + private final long timestamp; + + public SendResult(String topic, int partition, long offset, long timestamp) { + this.topic = topic; + this.partition = partition; + this.offset = offset; + this.timestamp = timestamp; + } + + /** Returns the topic the record was appended to. */ + public String getTopic() { + return topic; + } + + /** Returns the partition the record was sent to. */ + public int getPartition() { + return partition; + } + + /** Returns the offset of the record in the topic/partition. */ + public long getOffset() { + return offset; + } + + /** Returns the timestamp of the record in the topic/partition. */ + public long getTimestamp() { + return timestamp; + } + + @Override + public String toString() { + return "SendResult{" + + "topic='" + + topic + + '\'' + + ", partition=" + + partition + + ", offset=" + + offset + + ", timestamp=" + + timestamp + + '}'; + } +} diff --git a/event-store/src/main/java/org/hypertrace/core/eventstore/kafka/KafkaEventProducer.java b/event-store/src/main/java/org/hypertrace/core/eventstore/kafka/KafkaEventProducer.java index 3b5d1e9..192976d 100644 --- a/event-store/src/main/java/org/hypertrace/core/eventstore/kafka/KafkaEventProducer.java +++ b/event-store/src/main/java/org/hypertrace/core/eventstore/kafka/KafkaEventProducer.java @@ -13,6 +13,8 @@ import org.hypertrace.core.eventstore.EventProducer; import org.hypertrace.core.eventstore.EventProducerConfig; import org.hypertrace.core.eventstore.KeyValuePair; +import org.hypertrace.core.eventstore.SendCallback; +import org.hypertrace.core.eventstore.SendResult; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -48,6 +50,38 @@ public void send(K key, V value, long timestamp) { producer.send(new ProducerRecord<>(this.topic, null, timestamp, key, value)); } + @Override + public void send(K key, V value, SendCallback callback) { + producer.send( + new ProducerRecord<>(this.topic, key, value), + (metadata, exception) -> + callback.onCompletion( + exception == null + ? new SendResult( + metadata.topic(), + metadata.partition(), + metadata.offset(), + metadata.timestamp()) + : null, + exception)); + } + + @Override + public void send(K key, V value, long timestamp, SendCallback callback) { + producer.send( + new ProducerRecord<>(this.topic, null, timestamp, key, value), + (metadata, exception) -> + callback.onCompletion( + exception == null + ? new SendResult( + metadata.topic(), + metadata.partition(), + metadata.offset(), + metadata.timestamp()) + : null, + exception)); + } + @Override public void batchSend(List> events) { events.forEach(entry -> this.send(entry.getKey(), entry.getValue()));