Skip to content

Commit

Permalink
Fixed Messages Paging
Browse files Browse the repository at this point in the history
  • Loading branch information
marian-hacaj-profinit committed Jun 19, 2023
1 parent 2ac8646 commit 18de1d7
Show file tree
Hide file tree
Showing 10 changed files with 37 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ public Mono<ResponseEntity<Flux<TopicMessageEventDTO>>> getTopicMessages(String
SeekTypeDTO seekType,
List<String> seekTo,
Integer limit,
Integer page,
String q,
MessageFilterTypeDTO filterQueryType,
SeekDirectionDTO seekDirection,
Expand All @@ -85,6 +86,7 @@ public Mono<ResponseEntity<Flux<TopicMessageEventDTO>>> getTopicMessages(String
.topicActions(MESSAGES_READ)
.build());

page = page != null ? page : 0;
seekType = seekType != null ? seekType : SeekTypeDTO.BEGINNING;
seekDirection = seekDirection != null ? seekDirection : SeekDirectionDTO.FORWARD;
filterQueryType = filterQueryType != null ? filterQueryType : MessageFilterTypeDTO.STRING_CONTAINS;
Expand All @@ -98,7 +100,7 @@ public Mono<ResponseEntity<Flux<TopicMessageEventDTO>>> getTopicMessages(String
ResponseEntity.ok(
messagesService.loadMessages(
getCluster(clusterName), topicName, positions, q, filterQueryType,
limit, seekDirection, keySerde, valueSerde)
limit, page, seekDirection, keySerde, valueSerde)
)
);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,4 +59,8 @@ protected void sendFinishStatsAndCompleteSink(FluxSink<TopicMessageEventDTO> sin
messagesProcessing.sendFinishEvent(sink);
sink.complete();
}

protected Integer getPageOffset() {
return messagesProcessing.getPageOffset();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,9 @@ public void accept(FluxSink<TopicMessageEventDTO> sink) {
return; //fast return in case of sink cancellation
}
long beginOffset = seekOperations.getBeginOffsets().get(tp);
long readFromOffset = Math.max(beginOffset, readToOffset - msgsToPollPerPartition);
long readFromOffset = Math.max(beginOffset, readToOffset - msgsToPollPerPartition - this.getPageOffset());
readToOffset = readToOffset - this.getPageOffset();


partitionPollIteration(tp, readFromOffset, readToOffset, consumer, sink)
.forEach(r -> sendMessage(sink, r));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,10 @@ public void accept(FluxSink<TopicMessageEventDTO> sink) {
var seekOperations = SeekOperations.create(consumer, position);
seekOperations.assignAndSeekNonEmptyPartitions();

seekOperations.getOffsetsForSeek().forEach((p, o) -> {
consumer.seek(p, o + getPageOffset());
});

EmptyPollsCounter emptyPolls = pollingSettings.createEmptyPollsCounter();
while (!sink.isCancelled()
&& !sendLimitReached()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,18 +23,26 @@ public class MessagesProcessing {
private final Predicate<TopicMessageDTO> filter;
private final @Nullable Integer limit;

private final @Nullable Integer page;

public MessagesProcessing(ConsumerRecordDeserializer deserializer,
Predicate<TopicMessageDTO> filter,
@Nullable Integer limit) {
@Nullable Integer limit,
@Nullable Integer page) {
this.deserializer = deserializer;
this.filter = filter;
this.limit = limit;
this.page = page;
}

boolean limitReached() {
return limit != null && sentMessages >= limit;
}

Integer getPageOffset() {
return this.limit * this.page;
}

void sendMsg(FluxSink<TopicMessageEventDTO> sink, ConsumerRecord<Bytes, Bytes> rec) {
if (!sink.isCancelled() && !limitReached()) {
TopicMessageDTO topicMessage = deserializer.deserialize(rec);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -160,14 +160,16 @@ public Flux<TopicMessageEventDTO> loadMessages(KafkaCluster cluster, String topi
@Nullable String query,
MessageFilterTypeDTO filterQueryType,
@Nullable Integer pageSize,
Integer page,
SeekDirectionDTO seekDirection,
@Nullable String keySerde,
@Nullable String valueSerde) {
return withExistingTopic(cluster, topic)
.flux()
.publishOn(Schedulers.boundedElastic())
.flatMap(td -> loadMessagesImpl(cluster, topic, consumerPosition, query,
filterQueryType, fixPageSize(pageSize), seekDirection, keySerde, valueSerde));
filterQueryType, fixPageSize(pageSize), page,
seekDirection, keySerde, valueSerde));
}

private int fixPageSize(@Nullable Integer pageSize) {
Expand All @@ -182,6 +184,7 @@ private Flux<TopicMessageEventDTO> loadMessagesImpl(KafkaCluster cluster,
@Nullable String query,
MessageFilterTypeDTO filterQueryType,
int limit,
int page,
SeekDirectionDTO seekDirection,
@Nullable String keySerde,
@Nullable String valueSerde) {
Expand All @@ -191,7 +194,8 @@ private Flux<TopicMessageEventDTO> loadMessagesImpl(KafkaCluster cluster,
var processing = new MessagesProcessing(
deserializationService.deserializerFor(cluster, topic, keySerde, valueSerde),
getMsgFilter(query, filterQueryType),
seekDirection == SeekDirectionDTO.TAILING ? null : limit
seekDirection == SeekDirectionDTO.TAILING ? null : limit,
page
);

if (seekDirection.equals(SeekDirectionDTO.FORWARD)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,7 @@ private Flux<TopicMessageEventDTO> createTailingFlux(
query,
MessageFilterTypeDTO.STRING_CONTAINS,
0,
0,
SeekDirectionDTO.TAILING,
"String",
"String");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ void sendMessageReturnsExceptionWhenTopicNotFound() {
@Test
void loadMessagesReturnsExceptionWhenTopicNotFound() {
StepVerifier.create(messagesService
.loadMessages(cluster, NON_EXISTING_TOPIC, null, null, null, 1, null, "String", "String"))
.loadMessages(cluster, NON_EXISTING_TOPIC, null, null, null, 1, 0, null, "String", "String"))
.expectError(TopicNotFoundException.class)
.verify();
}
Expand All @@ -75,6 +75,7 @@ void maskingAppliedOnConfiguredClusters() throws Exception {
null,
null,
100,
0,
SeekDirectionDTO.FORWARD,
StringSerde.name(),
StringSerde.name()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -508,6 +508,7 @@ public void doAssert(Consumer<TopicMessageDTO> msgAssert) {
null,
null,
1,
0,
SeekDirectionDTO.FORWARD,
msgToSend.getKeySerde().get(),
msgToSend.getValueSerde().get()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -658,6 +658,10 @@ paths:
in: query
schema:
type: integer
- name: page
in: query
schema:
type: integer
- name: q
in: query
schema:
Expand Down

0 comments on commit 18de1d7

Please sign in to comment.