Skip to content

Commit 7ee0b08

Browse files
authored
Merge pull request #481 from alex268/update_commit_offsets
Added api for session id of topic readers
2 parents 600c8dc + 39931b3 commit 7ee0b08

File tree

6 files changed

+58
-1
lines changed

6 files changed

+58
-1
lines changed

topic/src/main/java/tech/ydb/topic/read/SyncReader.java

+11-1
Original file line numberDiff line numberDiff line change
@@ -22,10 +22,18 @@ public interface SyncReader {
2222
void initAndWait();
2323

2424
/**
25-
* Receive a {@link Message}. Blocks until a Message is received.
25+
* Return identifier of read session.
26+
* @return current read session identifier or null if session has not started yet
27+
*/
28+
@Nullable
29+
String getSessionId();
30+
31+
/**
32+
* Receive a {@link Message}.Blocks until a Message is received.
2633
*
2734
* @param settings settings for receiving a Message
2835
* @return returns a {@link Message}, or null if the specified timeout time elapses before a message is available
36+
* @throws java.lang.InterruptedException if current thread was interrupted
2937
*/
3038
Message receive(ReceiveSettings settings) throws InterruptedException;
3139

@@ -35,6 +43,7 @@ public interface SyncReader {
3543
* @param timeout timeout to wait a Message with
3644
* @param unit TimeUnit for timeout
3745
* @return returns a {@link Message}, or null if the specified waiting time elapses before a message is available
46+
* @throws java.lang.InterruptedException if current thread was interrupted
3847
*/
3948
@Nullable
4049
default Message receive(long timeout, TimeUnit unit) throws InterruptedException {
@@ -47,6 +56,7 @@ default Message receive(long timeout, TimeUnit unit) throws InterruptedException
4756
* Receive a {@link Message}. Blocks until a Message is received.
4857
*
4958
* @return {@link Message}
59+
* @throws java.lang.InterruptedException if current thread was interrupted
5060
*/
5161
default Message receive() throws InterruptedException {
5262
return receive(ReceiveSettings.newBuilder().build());

topic/src/main/java/tech/ydb/topic/read/events/ReadEventHandler.java

+4
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,7 @@
11
package tech.ydb.topic.read.events;
22

3+
import tech.ydb.topic.read.impl.events.SessionStartedEvent;
4+
35
/**
46
* @author Nikolay Perfilov
57
*/
@@ -22,4 +24,6 @@ default void onStopPartitionSession(StopPartitionSessionEvent event) {
2224
default void onPartitionSessionClosed(PartitionSessionClosedEvent event) { }
2325

2426
default void onReaderClosed(ReaderClosedEvent event) { }
27+
28+
default void onSessionStarted(SessionStartedEvent event) { }
2529
}

topic/src/main/java/tech/ydb/topic/read/impl/AsyncReaderImpl.java

+13
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
import tech.ydb.topic.read.events.StopPartitionSessionEvent;
2828
import tech.ydb.topic.read.impl.events.CommitOffsetAcknowledgementEventImpl;
2929
import tech.ydb.topic.read.impl.events.PartitionSessionClosedEventImpl;
30+
import tech.ydb.topic.read.impl.events.SessionStartedEvent;
3031
import tech.ydb.topic.read.impl.events.StartPartitionSessionEventImpl;
3132
import tech.ydb.topic.read.impl.events.StopPartitionSessionEventImpl;
3233
import tech.ydb.topic.settings.ReadEventHandlersSettings;
@@ -76,6 +77,18 @@ public CompletableFuture<Status> updateOffsetsInTransaction(YdbTransaction trans
7677
return sendUpdateOffsetsInTransaction(transaction, offsets, settings);
7778
}
7879

80+
@Override
81+
protected void handleSessionStarted(String sessionId) {
82+
handlerExecutor.execute(() -> {
83+
try {
84+
eventHandler.onSessionStarted(new SessionStartedEvent(sessionId));
85+
} catch (Throwable th) {
86+
logUserThrowableAndStopWorking(th, "onSessionStarted");
87+
throw th;
88+
}
89+
});
90+
}
91+
7992
@Override
8093
protected CompletableFuture<Void> handleDataReceivedEvent(DataReceivedEvent event) {
8194
return CompletableFuture.runAsync(() -> {

topic/src/main/java/tech/ydb/topic/read/impl/ReaderImpl.java

+2
Original file line numberDiff line numberDiff line change
@@ -109,6 +109,7 @@ protected CompletableFuture<Void> initImpl() {
109109
}
110110

111111
protected abstract CompletableFuture<Void> handleDataReceivedEvent(DataReceivedEvent event);
112+
protected abstract void handleSessionStarted(String sessionId);
112113
protected abstract void handleCommitResponse(long committedOffset, PartitionSession partitionSession);
113114
protected abstract void handleStartPartitionSessionRequest(
114115
YdbTopic.StreamReadMessage.StartPartitionSessionRequest request,
@@ -385,6 +386,7 @@ private void onInitResponse(YdbTopic.StreamReadMessage.InitResponse response) {
385386
sizeBytesToRequest.set(settings.getMaxMemoryUsageBytes());
386387
logger.info("[{}] Session {} initialized. Requesting {} bytes...", streamId, sessionId,
387388
settings.getMaxMemoryUsageBytes());
389+
handleSessionStarted(sessionId);
388390
sendReadRequest();
389391
}
390392

topic/src/main/java/tech/ydb/topic/read/impl/SyncReaderImpl.java

+11
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@ public class SyncReaderImpl extends ReaderImpl implements SyncReader {
3939
private final ReentrantLock queueLock = new ReentrantLock();
4040
private final Condition queueIsNotEmptyCondition = queueLock.newCondition();
4141
private int currentMessageIndex = 0;
42+
private volatile String sessionId = null;
4243

4344
public SyncReaderImpl(TopicRpc topicRpc, ReaderSettings settings) {
4445
super(topicRpc, settings);
@@ -54,6 +55,11 @@ private MessageBatchWrapper(List<Message> messages, CompletableFuture<Void> futu
5455
}
5556
}
5657

58+
@Override
59+
public String getSessionId() {
60+
return sessionId;
61+
}
62+
5763
@Override
5864
public void init() {
5965
initImpl();
@@ -160,6 +166,11 @@ protected CompletableFuture<Void> handleDataReceivedEvent(DataReceivedEvent even
160166
return resultFuture;
161167
}
162168

169+
@Override
170+
protected void handleSessionStarted(String sessionId) {
171+
this.sessionId = sessionId;
172+
}
173+
163174
@Override
164175
protected void handleCommitResponse(long committedOffset, PartitionSession partitionSession) {
165176
if (logger.isDebugEnabled()) {
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,17 @@
1+
package tech.ydb.topic.read.impl.events;
2+
3+
/**
4+
*
5+
* @author Aleksandr Gorshenin
6+
*/
7+
public class SessionStartedEvent {
8+
private final String sessionId;
9+
10+
public SessionStartedEvent(String sessionId) {
11+
this.sessionId = sessionId;
12+
}
13+
14+
public String getSessionId() {
15+
return this.sessionId;
16+
}
17+
}

0 commit comments

Comments
 (0)