Skip to content

Added api for session id of topic readers #481

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
May 15, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 11 additions & 1 deletion topic/src/main/java/tech/ydb/topic/read/SyncReader.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,18 @@ public interface SyncReader {
void initAndWait();

/**
* Receive a {@link Message}. Blocks until a Message is received.
* Return identifier of read session.
* @return current read session identifier or null if session has not started yet
*/
@Nullable
String getSessionId();

/**
* Receive a {@link Message}.Blocks until a Message is received.
*
* @param settings settings for receiving a Message
* @return returns a {@link Message}, or null if the specified timeout time elapses before a message is available
* @throws java.lang.InterruptedException if current thread was interrupted
*/
Message receive(ReceiveSettings settings) throws InterruptedException;

Expand All @@ -35,6 +43,7 @@ public interface SyncReader {
* @param timeout timeout to wait a Message with
* @param unit TimeUnit for timeout
* @return returns a {@link Message}, or null if the specified waiting time elapses before a message is available
* @throws java.lang.InterruptedException if current thread was interrupted
*/
@Nullable
default Message receive(long timeout, TimeUnit unit) throws InterruptedException {
Expand All @@ -47,6 +56,7 @@ default Message receive(long timeout, TimeUnit unit) throws InterruptedException
* Receive a {@link Message}. Blocks until a Message is received.
*
* @return {@link Message}
* @throws java.lang.InterruptedException if current thread was interrupted
*/
default Message receive() throws InterruptedException {
return receive(ReceiveSettings.newBuilder().build());
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
package tech.ydb.topic.read.events;

import tech.ydb.topic.read.impl.events.SessionStartedEvent;

/**
* @author Nikolay Perfilov
*/
Expand All @@ -22,4 +24,6 @@ default void onStopPartitionSession(StopPartitionSessionEvent event) {
default void onPartitionSessionClosed(PartitionSessionClosedEvent event) { }

default void onReaderClosed(ReaderClosedEvent event) { }

default void onSessionStarted(SessionStartedEvent event) { }
}
13 changes: 13 additions & 0 deletions topic/src/main/java/tech/ydb/topic/read/impl/AsyncReaderImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import tech.ydb.topic.read.events.StopPartitionSessionEvent;
import tech.ydb.topic.read.impl.events.CommitOffsetAcknowledgementEventImpl;
import tech.ydb.topic.read.impl.events.PartitionSessionClosedEventImpl;
import tech.ydb.topic.read.impl.events.SessionStartedEvent;
import tech.ydb.topic.read.impl.events.StartPartitionSessionEventImpl;
import tech.ydb.topic.read.impl.events.StopPartitionSessionEventImpl;
import tech.ydb.topic.settings.ReadEventHandlersSettings;
Expand Down Expand Up @@ -76,6 +77,18 @@ public CompletableFuture<Status> updateOffsetsInTransaction(YdbTransaction trans
return sendUpdateOffsetsInTransaction(transaction, offsets, settings);
}

@Override
protected void handleSessionStarted(String sessionId) {
handlerExecutor.execute(() -> {
try {
eventHandler.onSessionStarted(new SessionStartedEvent(sessionId));
} catch (Throwable th) {
logUserThrowableAndStopWorking(th, "onSessionStarted");
throw th;
}
});
}

@Override
protected CompletableFuture<Void> handleDataReceivedEvent(DataReceivedEvent event) {
return CompletableFuture.runAsync(() -> {
Expand Down
2 changes: 2 additions & 0 deletions topic/src/main/java/tech/ydb/topic/read/impl/ReaderImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,7 @@ protected CompletableFuture<Void> initImpl() {
}

protected abstract CompletableFuture<Void> handleDataReceivedEvent(DataReceivedEvent event);
protected abstract void handleSessionStarted(String sessionId);
protected abstract void handleCommitResponse(long committedOffset, PartitionSession partitionSession);
protected abstract void handleStartPartitionSessionRequest(
YdbTopic.StreamReadMessage.StartPartitionSessionRequest request,
Expand Down Expand Up @@ -385,6 +386,7 @@ private void onInitResponse(YdbTopic.StreamReadMessage.InitResponse response) {
sizeBytesToRequest.set(settings.getMaxMemoryUsageBytes());
logger.info("[{}] Session {} initialized. Requesting {} bytes...", streamId, sessionId,
settings.getMaxMemoryUsageBytes());
handleSessionStarted(sessionId);
sendReadRequest();
}

Expand Down
11 changes: 11 additions & 0 deletions topic/src/main/java/tech/ydb/topic/read/impl/SyncReaderImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ public class SyncReaderImpl extends ReaderImpl implements SyncReader {
private final ReentrantLock queueLock = new ReentrantLock();
private final Condition queueIsNotEmptyCondition = queueLock.newCondition();
private int currentMessageIndex = 0;
private volatile String sessionId = null;

public SyncReaderImpl(TopicRpc topicRpc, ReaderSettings settings) {
super(topicRpc, settings);
Expand All @@ -54,6 +55,11 @@ private MessageBatchWrapper(List<Message> messages, CompletableFuture<Void> futu
}
}

@Override
public String getSessionId() {
return sessionId;
}

@Override
public void init() {
initImpl();
Expand Down Expand Up @@ -160,6 +166,11 @@ protected CompletableFuture<Void> handleDataReceivedEvent(DataReceivedEvent even
return resultFuture;
}

@Override
protected void handleSessionStarted(String sessionId) {
this.sessionId = sessionId;
}

@Override
protected void handleCommitResponse(long committedOffset, PartitionSession partitionSession) {
if (logger.isDebugEnabled()) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
package tech.ydb.topic.read.impl.events;

/**
*
* @author Aleksandr Gorshenin
*/
public class SessionStartedEvent {
private final String sessionId;

public SessionStartedEvent(String sessionId) {
this.sessionId = sessionId;
}

public String getSessionId() {
return this.sessionId;
}
}