diff --git a/.github/workflows/e2e-automation.yml b/.github/workflows/e2e-automation.yml index b3bb2f266fc..5ad9c450214 100644 --- a/.github/workflows/e2e-automation.yml +++ b/.github/workflows/e2e-automation.yml @@ -23,12 +23,12 @@ jobs: - uses: actions/checkout@v3 with: ref: ${{ github.sha }} - - name: Configure AWS credentials - uses: aws-actions/configure-aws-credentials@v3 - with: - aws-access-key-id: ${{ secrets.AWS_ACCESS_KEY_ID }} - aws-secret-access-key: ${{ secrets.AWS_SECRET_ACCESS_KEY }} - aws-region: eu-central-1 + # - name: Configure AWS credentials + # uses: aws-actions/configure-aws-credentials@v3 + # with: + # aws-access-key-id: ${{ secrets.AWS_ACCESS_KEY_ID }} + # aws-secret-access-key: ${{ secrets.AWS_SECRET_ACCESS_KEY }} + # aws-region: eu-central-1 - name: Set up environment id: set_env_values run: | @@ -68,21 +68,21 @@ jobs: allure_report: allure-report subfolder: allure-results report_url: "http://kafkaui-allure-reports.s3-website.eu-central-1.amazonaws.com" - - uses: jakejarvis/s3-sync-action@master - if: always() - env: - AWS_S3_BUCKET: 'kafkaui-allure-reports' - AWS_REGION: 'eu-central-1' - SOURCE_DIR: 'allure-history/allure-results' - - name: Deploy report to Amazon S3 - if: always() - uses: Sibz/github-status-action@v1.1.6 - with: - authToken: ${{secrets.GITHUB_TOKEN}} - context: "Click Details button to open Allure report" - state: "success" - sha: ${{ github.sha }} - target_url: http://kafkaui-allure-reports.s3-website.eu-central-1.amazonaws.com/${{ github.run_number }} + # - uses: jakejarvis/s3-sync-action@master + # if: always() + # env: + # AWS_S3_BUCKET: 'kafkaui-allure-reports' + # AWS_REGION: 'eu-central-1' + # SOURCE_DIR: 'allure-history/allure-results' + # - name: Deploy report to Amazon S3 + # if: always() + # uses: Sibz/github-status-action@v1.1.6 + # with: + # authToken: ${{secrets.GITHUB_TOKEN}} + # context: "Click Details button to open Allure report" + # state: "success" + # sha: ${{ github.sha }} + # target_url: http://kafkaui-allure-reports.s3-website.eu-central-1.amazonaws.com/${{ github.run_number }} - name: Dump Docker logs on failure if: failure() uses: jwalton/gh-docker-logs@v2.2.1 diff --git a/.github/workflows/frontend.yaml b/.github/workflows/frontend.yaml index 9d7300448c9..daae8d83e8c 100644 --- a/.github/workflows/frontend.yaml +++ b/.github/workflows/frontend.yaml @@ -48,11 +48,11 @@ jobs: run: | cd kafka-ui-react-app/ pnpm test:CI - - name: SonarCloud Scan - uses: sonarsource/sonarcloud-github-action@master - with: - projectBaseDir: ./kafka-ui-react-app - args: -Dsonar.pullrequest.key=${{ github.event.pull_request.number }} -Dsonar.pullrequest.branch=${{ github.head_ref }} -Dsonar.pullrequest.base=${{ github.base_ref }} - env: - GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }} - SONAR_TOKEN: ${{ secrets.SONAR_TOKEN_FRONTEND }} + # - name: SonarCloud Scan + # uses: sonarsource/sonarcloud-github-action@master + # with: + # projectBaseDir: ./kafka-ui-react-app + # args: -Dsonar.pullrequest.key=${{ github.event.pull_request.number }} -Dsonar.pullrequest.branch=${{ github.head_ref }} -Dsonar.pullrequest.base=${{ github.base_ref }} + # env: + # GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }} + # SONAR_TOKEN: ${{ secrets.SONAR_TOKEN_FRONTEND }} diff --git a/kafka-ui-api/pom.xml b/kafka-ui-api/pom.xml index 1e0f952eb16..46d9b47a4a6 100644 --- a/kafka-ui-api/pom.xml +++ b/kafka-ui-api/pom.xml @@ -91,7 +91,7 @@ software.amazon.msk aws-msk-iam-auth - 1.1.7 + 2.0.3 @@ -485,7 +485,7 @@ true - provectuslabs/kafka-ui:${git.revision} + 
gimral/kafka-ui:${git.revision} ${project.basedir} diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/controller/MessagesController.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/controller/MessagesController.java index 32d341e6134..709efcdc185 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/controller/MessagesController.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/controller/MessagesController.java @@ -5,13 +5,14 @@ import static com.provectus.kafka.ui.model.rbac.permission.TopicAction.MESSAGES_READ; import static com.provectus.kafka.ui.serde.api.Serde.Target.KEY; import static com.provectus.kafka.ui.serde.api.Serde.Target.VALUE; -import static java.util.stream.Collectors.toMap; import com.provectus.kafka.ui.api.MessagesApi; -import com.provectus.kafka.ui.exception.ValidationException; import com.provectus.kafka.ui.model.ConsumerPosition; import com.provectus.kafka.ui.model.CreateTopicMessageDTO; +import com.provectus.kafka.ui.model.MessageFilterIdDTO; +import com.provectus.kafka.ui.model.MessageFilterRegistrationDTO; import com.provectus.kafka.ui.model.MessageFilterTypeDTO; +import com.provectus.kafka.ui.model.PollingModeDTO; import com.provectus.kafka.ui.model.SeekDirectionDTO; import com.provectus.kafka.ui.model.SeekTypeDTO; import com.provectus.kafka.ui.model.SerdeUsageDTO; @@ -25,14 +26,11 @@ import com.provectus.kafka.ui.service.DeserializationService; import com.provectus.kafka.ui.service.MessagesService; import java.util.List; -import java.util.Map; import java.util.Optional; -import javax.annotation.Nullable; import javax.validation.Valid; +import javax.validation.ValidationException; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; -import org.apache.commons.lang3.tuple.Pair; -import org.apache.kafka.common.TopicPartition; import org.springframework.http.ResponseEntity; import org.springframework.web.bind.annotation.RestController; import org.springframework.web.server.ServerWebExchange; @@ -76,6 +74,7 @@ public Mono> executeSmartFilte .map(ResponseEntity::ok); } + @Deprecated @Override public Mono>> getTopicMessages(String clusterName, String topicName, @@ -88,6 +87,23 @@ public Mono>> getTopicMessages(String String keySerde, String valueSerde, ServerWebExchange exchange) { + throw new ValidationException("Not supported"); + } + + + @Override + public Mono>> getTopicMessagesV2(String clusterName, String topicName, + PollingModeDTO mode, + List partitions, + Integer limit, + String stringFilter, + String smartFilterId, + Long offset, + Long timestamp, + String keySerde, + String valueSerde, + String cursor, + ServerWebExchange exchange) { var contextBuilder = AccessContext.builder() .cluster(clusterName) .topic(topicName) @@ -98,27 +114,26 @@ public Mono>> getTopicMessages(String contextBuilder.auditActions(AuditAction.VIEW); } - seekType = seekType != null ? seekType : SeekTypeDTO.BEGINNING; - seekDirection = seekDirection != null ? seekDirection : SeekDirectionDTO.FORWARD; - filterQueryType = filterQueryType != null ? 
filterQueryType : MessageFilterTypeDTO.STRING_CONTAINS; - - var positions = new ConsumerPosition( - seekType, - topicName, - parseSeekTo(topicName, seekType, seekTo) - ); - Mono>> job = Mono.just( - ResponseEntity.ok( - messagesService.loadMessages( - getCluster(clusterName), topicName, positions, q, filterQueryType, - limit, seekDirection, keySerde, valueSerde) - ) - ); - - var context = contextBuilder.build(); - return validateAccess(context) - .then(job) - .doOnEach(sig -> audit(context, sig)); + var accessContext = contextBuilder.build(); + + Flux messagesFlux; + if (cursor != null) { + messagesFlux = messagesService.loadMessages(getCluster(clusterName), topicName, cursor); + } else { + messagesFlux = messagesService.loadMessages( + getCluster(clusterName), + topicName, + ConsumerPosition.create(mode, topicName, partitions, timestamp, offset), + stringFilter, + smartFilterId, + limit, + keySerde, + valueSerde + ); + } + return accessControlService.validateAccess(accessContext) + .then(Mono.just(ResponseEntity.ok(messagesFlux))) + .doOnEach(sig -> auditService.audit(accessContext, sig)); } @Override @@ -140,34 +155,6 @@ public Mono> sendTopicMessages( ).doOnEach(sig -> audit(context, sig)); } - /** - * The format is [partition]::[offset] for specifying offsets - * or [partition]::[timestamp in millis] for specifying timestamps. - */ - @Nullable - private Map parseSeekTo(String topic, SeekTypeDTO seekType, List seekTo) { - if (seekTo == null || seekTo.isEmpty()) { - if (seekType == SeekTypeDTO.LATEST || seekType == SeekTypeDTO.BEGINNING) { - return null; - } - throw new ValidationException("seekTo should be set if seekType is " + seekType); - } - return seekTo.stream() - .map(p -> { - String[] split = p.split("::"); - if (split.length != 2) { - throw new IllegalArgumentException( - "Wrong seekTo argument format. 
See API docs for details"); - } - - return Pair.of( - new TopicPartition(topic, Integer.parseInt(split[0])), - Long.parseLong(split[1]) - ); - }) - .collect(toMap(Pair::getKey, Pair::getValue)); - } - @Override public Mono> getSerdes(String clusterName, String topicName, @@ -195,7 +182,20 @@ public Mono> getSerdes(String clusterNam ); } + @Override + public Mono> registerFilter(String clusterName, + String topicName, + Mono registration, + ServerWebExchange exchange) { + final Mono validateAccess = accessControlService.validateAccess(AccessContext.builder() + .cluster(clusterName) + .topic(topicName) + .topicActions(MESSAGES_READ) + .build()); - + return validateAccess.then(registration) + .map(reg -> messagesService.registerMessageFilter(reg.getFilterCode())) + .map(id -> ResponseEntity.ok(new MessageFilterIdDTO().id(id))); + } } diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/emitter/AbstractEmitter.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/emitter/AbstractEmitter.java index ec576a1d1a6..21ef0b43adb 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/emitter/AbstractEmitter.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/emitter/AbstractEmitter.java @@ -1,6 +1,7 @@ package com.provectus.kafka.ui.emitter; import com.provectus.kafka.ui.model.TopicMessageEventDTO; +import jakarta.annotation.Nullable; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.common.utils.Bytes; import reactor.core.publisher.FluxSink; @@ -21,12 +22,14 @@ protected PolledRecords poll(FluxSink sink, EnhancedConsum return records; } - protected boolean sendLimitReached() { + protected boolean isSendLimitReached() { return messagesProcessing.limitReached(); } - protected void send(FluxSink sink, Iterable> records) { - messagesProcessing.send(sink, records); + protected void send(FluxSink sink, + Iterable> records, + @Nullable Cursor.Tracking cursor) { + messagesProcessing.send(sink, records, cursor); } protected void sendPhase(FluxSink sink, String name) { @@ -37,8 +40,9 @@ protected void sendConsuming(FluxSink sink, PolledRecords messagesProcessing.sentConsumingInfo(sink, records); } - protected void sendFinishStatsAndCompleteSink(FluxSink sink) { - messagesProcessing.sendFinishEvent(sink); + // cursor is null if target partitions were fully polled (no, need to do paging) + protected void sendFinishStatsAndCompleteSink(FluxSink sink, @Nullable Cursor.Tracking cursor) { + messagesProcessing.sendFinishEvents(sink, cursor); sink.complete(); } } diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/emitter/BackwardEmitter.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/emitter/BackwardEmitter.java index cdc45336e46..75aa21bdf83 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/emitter/BackwardEmitter.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/emitter/BackwardEmitter.java @@ -18,18 +18,15 @@ public BackwardEmitter(Supplier consumerSupplier, int messagesPerPage, ConsumerRecordDeserializer deserializer, Predicate filter, - PollingSettings pollingSettings) { + PollingSettings pollingSettings, + Cursor.Tracking cursor) { super( consumerSupplier, consumerPosition, messagesPerPage, - new MessagesProcessing( - deserializer, - filter, - false, - messagesPerPage - ), - pollingSettings + new MessagesProcessing(deserializer, filter, false, messagesPerPage), + pollingSettings, + cursor ); } diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/emitter/ConsumingStats.java 
diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/emitter/ConsumingStats.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/emitter/ConsumingStats.java index b0737e1cb9c..17b519434b4 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/emitter/ConsumingStats.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/emitter/ConsumingStats.java @@ -2,6 +2,8 @@ import com.provectus.kafka.ui.model.TopicMessageConsumingDTO; import com.provectus.kafka.ui.model.TopicMessageEventDTO; +import com.provectus.kafka.ui.model.TopicMessageNextPageCursorDTO; +import javax.annotation.Nullable; import reactor.core.publisher.FluxSink; class ConsumingStats { @@ -26,10 +28,15 @@ void incFilterApplyError() { filterApplyErrors++; } - void sendFinishEvent(FluxSink sink) { + void sendFinishEvent(FluxSink sink, @Nullable Cursor.Tracking cursor) { sink.next( new TopicMessageEventDTO() .type(TopicMessageEventDTO.TypeEnum.DONE) + .cursor( + cursor != null + ? new TopicMessageNextPageCursorDTO().id(cursor.registerCursor()) + : null + ) + .consuming(createConsumingStats()) ); } diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/emitter/Cursor.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/emitter/Cursor.java new file mode 100644 index 00000000000..f0fd135bacf --- /dev/null +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/emitter/Cursor.java @@ -0,0 +1,90 @@ +package com.provectus.kafka.ui.emitter; + +import com.google.common.collect.HashBasedTable; +import com.google.common.collect.Table; +import com.provectus.kafka.ui.model.ConsumerPosition; +import com.provectus.kafka.ui.model.PollingModeDTO; +import com.provectus.kafka.ui.model.TopicMessageDTO; +import com.provectus.kafka.ui.serdes.ConsumerRecordDeserializer; +import java.util.HashMap; +import java.util.Map; +import java.util.function.Function; +import java.util.function.Predicate; +import org.apache.kafka.common.TopicPartition; + +public record Cursor(ConsumerRecordDeserializer deserializer, + ConsumerPosition consumerPosition, + Predicate filter, + int limit) { + + public static class Tracking { + private final ConsumerRecordDeserializer deserializer; + private final ConsumerPosition originalPosition; + private final Predicate filter; + private final int limit; + private final Function registerAction; + + //topic -> partition -> offset + private final Table trackingOffsets = HashBasedTable.create(); + + public Tracking(ConsumerRecordDeserializer deserializer, + ConsumerPosition originalPosition, + Predicate filter, + int limit, + Function registerAction) { + this.deserializer = deserializer; + this.originalPosition = originalPosition; + this.filter = filter; + this.limit = limit; + this.registerAction = registerAction; + } + + void trackOffset(String topic, int partition, long offset) { + trackingOffsets.put(topic, partition, offset); + } + + void initOffsets(Map initialSeekOffsets) { + initialSeekOffsets.forEach((tp, off) -> trackOffset(tp.topic(), tp.partition(), off)); + } + + private Map getOffsetsMap(int offsetToAdd) { + Map result = new HashMap<>(); + trackingOffsets.rowMap() + .forEach((topic, partsMap) -> + partsMap.forEach((p, off) -> result.put(new TopicPartition(topic, p), off + offsetToAdd))); + return result; + } + + String registerCursor() { + return registerAction.apply( + new Cursor( + deserializer, + new ConsumerPosition( + switch (originalPosition.pollingMode()) { + case TO_OFFSET, TO_TIMESTAMP, LATEST -> PollingModeDTO.TO_OFFSET; + case FROM_OFFSET, FROM_TIMESTAMP, EARLIEST -> PollingModeDTO.FROM_OFFSET; + case TAILING -> throw new IllegalStateException(); }, + originalPosition.topic(), + originalPosition.partitions(), + null, + new ConsumerPosition.Offsets( + null, + getOffsetsMap( + switch (originalPosition.pollingMode()) { + case TO_OFFSET, TO_TIMESTAMP, LATEST -> 0; + // when doing forward polling we need to start from latest msg's offset + 1 + case FROM_OFFSET, FROM_TIMESTAMP, EARLIEST -> 1; + case TAILING -> throw new IllegalStateException(); + } + ) + ) + ), + filter, + limit + ) + ); + } + } + +}
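A worked example of the offset arithmetic in registerCursor() above, using the numbers asserted in CursorTest below (20 messages in one partition, page size 11):

    // Forward page (EARLIEST): offsets 0..10 are read, so the last tracked offset is 10;
    // FROM_* modes add 1, and the cursor resumes from offset 11 with mode FROM_OFFSET.
    // Backward page (LATEST): offsets 9..19 are read, so the lowest tracked offset is 9;
    // TO_* modes add 0, and the cursor stores TO_OFFSET 9, i.e. the next page reads up to
    // (but excluding) offset 9. TAILING never pages, hence the IllegalStateException branches.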
diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/emitter/ForwardEmitter.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/emitter/ForwardEmitter.java index 5c915fb2e8c..6627bc45c10 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/emitter/ForwardEmitter.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/emitter/ForwardEmitter.java @@ -18,18 +18,15 @@ public ForwardEmitter(Supplier consumerSupplier, int messagesPerPage, ConsumerRecordDeserializer deserializer, Predicate filter, - PollingSettings pollingSettings) { + PollingSettings pollingSettings, + Cursor.Tracking cursor) { super( consumerSupplier, consumerPosition, messagesPerPage, - new MessagesProcessing( - deserializer, - filter, - true, - messagesPerPage - ), - pollingSettings + new MessagesProcessing(deserializer, filter, true, messagesPerPage), + pollingSettings, + cursor ); } diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/emitter/MessageFilters.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/emitter/MessageFilters.java index 6e9f8a8bbe3..325de97c055 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/emitter/MessageFilters.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/emitter/MessageFilters.java @@ -1,7 +1,6 @@ package com.provectus.kafka.ui.emitter; import com.provectus.kafka.ui.exception.ValidationException; -import com.provectus.kafka.ui.model.MessageFilterTypeDTO; import com.provectus.kafka.ui.model.TopicMessageDTO; import groovy.json.JsonSlurper; import java.util.function.Predicate; @@ -22,23 +21,16 @@ public class MessageFilters { private MessageFilters() { } - public static Predicate createMsgFilter(String query, MessageFilterTypeDTO type) { - switch (type) { - case STRING_CONTAINS: - return containsStringFilter(query); - case GROOVY_SCRIPT: - return groovyScriptFilter(query); - default: - throw new IllegalStateException("Unknown query type: " + type); - } + public static Predicate noop() { + return e -> true; } - static Predicate containsStringFilter(String string) { - return msg -> StringUtils.contains(msg.getKey(), string) - || StringUtils.contains(msg.getContent(), string); + public static Predicate containsStringFilter(String string) { + return msg -> StringUtils.containsIgnoreCase(msg.getKey(), string) + || StringUtils.containsIgnoreCase(msg.getContent(), string); } - static Predicate groovyScriptFilter(String script) { + public static Predicate groovyScriptFilter(String script) { var engine = getGroovyEngine(); var compiledScript = compileScript(engine, script); var jsonSlurper = new JsonSlurper(); diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/emitter/MessagesProcessing.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/emitter/MessagesProcessing.java index df8505a2e9a..8b8332e0398 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/emitter/MessagesProcessing.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/emitter/MessagesProcessing.java @@ -39,7 +39,9 @@ boolean limitReached() { return limit != null && sentMessages >= limit; } - void 
send(FluxSink sink, Iterable> polled) { + void send(FluxSink sink, + Iterable> polled, + @Nullable Cursor.Tracking cursor) { sortForSending(polled, ascendingSortBeforeSend) .forEach(rec -> { if (!limitReached() && !sink.isCancelled()) { @@ -53,6 +55,9 @@ void send(FluxSink sink, Iterable sink, PolledRecords polled } } - void sendFinishEvent(FluxSink sink) { + void sendFinishEvents(FluxSink sink, @Nullable Cursor.Tracking cursor) { if (!sink.isCancelled()) { - consumingStats.sendFinishEvent(sink); + consumingStats.sendFinishEvent(sink, cursor); } } diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/emitter/OffsetsInfo.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/emitter/OffsetsInfo.java index 85802724178..b2b788929bc 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/emitter/OffsetsInfo.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/emitter/OffsetsInfo.java @@ -1,6 +1,7 @@ package com.provectus.kafka.ui.emitter; import com.google.common.base.Preconditions; +import com.google.common.collect.Sets; import java.util.Collection; import java.util.HashSet; import java.util.Map; @@ -61,4 +62,8 @@ long summaryOffsetsRange() { return cnt.getValue(); } + Set allTargetPartitions() { + return Sets.union(nonEmptyPartitions, emptyPartitions); + } + } diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/emitter/PolledRecords.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/emitter/PolledRecords.java index bc6bd95d5f6..94169f1b634 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/emitter/PolledRecords.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/emitter/PolledRecords.java @@ -3,6 +3,7 @@ import java.time.Duration; import java.util.Iterator; import java.util.List; +import java.util.Set; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.common.TopicPartition; @@ -32,6 +33,10 @@ public Iterator> iterator() { return records.iterator(); } + public Set partitions() { + return records.partitions(); + } + private static int calculatePolledRecSize(Iterable> recs) { int polledBytes = 0; for (ConsumerRecord rec : recs) { diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/emitter/RangePollingEmitter.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/emitter/RangePollingEmitter.java index af6dc7d0693..8abcd4772e4 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/emitter/RangePollingEmitter.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/emitter/RangePollingEmitter.java @@ -17,6 +17,7 @@ abstract class RangePollingEmitter extends AbstractEmitter { private final Supplier consumerSupplier; + private final Cursor.Tracking cursor; protected final ConsumerPosition consumerPosition; protected final int messagesPerPage; @@ -24,11 +25,13 @@ protected RangePollingEmitter(Supplier consumerSupplier, ConsumerPosition consumerPosition, int messagesPerPage, MessagesProcessing messagesProcessing, - PollingSettings pollingSettings) { + PollingSettings pollingSettings, + Cursor.Tracking cursor) { super(messagesProcessing, pollingSettings); this.consumerPosition = consumerPosition; this.messagesPerPage = messagesPerPage; this.consumerSupplier = consumerSupplier; + this.cursor = cursor; } protected record FromToOffset(/*inclusive*/ long from, /*exclusive*/ long to) { @@ -46,18 +49,20 @@ public void accept(FluxSink sink) { try (EnhancedConsumer consumer = consumerSupplier.get()) { sendPhase(sink, "Consumer 
created"); var seekOperations = SeekOperations.create(consumer, consumerPosition); + cursor.initOffsets(seekOperations.getOffsetsForSeek()); + TreeMap pollRange = nextPollingRange(new TreeMap<>(), seekOperations); log.debug("Starting from offsets {}", pollRange); - while (!sink.isCancelled() && !pollRange.isEmpty() && !sendLimitReached()) { + while (!sink.isCancelled() && !pollRange.isEmpty() && !isSendLimitReached()) { var polled = poll(consumer, sink, pollRange); - send(sink, polled); + send(sink, polled, cursor); pollRange = nextPollingRange(pollRange, seekOperations); } if (sink.isCancelled()) { log.debug("Polling finished due to sink cancellation"); } - sendFinishStatsAndCompleteSink(sink); + sendFinishStatsAndCompleteSink(sink, pollRange.isEmpty() ? null : cursor); log.debug("Polling finished"); } catch (InterruptException kafkaInterruptException) { log.debug("Polling finished due to thread interruption"); diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/emitter/SeekOperations.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/emitter/SeekOperations.java index 4de027bdb23..8fa2cfeb0bb 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/emitter/SeekOperations.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/emitter/SeekOperations.java @@ -1,13 +1,13 @@ package com.provectus.kafka.ui.emitter; +import static com.provectus.kafka.ui.model.PollingModeDTO.TO_TIMESTAMP; +import static java.util.Objects.requireNonNull; + import com.google.common.annotations.VisibleForTesting; -import com.google.common.base.Preconditions; import com.provectus.kafka.ui.model.ConsumerPosition; -import com.provectus.kafka.ui.model.SeekTypeDTO; +import com.provectus.kafka.ui.model.PollingModeDTO; import java.util.HashMap; import java.util.Map; -import java.util.stream.Collectors; -import javax.annotation.Nullable; import lombok.AccessLevel; import lombok.RequiredArgsConstructor; import org.apache.commons.lang3.mutable.MutableLong; @@ -22,17 +22,11 @@ public class SeekOperations { private final Map offsetsForSeek; //only contains non-empty partitions! public static SeekOperations create(Consumer consumer, ConsumerPosition consumerPosition) { - OffsetsInfo offsetsInfo; - if (consumerPosition.getSeekTo() == null) { - offsetsInfo = new OffsetsInfo(consumer, consumerPosition.getTopic()); - } else { - offsetsInfo = new OffsetsInfo(consumer, consumerPosition.getSeekTo().keySet()); - } - return new SeekOperations( - consumer, - offsetsInfo, - getOffsetsForSeek(consumer, offsetsInfo, consumerPosition.getSeekType(), consumerPosition.getSeekTo()) - ); + OffsetsInfo offsetsInfo = consumerPosition.partitions().isEmpty() + ? 
new OffsetsInfo(consumer, consumerPosition.topic()) + : new OffsetsInfo(consumer, consumerPosition.partitions()); + var offsetsToSeek = getOffsetsForSeek(consumer, offsetsInfo, consumerPosition); + return new SeekOperations(consumer, offsetsInfo, offsetsToSeek); } public void assignAndSeekNonEmptyPartitions() { @@ -75,27 +69,26 @@ public Map getOffsetsForSeek() { @VisibleForTesting static Map getOffsetsForSeek(Consumer consumer, OffsetsInfo offsetsInfo, - SeekTypeDTO seekType, - @Nullable Map seekTo) { - switch (seekType) { - case LATEST: - return consumer.endOffsets(offsetsInfo.getNonEmptyPartitions()); - case BEGINNING: - return consumer.beginningOffsets(offsetsInfo.getNonEmptyPartitions()); - case OFFSET: - Preconditions.checkNotNull(seekTo); - return fixOffsets(offsetsInfo, seekTo); - case TIMESTAMP: - Preconditions.checkNotNull(seekTo); - return offsetsForTimestamp(consumer, offsetsInfo, seekTo); - default: - throw new IllegalStateException(); - } + ConsumerPosition position) { + return switch (position.pollingMode()) { + case TAILING -> consumer.endOffsets(offsetsInfo.allTargetPartitions()); + case LATEST -> consumer.endOffsets(offsetsInfo.getNonEmptyPartitions()); + case EARLIEST -> consumer.beginningOffsets(offsetsInfo.getNonEmptyPartitions()); + case FROM_OFFSET, TO_OFFSET -> fixOffsets(offsetsInfo, requireNonNull(position.offsets())); + case FROM_TIMESTAMP, TO_TIMESTAMP -> + offsetsForTimestamp(consumer, position.pollingMode(), offsetsInfo, requireNonNull(position.timestamp())); + }; } - private static Map fixOffsets(OffsetsInfo offsetsInfo, Map offsets) { - offsets = new HashMap<>(offsets); - offsets.keySet().retainAll(offsetsInfo.getNonEmptyPartitions()); + private static Map fixOffsets(OffsetsInfo offsetsInfo, + ConsumerPosition.Offsets positionOffset) { + var offsets = new HashMap(); + if (positionOffset.offset() != null) { + offsetsInfo.getNonEmptyPartitions().forEach(tp -> offsets.put(tp, positionOffset.offset())); + } else { + offsets.putAll(requireNonNull(positionOffset.tpOffsets())); + offsets.keySet().retainAll(offsetsInfo.getNonEmptyPartitions()); + } Map result = new HashMap<>(); offsets.forEach((tp, targetOffset) -> { @@ -112,13 +105,25 @@ private static Map fixOffsets(OffsetsInfo offsetsInfo, Map return result; } - private static Map offsetsForTimestamp(Consumer consumer, OffsetsInfo offsetsInfo, - Map timestamps) { - timestamps = new HashMap<>(timestamps); - timestamps.keySet().retainAll(offsetsInfo.getNonEmptyPartitions()); + private static Map offsetsForTimestamp(Consumer consumer, + PollingModeDTO pollingMode, + OffsetsInfo offsetsInfo, + Long timestamp) { + Map timestamps = new HashMap<>(); + offsetsInfo.getNonEmptyPartitions().forEach(tp -> timestamps.put(tp, timestamp)); - return consumer.offsetsForTimes(timestamps).entrySet().stream() - .filter(e -> e.getValue() != null) - .collect(Collectors.toMap(Map.Entry::getKey, e -> e.getValue().offset())); + Map result = new HashMap<>(); + consumer.offsetsForTimes(timestamps).forEach((tp, offsetAndTimestamp) -> { + if (offsetAndTimestamp == null) { + if (pollingMode == TO_TIMESTAMP && offsetsInfo.getNonEmptyPartitions().contains(tp)) { + // if no offset was returned this means that *all* timestamps are lower + // than target timestamp. 
In case of TO_TIMESTAMP mode we need to read from the end of the tp + result.put(tp, offsetsInfo.getEndOffsets().get(tp)); + } + } else { + result.put(tp, offsetAndTimestamp.offset()); + } + }); + return result; } } diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/emitter/TailingEmitter.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/emitter/TailingEmitter.java index c3f04fe8cc2..dd73f743710 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/emitter/TailingEmitter.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/emitter/TailingEmitter.java @@ -35,7 +35,7 @@ public void accept(FluxSink sink) { while (!sink.isCancelled()) { sendPhase(sink, "Polling"); var polled = poll(sink, consumer); - send(sink, polled); + send(sink, polled, null); } sink.complete(); log.debug("Tailing finished"); @@ -55,5 +55,4 @@ private void assignAndSeek(EnhancedConsumer consumer) { consumer.assign(seekOffsets.keySet()); seekOffsets.forEach(consumer::seek); } - } diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/ConsumerPosition.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/ConsumerPosition.java index 9d77923fbc6..51f4e51f7c6 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/ConsumerPosition.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/ConsumerPosition.java @@ -1,14 +1,72 @@ package com.provectus.kafka.ui.model; +import com.google.common.base.Preconditions; +import com.provectus.kafka.ui.exception.ValidationException; +import java.util.List; import java.util.Map; +import java.util.Optional; +import java.util.stream.Collectors; import javax.annotation.Nullable; -import lombok.Value; import org.apache.kafka.common.TopicPartition; -@Value -public class ConsumerPosition { - SeekTypeDTO seekType; - String topic; - @Nullable - Map seekTo; // null if positioning should apply to all tps +public record ConsumerPosition(PollingModeDTO pollingMode, + String topic, + List partitions, //all partitions if list is empty + @Nullable Long timestamp, + @Nullable Offsets offsets) { + + public record Offsets(@Nullable Long offset, //should be applied to all partitions + @Nullable Map tpOffsets) { + public Offsets { + // only one of properties should be set + Preconditions.checkArgument((offset == null && tpOffsets != null) || (offset != null && tpOffsets == null)); + } + } + + public static ConsumerPosition create(PollingModeDTO pollingMode, + String topic, + @Nullable List partitions, + @Nullable Long timestamp, + @Nullable Long offset) { + @Nullable var offsets = parseAndValidateOffsets(pollingMode, offset); + + var topicPartitions = Optional.ofNullable(partitions).orElse(List.of()) + .stream() + .map(p -> new TopicPartition(topic, p)) + .collect(Collectors.toList()); + + // if offsets are specified - inferring partitions list from there + topicPartitions = (offsets != null && offsets.tpOffsets() != null) + ? 
List.copyOf(offsets.tpOffsets().keySet()) + : topicPartitions; + + return new ConsumerPosition( + pollingMode, + topic, + topicPartitions, + validateTimestamp(pollingMode, timestamp), + offsets + ); + } + + private static Long validateTimestamp(PollingModeDTO pollingMode, @Nullable Long ts) { + if (pollingMode == PollingModeDTO.FROM_TIMESTAMP || pollingMode == PollingModeDTO.TO_TIMESTAMP) { + if (ts == null) { + throw new ValidationException("timestamp not provided for " + pollingMode); + } + } + return ts; + } + + private static Offsets parseAndValidateOffsets(PollingModeDTO pollingMode, + @Nullable Long offset) { + if (pollingMode == PollingModeDTO.FROM_OFFSET || pollingMode == PollingModeDTO.TO_OFFSET) { + if (offset == null) { + throw new ValidationException("offsets not provided for " + pollingMode); + } + return new Offsets(offset, null); + } + return null; + } + } diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/MessagesService.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/MessagesService.java index 620bd840861..b14b885c56a 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/MessagesService.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/MessagesService.java @@ -1,8 +1,13 @@ package com.provectus.kafka.ui.service; +import com.google.common.base.Charsets; +import com.google.common.cache.Cache; +import com.google.common.cache.CacheBuilder; +import com.google.common.hash.Hashing; import com.google.common.util.concurrent.RateLimiter; import com.provectus.kafka.ui.config.ClustersProperties; import com.provectus.kafka.ui.emitter.BackwardEmitter; +import com.provectus.kafka.ui.emitter.Cursor; import com.provectus.kafka.ui.emitter.ForwardEmitter; import com.provectus.kafka.ui.emitter.MessageFilters; import com.provectus.kafka.ui.emitter.TailingEmitter; @@ -11,12 +16,12 @@ import com.provectus.kafka.ui.model.ConsumerPosition; import com.provectus.kafka.ui.model.CreateTopicMessageDTO; import com.provectus.kafka.ui.model.KafkaCluster; -import com.provectus.kafka.ui.model.MessageFilterTypeDTO; -import com.provectus.kafka.ui.model.SeekDirectionDTO; +import com.provectus.kafka.ui.model.PollingModeDTO; import com.provectus.kafka.ui.model.SmartFilterTestExecutionDTO; import com.provectus.kafka.ui.model.SmartFilterTestExecutionResultDTO; import com.provectus.kafka.ui.model.TopicMessageDTO; import com.provectus.kafka.ui.model.TopicMessageEventDTO; +import com.provectus.kafka.ui.serdes.ConsumerRecordDeserializer; import com.provectus.kafka.ui.serdes.ProducerRecordCreator; import com.provectus.kafka.ui.util.SslPropertiesUtil; import java.time.Instant; @@ -27,12 +32,12 @@ import java.util.Optional; import java.util.Properties; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ThreadLocalRandom; import java.util.function.Predicate; import java.util.function.UnaryOperator; import java.util.stream.Collectors; import javax.annotation.Nullable; import lombok.extern.slf4j.Slf4j; -import org.apache.commons.lang3.StringUtils; import org.apache.kafka.clients.admin.OffsetSpec; import org.apache.kafka.clients.admin.TopicDescription; import org.apache.kafka.clients.producer.KafkaProducer; @@ -50,8 +55,11 @@ @Slf4j public class MessagesService { + private static final long SALT_FOR_HASHING = ThreadLocalRandom.current().nextLong(); + private static final int DEFAULT_MAX_PAGE_SIZE = 500; private static final int DEFAULT_PAGE_SIZE = 100; + // limiting UI messages rate to 20/sec in tailing mode private static final 
int TAILING_UI_MESSAGE_THROTTLE_RATE = 20; @@ -61,6 +69,12 @@ public class MessagesService { private final int maxPageSize; private final int defaultPageSize; + private final Cache> registeredFilters = CacheBuilder.newBuilder() + .maximumSize(PollingCursorsStorage.MAX_SIZE) + .build(); + + private final PollingCursorsStorage cursorsStorage = new PollingCursorsStorage(); + public MessagesService(AdminClientService adminClientService, DeserializationService deserializationService, ConsumerGroupService consumerGroupService, @@ -86,10 +100,7 @@ private Mono withExistingTopic(KafkaCluster cluster, String to public static SmartFilterTestExecutionResultDTO execSmartFilterTest(SmartFilterTestExecutionDTO execData) { Predicate predicate; try { - predicate = MessageFilters.createMsgFilter( - execData.getFilterCode(), - MessageFilterTypeDTO.GROOVY_SCRIPT - ); + predicate = MessageFilters.groovyScriptFilter(execData.getFilterCode()); } catch (Exception e) { log.info("Smart filter '{}' compilation error", execData.getFilterCode(), e); return new SmartFilterTestExecutionResultDTO() @@ -197,67 +208,103 @@ public static KafkaProducer createProducer(KafkaCluster cluster, return new KafkaProducer<>(properties); } - public Flux loadMessages(KafkaCluster cluster, String topic, + public Flux loadMessages(KafkaCluster cluster, + String topic, ConsumerPosition consumerPosition, - @Nullable String query, - MessageFilterTypeDTO filterQueryType, - @Nullable Integer pageSize, - SeekDirectionDTO seekDirection, + @Nullable String containsStringFilter, + @Nullable String filterId, + @Nullable Integer limit, @Nullable String keySerde, @Nullable String valueSerde) { + return loadMessages( + cluster, + topic, + deserializationService.deserializerFor(cluster, topic, keySerde, valueSerde), + consumerPosition, + getMsgFilter(containsStringFilter, filterId), + fixPageSize(limit) + ); + } + + public Flux loadMessages(KafkaCluster cluster, String topic, String cursorId) { + Cursor cursor = cursorsStorage.getCursor(cursorId) + .orElseThrow(() -> new ValidationException("Next page cursor not found. 
Maybe it was evicted from cache.")); + return loadMessages( + cluster, + topic, + cursor.deserializer(), + cursor.consumerPosition(), + cursor.filter(), + cursor.limit() + ); + } + + private Flux loadMessages(KafkaCluster cluster, + String topic, + ConsumerRecordDeserializer deserializer, + ConsumerPosition consumerPosition, + Predicate filter, + int limit) { return withExistingTopic(cluster, topic) .flux() .publishOn(Schedulers.boundedElastic()) - .flatMap(td -> loadMessagesImpl(cluster, topic, consumerPosition, query, - filterQueryType, fixPageSize(pageSize), seekDirection, keySerde, valueSerde)); - } - - private int fixPageSize(@Nullable Integer pageSize) { - return Optional.ofNullable(pageSize) - .filter(ps -> ps > 0 && ps <= maxPageSize) - .orElse(defaultPageSize); + .flatMap(td -> loadMessagesImpl(cluster, deserializer, consumerPosition, filter, limit)); } private Flux loadMessagesImpl(KafkaCluster cluster, - String topic, + ConsumerRecordDeserializer deserializer, ConsumerPosition consumerPosition, - @Nullable String query, - MessageFilterTypeDTO filterQueryType, - int limit, - SeekDirectionDTO seekDirection, - @Nullable String keySerde, - @Nullable String valueSerde) { - - var deserializer = deserializationService.deserializerFor(cluster, topic, keySerde, valueSerde); - var filter = getMsgFilter(query, filterQueryType); - var emitter = switch (seekDirection) { - case FORWARD -> new ForwardEmitter( + Predicate filter, + int limit) { + var emitter = switch (consumerPosition.pollingMode()) { + case TO_OFFSET, TO_TIMESTAMP, LATEST -> new BackwardEmitter( () -> consumerGroupService.createConsumer(cluster), - consumerPosition, limit, deserializer, filter, cluster.getPollingSettings() + consumerPosition, + limit, + deserializer, + filter, + cluster.getPollingSettings(), + cursorsStorage.createNewCursor(deserializer, consumerPosition, filter, limit) ); - case BACKWARD -> new BackwardEmitter( + case FROM_OFFSET, FROM_TIMESTAMP, EARLIEST -> new ForwardEmitter( () -> consumerGroupService.createConsumer(cluster), - consumerPosition, limit, deserializer, filter, cluster.getPollingSettings() + consumerPosition, + limit, + deserializer, + filter, + cluster.getPollingSettings(), + cursorsStorage.createNewCursor(deserializer, consumerPosition, filter, limit) ); case TAILING -> new TailingEmitter( () -> consumerGroupService.createConsumer(cluster), - consumerPosition, deserializer, filter, cluster.getPollingSettings() + consumerPosition, + deserializer, + filter, + cluster.getPollingSettings() ); }; return Flux.create(emitter) - .map(throttleUiPublish(seekDirection)); + .map(throttleUiPublish(consumerPosition.pollingMode())); } - private Predicate getMsgFilter(String query, - MessageFilterTypeDTO filterQueryType) { - if (StringUtils.isEmpty(query)) { - return evt -> true; + private Predicate getMsgFilter(@Nullable String containsStrFilter, + @Nullable String smartFilterId) { + Predicate messageFilter = MessageFilters.noop(); + if (containsStrFilter != null) { + messageFilter = messageFilter.and(MessageFilters.containsStringFilter(containsStrFilter)); } - return MessageFilters.createMsgFilter(query, filterQueryType); + if (smartFilterId != null) { + var registered = registeredFilters.getIfPresent(smartFilterId); + if (registered == null) { + throw new ValidationException("No filter was registered with id " + smartFilterId); + } + messageFilter = messageFilter.and(registered); + } + return messageFilter; } - private UnaryOperator throttleUiPublish(SeekDirectionDTO seekDirection) { - if (seekDirection 
diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/PollingCursorsStorage.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/PollingCursorsStorage.java new file mode 100644 index 00000000000..98094b5113b --- /dev/null +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/PollingCursorsStorage.java @@ -0,0 +1,45 @@ +package com.provectus.kafka.ui.service; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.cache.Cache; +import com.google.common.cache.CacheBuilder; +import com.provectus.kafka.ui.emitter.Cursor; +import com.provectus.kafka.ui.model.ConsumerPosition; +import com.provectus.kafka.ui.model.TopicMessageDTO; +import com.provectus.kafka.ui.serdes.ConsumerRecordDeserializer; +import java.util.Map; +import java.util.Optional; +import java.util.function.Predicate; +import org.apache.commons.lang3.RandomStringUtils; + +public class PollingCursorsStorage { + + public static final int MAX_SIZE = 10_000; + + private final Cache cursorsCache = CacheBuilder.newBuilder() + .maximumSize(MAX_SIZE) + .build(); + + + public Cursor.Tracking createNewCursor(ConsumerRecordDeserializer deserializer, + ConsumerPosition originalPosition, + Predicate filter, + int limit) { + return new Cursor.Tracking(deserializer, originalPosition, filter, limit, this::register); + } + + public Optional getCursor(String id) { + return Optional.ofNullable(cursorsCache.getIfPresent(id)); + } + + public String register(Cursor cursor) { + var id = RandomStringUtils.random(8, true, true); + cursorsCache.put(id, cursor); + return id; + } + + @VisibleForTesting + public Map asMap() { + return cursorsCache.asMap(); + } +} diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/analyze/TopicAnalysisService.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/analyze/TopicAnalysisService.java index 2523aae89ec..692c63109fa 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/analyze/TopicAnalysisService.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/analyze/TopicAnalysisService.java @@ -1,6 +1,6 @@ package com.provectus.kafka.ui.service.analyze; -import static com.provectus.kafka.ui.model.SeekTypeDTO.BEGINNING; +import static com.provectus.kafka.ui.model.PollingModeDTO.EARLIEST; import com.provectus.kafka.ui.emitter.EnhancedConsumer; import com.provectus.kafka.ui.emitter.SeekOperations; @@ -14,6 +14,7 @@ import java.time.Duration; import java.time.Instant; import java.util.HashMap; +import java.util.List; import java.util.Map; import 
java.util.Optional; import lombok.RequiredArgsConstructor; @@ -104,7 +105,8 @@ public void run() { consumer.partitionsFor(topicId.topicName) .forEach(tp -> partitionStats.put(tp.partition(), new TopicAnalysisStats())); - var seekOperations = SeekOperations.create(consumer, new ConsumerPosition(BEGINNING, topicId.topicName, null)); + var seekOperations = + SeekOperations.create(consumer, new ConsumerPosition(EARLIEST, topicId.topicName, List.of(), null, null)); long summaryOffsetsRange = seekOperations.summaryOffsetsRange(); seekOperations.assignAndSeekNonEmptyPartitions(); diff --git a/kafka-ui-api/src/main/resources/application.yml b/kafka-ui-api/src/main/resources/application.yml index e8799206132..d4f7a018061 100644 --- a/kafka-ui-api/src/main/resources/application.yml +++ b/kafka-ui-api/src/main/resources/application.yml @@ -18,4 +18,17 @@ logging: com.provectus: DEBUG reactor.netty.http.server.AccessLog: INFO org.hibernate.validator: WARN + org.springframework.security: DEBUG +kafka: + clusters: + - name: local + bootstrapServers: localhost:9096 + schemaRegistry: http://localhost:8081 +# ksqldbServer: http://localhost:8088 +# kafkaConnect: +# - name: first +# address: http://localhost:8083 +# metrics: +# port: 9997 +# type: JMX diff --git a/kafka-ui-api/src/test/java/com/provectus/kafka/ui/KafkaConsumerTests.java b/kafka-ui-api/src/test/java/com/provectus/kafka/ui/KafkaConsumerTests.java index ff11aa6656a..b925ea607f2 100644 --- a/kafka-ui-api/src/test/java/com/provectus/kafka/ui/KafkaConsumerTests.java +++ b/kafka-ui-api/src/test/java/com/provectus/kafka/ui/KafkaConsumerTests.java @@ -56,7 +56,7 @@ public void shouldDeleteRecords() { } long count = webTestClient.get() - .uri("/api/clusters/{clusterName}/topics/{topicName}/messages", LOCAL, topicName) + .uri("/api/clusters/{clusterName}/topics/{topicName}/messages/v2?m=EARLIEST", LOCAL, topicName) .accept(TEXT_EVENT_STREAM) .exchange() .expectStatus() diff --git a/kafka-ui-api/src/test/java/com/provectus/kafka/ui/emitter/CursorTest.java b/kafka-ui-api/src/test/java/com/provectus/kafka/ui/emitter/CursorTest.java new file mode 100644 index 00000000000..88be63fe67b --- /dev/null +++ b/kafka-ui-api/src/test/java/com/provectus/kafka/ui/emitter/CursorTest.java @@ -0,0 +1,195 @@ +package com.provectus.kafka.ui.emitter; + +import static org.assertj.core.api.Assertions.assertThat; + +import com.provectus.kafka.ui.AbstractIntegrationTest; +import com.provectus.kafka.ui.model.ConsumerPosition; +import com.provectus.kafka.ui.model.PollingModeDTO; +import com.provectus.kafka.ui.model.TopicMessageEventDTO; +import com.provectus.kafka.ui.producer.KafkaTestProducer; +import com.provectus.kafka.ui.serde.api.Serde; +import com.provectus.kafka.ui.serdes.ConsumerRecordDeserializer; +import com.provectus.kafka.ui.serdes.PropertyResolverImpl; +import com.provectus.kafka.ui.serdes.builtin.StringSerde; +import com.provectus.kafka.ui.service.PollingCursorsStorage; +import com.provectus.kafka.ui.util.ApplicationMetrics; +import java.util.List; +import java.util.Map; +import java.util.Properties; +import java.util.UUID; +import java.util.function.Consumer; +import org.apache.kafka.clients.admin.NewTopic; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.common.TopicPartition; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Test; +import reactor.core.publisher.Flux; +import reactor.test.StepVerifier; + +class 
CursorTest extends AbstractIntegrationTest { + + static final String TOPIC = CursorTest.class.getSimpleName() + "_" + UUID.randomUUID(); + static final int MSGS_IN_PARTITION = 20; + static final int PAGE_SIZE = (MSGS_IN_PARTITION / 2) + 1; //to poll full data set in 2 iterations + + final PollingCursorsStorage cursorsStorage = new PollingCursorsStorage(); + + @BeforeAll + static void setup() { + createTopic(new NewTopic(TOPIC, 1, (short) 1)); + try (var producer = KafkaTestProducer.forKafka(kafka)) { + for (int i = 0; i < MSGS_IN_PARTITION; i++) { + producer.send(new ProducerRecord<>(TOPIC, "msg_" + i)); + } + } + } + + @AfterAll + static void cleanup() { + deleteTopic(TOPIC); + } + + @Test + void backwardEmitter() { + var consumerPosition = new ConsumerPosition(PollingModeDTO.LATEST, TOPIC, List.of(), null, null); + var emitter = createBackwardEmitter(consumerPosition); + emitMessages(emitter, PAGE_SIZE); + var cursor = assertCursor( + PollingModeDTO.TO_OFFSET, + offsets -> assertThat(offsets) + .hasSize(1) + .containsEntry(new TopicPartition(TOPIC, 0), 9L) + ); + + // polling remaining records using registered cursor + emitter = createBackwardEmitterWithCursor(cursor); + emitMessages(emitter, MSGS_IN_PARTITION - PAGE_SIZE); + //checking no new cursors registered + assertThat(cursorsStorage.asMap()).hasSize(1).containsValue(cursor); + } + + @Test + void forwardEmitter() { + var consumerPosition = new ConsumerPosition(PollingModeDTO.EARLIEST, TOPIC, List.of(), null, null); + var emitter = createForwardEmitter(consumerPosition); + emitMessages(emitter, PAGE_SIZE); + var cursor = assertCursor( + PollingModeDTO.FROM_OFFSET, + offsets -> assertThat(offsets) + .hasSize(1) + .containsEntry(new TopicPartition(TOPIC, 0), 11L) + ); + + //polling remaining records using registered cursor + emitter = createForwardEmitterWithCursor(cursor); + emitMessages(emitter, MSGS_IN_PARTITION - PAGE_SIZE); + //checking no new cursors registered + assertThat(cursorsStorage.asMap()).hasSize(1).containsValue(cursor); + } + + private Cursor assertCursor(PollingModeDTO expectedMode, + Consumer> offsetsAssert) { + Cursor registeredCursor = cursorsStorage.asMap().values().stream().findFirst().orElse(null); + assertThat(registeredCursor).isNotNull(); + assertThat(registeredCursor.limit()).isEqualTo(PAGE_SIZE); + assertThat(registeredCursor.deserializer()).isNotNull(); + assertThat(registeredCursor.filter()).isNotNull(); + + var cursorPosition = registeredCursor.consumerPosition(); + assertThat(cursorPosition).isNotNull(); + assertThat(cursorPosition.topic()).isEqualTo(TOPIC); + assertThat(cursorPosition.partitions()).isEqualTo(List.of()); + assertThat(cursorPosition.pollingMode()).isEqualTo(expectedMode); + + offsetsAssert.accept(cursorPosition.offsets().tpOffsets()); + return registeredCursor; + } + + private void emitMessages(AbstractEmitter emitter, int expectedCnt) { + StepVerifier.create( + Flux.create(emitter) + .filter(e -> e.getType() == TopicMessageEventDTO.TypeEnum.MESSAGE) + .map(e -> e.getMessage().getContent()) + ) + .expectNextCount(expectedCnt) + .verifyComplete(); + } + + private BackwardEmitter createBackwardEmitter(ConsumerPosition position) { + return new BackwardEmitter( + this::createConsumer, + position, + PAGE_SIZE, + createRecordsDeserializer(), + m -> true, + PollingSettings.createDefault(), + createCursor(position) + ); + } + + private BackwardEmitter createBackwardEmitterWithCursor(Cursor cursor) { + return new BackwardEmitter( + this::createConsumer, + cursor.consumerPosition(), + 
cursor.limit(), + cursor.deserializer(), + cursor.filter(), + PollingSettings.createDefault(), + createCursor(cursor.consumerPosition()) + ); + } + + private ForwardEmitter createForwardEmitterWithCursor(Cursor cursor) { + return new ForwardEmitter( + this::createConsumer, + cursor.consumerPosition(), + cursor.limit(), + cursor.deserializer(), + cursor.filter(), + PollingSettings.createDefault(), + createCursor(cursor.consumerPosition()) + ); + } + + private ForwardEmitter createForwardEmitter(ConsumerPosition position) { + return new ForwardEmitter( + this::createConsumer, + position, + PAGE_SIZE, + createRecordsDeserializer(), + m -> true, + PollingSettings.createDefault(), + createCursor(position) + ); + } + + private Cursor.Tracking createCursor(ConsumerPosition position) { + return cursorsStorage.createNewCursor(createRecordsDeserializer(), position, m -> true, PAGE_SIZE); + } + + private EnhancedConsumer createConsumer() { + Properties props = new Properties(); + props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafka.getBootstrapServers()); + props.put(ConsumerConfig.GROUP_ID_CONFIG, UUID.randomUUID().toString()); + props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, PAGE_SIZE - 1); // to check multiple polls + return new EnhancedConsumer(props, PollingThrottler.noop(), ApplicationMetrics.noop()); + } + + private static ConsumerRecordDeserializer createRecordsDeserializer() { + Serde s = new StringSerde(); + s.configure(PropertyResolverImpl.empty(), PropertyResolverImpl.empty(), PropertyResolverImpl.empty()); + return new ConsumerRecordDeserializer( + StringSerde.name(), + s.deserializer(null, Serde.Target.KEY), + StringSerde.name(), + s.deserializer(null, Serde.Target.VALUE), + StringSerde.name(), + s.deserializer(null, Serde.Target.KEY), + s.deserializer(null, Serde.Target.VALUE), + msg -> msg + ); + } + +} diff --git a/kafka-ui-api/src/test/java/com/provectus/kafka/ui/emitter/MessageFiltersTest.java b/kafka-ui-api/src/test/java/com/provectus/kafka/ui/emitter/MessageFiltersTest.java index 4e9f5034cd2..73264c5d8b4 100644 --- a/kafka-ui-api/src/test/java/com/provectus/kafka/ui/emitter/MessageFiltersTest.java +++ b/kafka-ui-api/src/test/java/com/provectus/kafka/ui/emitter/MessageFiltersTest.java @@ -51,7 +51,7 @@ void returnsFalseOtherwise() { filter.test(msg().key(null).content(null)) ); - assertFalse( + assertTrue( filter.test(msg().key("aBc").content("AbC")) ); } diff --git a/kafka-ui-api/src/test/java/com/provectus/kafka/ui/emitter/SeekOperationsTest.java b/kafka-ui-api/src/test/java/com/provectus/kafka/ui/emitter/SeekOperationsTest.java index affa423123c..e288e77a113 100644 --- a/kafka-ui-api/src/test/java/com/provectus/kafka/ui/emitter/SeekOperationsTest.java +++ b/kafka-ui-api/src/test/java/com/provectus/kafka/ui/emitter/SeekOperationsTest.java @@ -1,8 +1,13 @@ package com.provectus.kafka.ui.emitter; +import static com.provectus.kafka.ui.model.PollingModeDTO.EARLIEST; +import static com.provectus.kafka.ui.model.PollingModeDTO.LATEST; +import static com.provectus.kafka.ui.model.PollingModeDTO.TAILING; import static org.assertj.core.api.Assertions.assertThat; -import com.provectus.kafka.ui.model.SeekTypeDTO; +import com.provectus.kafka.ui.model.ConsumerPosition; +import com.provectus.kafka.ui.model.PollingModeDTO; +import java.util.List; import java.util.Map; import java.util.stream.Collectors; import java.util.stream.Stream; @@ -14,6 +19,8 @@ import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Nested; import org.junit.jupiter.api.Test; +import 
org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.CsvSource; class SeekOperationsTest { @@ -40,13 +47,22 @@ void initMockConsumer() { @Nested class GetOffsetsForSeek { + @Test + void tailing() { + var offsets = SeekOperations.getOffsetsForSeek( + consumer, + new OffsetsInfo(consumer, topic), + new ConsumerPosition(TAILING, topic, List.of(), null, null) + ); + assertThat(offsets).containsExactlyInAnyOrderEntriesOf(Map.of(tp0, 0L, tp1, 10L, tp2, 20L, tp3, 30L)); + } + @Test void latest() { var offsets = SeekOperations.getOffsetsForSeek( consumer, new OffsetsInfo(consumer, topic), - SeekTypeDTO.LATEST, - null + new ConsumerPosition(LATEST, topic, List.of(), null, null) ); assertThat(offsets).containsExactlyInAnyOrderEntriesOf(Map.of(tp2, 20L, tp3, 30L)); } @@ -56,33 +72,38 @@ void beginning() { var offsets = SeekOperations.getOffsetsForSeek( consumer, new OffsetsInfo(consumer, topic), - SeekTypeDTO.BEGINNING, - null + new ConsumerPosition(EARLIEST, topic, List.of(), null, null) ); assertThat(offsets).containsExactlyInAnyOrderEntriesOf(Map.of(tp2, 0L, tp3, 25L)); } - @Test - void offsets() { + @ParameterizedTest + @CsvSource({"TO_OFFSET", "FROM_OFFSET"}) + void offsets(PollingModeDTO mode) { var offsets = SeekOperations.getOffsetsForSeek( consumer, new OffsetsInfo(consumer, topic), - SeekTypeDTO.OFFSET, - Map.of(tp1, 10L, tp2, 10L, tp3, 26L) + new ConsumerPosition( + mode, topic, List.of(tp1, tp2, tp3), null, + new ConsumerPosition.Offsets(null, Map.of(tp1, 10L, tp2, 10L, tp3, 26L)) + ) ); assertThat(offsets).containsExactlyInAnyOrderEntriesOf(Map.of(tp2, 10L, tp3, 26L)); } - @Test - void offsetsWithBoundsFixing() { + @ParameterizedTest + @CsvSource({"TO_OFFSET", "FROM_OFFSET"}) + void offsetsWithBoundsFixing(PollingModeDTO mode) { var offsets = SeekOperations.getOffsetsForSeek( consumer, new OffsetsInfo(consumer, topic), - SeekTypeDTO.OFFSET, - Map.of(tp1, 10L, tp2, 21L, tp3, 24L) + new ConsumerPosition( + mode, topic, List.of(tp1, tp2, tp3), null, + new ConsumerPosition.Offsets(null, Map.of(tp1, 10L, tp2, 21L, tp3, 24L)) + ) ); assertThat(offsets).containsExactlyInAnyOrderEntriesOf(Map.of(tp2, 20L, tp3, 25L)); } } -} \ No newline at end of file +} diff --git a/kafka-ui-api/src/test/java/com/provectus/kafka/ui/emitter/TailingEmitterTest.java b/kafka-ui-api/src/test/java/com/provectus/kafka/ui/emitter/TailingEmitterTest.java index 2798bd213fe..972a573bab9 100644 --- a/kafka-ui-api/src/test/java/com/provectus/kafka/ui/emitter/TailingEmitterTest.java +++ b/kafka-ui-api/src/test/java/com/provectus/kafka/ui/emitter/TailingEmitterTest.java @@ -4,10 +4,9 @@ import com.provectus.kafka.ui.AbstractIntegrationTest; import com.provectus.kafka.ui.model.ConsumerPosition; -import com.provectus.kafka.ui.model.MessageFilterTypeDTO; -import com.provectus.kafka.ui.model.SeekDirectionDTO; -import com.provectus.kafka.ui.model.SeekTypeDTO; +import com.provectus.kafka.ui.model.PollingModeDTO; import com.provectus.kafka.ui.model.TopicMessageEventDTO; +import com.provectus.kafka.ui.serdes.builtin.StringSerde; import com.provectus.kafka.ui.service.ClustersStorage; import com.provectus.kafka.ui.service.MessagesService; import java.time.Duration; @@ -111,13 +110,12 @@ private Flux createTailingFlux( return applicationContext.getBean(MessagesService.class) .loadMessages(cluster, topicName, - new ConsumerPosition(SeekTypeDTO.LATEST, topic, null), + new ConsumerPosition(PollingModeDTO.TAILING, topic, List.of(), null, null), query, - MessageFilterTypeDTO.STRING_CONTAINS, + null, 0, - 
SeekDirectionDTO.TAILING, - "String", - "String"); + StringSerde.name(), + StringSerde.name()); } private List startTailing(String filterQuery) { diff --git a/kafka-ui-api/src/test/java/com/provectus/kafka/ui/service/MessagesServiceTest.java b/kafka-ui-api/src/test/java/com/provectus/kafka/ui/service/MessagesServiceTest.java index cb50c0eb818..2f7d1868807 100644 --- a/kafka-ui-api/src/test/java/com/provectus/kafka/ui/service/MessagesServiceTest.java +++ b/kafka-ui-api/src/test/java/com/provectus/kafka/ui/service/MessagesServiceTest.java @@ -8,19 +8,24 @@ import com.provectus.kafka.ui.model.ConsumerPosition; import com.provectus.kafka.ui.model.CreateTopicMessageDTO; import com.provectus.kafka.ui.model.KafkaCluster; -import com.provectus.kafka.ui.model.SeekDirectionDTO; -import com.provectus.kafka.ui.model.SeekTypeDTO; +import com.provectus.kafka.ui.model.PollingModeDTO; import com.provectus.kafka.ui.model.SmartFilterTestExecutionDTO; import com.provectus.kafka.ui.model.TopicMessageDTO; import com.provectus.kafka.ui.model.TopicMessageEventDTO; import com.provectus.kafka.ui.producer.KafkaTestProducer; import com.provectus.kafka.ui.serdes.builtin.StringSerde; +import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.Set; import java.util.UUID; +import java.util.concurrent.atomic.AtomicReference; import org.apache.kafka.clients.admin.NewTopic; +import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.CsvSource; import org.springframework.beans.factory.annotation.Autowired; import reactor.core.publisher.Flux; import reactor.test.StepVerifier; @@ -35,6 +40,8 @@ class MessagesServiceTest extends AbstractIntegrationTest { KafkaCluster cluster; + Set createdTopics = new HashSet<>(); + @BeforeEach void init() { cluster = applicationContext @@ -43,6 +50,11 @@ void init() { .get(); } + @AfterEach + void deleteCreatedTopics() { + createdTopics.forEach(MessagesServiceTest::deleteTopic); + } + @Test void deleteTopicMessagesReturnsExceptionWhenTopicNotFound() { StepVerifier.create(messagesService.deleteTopicMessages(cluster, NON_EXISTING_TOPIC, List.of())) @@ -60,7 +72,9 @@ void sendMessageReturnsExceptionWhenTopicNotFound() { @Test void loadMessagesReturnsExceptionWhenTopicNotFound() { StepVerifier.create(messagesService - .loadMessages(cluster, NON_EXISTING_TOPIC, null, null, null, 1, null, "String", "String")) + .loadMessages(cluster, NON_EXISTING_TOPIC, + new ConsumerPosition(PollingModeDTO.TAILING, NON_EXISTING_TOPIC, List.of(), null, null), + null, null, 1, "String", "String")) .expectError(TopicNotFoundException.class) .verify(); } @@ -68,32 +82,84 @@ void loadMessagesReturnsExceptionWhenTopicNotFound() { @Test void maskingAppliedOnConfiguredClusters() throws Exception { String testTopic = MASKED_TOPICS_PREFIX + UUID.randomUUID(); + createTopicWithCleanup(new NewTopic(testTopic, 1, (short) 1)); + try (var producer = KafkaTestProducer.forKafka(kafka)) { - createTopic(new NewTopic(testTopic, 1, (short) 1)); producer.send(testTopic, "message1"); producer.send(testTopic, "message2").get(); + } + + Flux msgsFlux = messagesService.loadMessages( + cluster, + testTopic, + new ConsumerPosition(PollingModeDTO.EARLIEST, testTopic, List.of(), null, null), + null, + null, + 100, + StringSerde.name(), + StringSerde.name() + ).filter(evt -> evt.getType() == TopicMessageEventDTO.TypeEnum.MESSAGE) + 
.map(TopicMessageEventDTO::getMessage); + + // both messages should be masked + StepVerifier.create(msgsFlux) + .expectNextMatches(msg -> msg.getContent().equals("***")) + .expectNextMatches(msg -> msg.getContent().equals("***")) + .verifyComplete(); + } - Flux msgsFlux = messagesService.loadMessages( - cluster, - testTopic, - new ConsumerPosition(SeekTypeDTO.BEGINNING, testTopic, null), - null, - null, - 100, - SeekDirectionDTO.FORWARD, - StringSerde.name(), - StringSerde.name() - ).filter(evt -> evt.getType() == TopicMessageEventDTO.TypeEnum.MESSAGE) - .map(TopicMessageEventDTO::getMessage); - - // both messages should be masked - StepVerifier.create(msgsFlux) - .expectNextMatches(msg -> msg.getContent().equals("***")) - .expectNextMatches(msg -> msg.getContent().equals("***")) - .verifyComplete(); - } finally { - deleteTopic(testTopic); + @ParameterizedTest + @CsvSource({"EARLIEST", "LATEST"}) + void cursorIsRegisteredAfterPollingIsDoneAndCanBeUsedForNextPagePolling(PollingModeDTO mode) { + String testTopic = MessagesServiceTest.class.getSimpleName() + UUID.randomUUID(); + createTopicWithCleanup(new NewTopic(testTopic, 5, (short) 1)); + + int msgsToGenerate = 100; + int pageSize = (msgsToGenerate / 2) + 1; + + try (var producer = KafkaTestProducer.forKafka(kafka)) { + for (int i = 0; i < msgsToGenerate; i++) { + producer.send(testTopic, "message_" + i); + } } + + var cursorIdCatcher = new AtomicReference(); + Flux msgsFlux = messagesService.loadMessages( + cluster, testTopic, + new ConsumerPosition(mode, testTopic, List.of(), null, null), + null, null, pageSize, StringSerde.name(), StringSerde.name()) + .doOnNext(evt -> { + if (evt.getType() == TopicMessageEventDTO.TypeEnum.DONE) { + assertThat(evt.getCursor()).isNotNull(); + cursorIdCatcher.set(evt.getCursor().getId()); + } + }) + .filter(evt -> evt.getType() == TopicMessageEventDTO.TypeEnum.MESSAGE) + .map(evt -> evt.getMessage().getContent()); + + StepVerifier.create(msgsFlux) + .expectNextCount(pageSize) + .verifyComplete(); + + assertThat(cursorIdCatcher.get()).isNotNull(); + + Flux remainingMsgs = messagesService.loadMessages(cluster, testTopic, cursorIdCatcher.get()) + .doOnNext(evt -> { + if (evt.getType() == TopicMessageEventDTO.TypeEnum.DONE) { + assertThat(evt.getCursor()).isNull(); + } + }) + .filter(evt -> evt.getType() == TopicMessageEventDTO.TypeEnum.MESSAGE) + .map(evt -> evt.getMessage().getContent()); + + StepVerifier.create(remainingMsgs) + .expectNextCount(msgsToGenerate - pageSize) + .verifyComplete(); + } + + private void createTopicWithCleanup(NewTopic newTopic) { + createTopic(newTopic); + createdTopics.add(newTopic.name()); } @Test diff --git a/kafka-ui-api/src/test/java/com/provectus/kafka/ui/service/RecordEmitterTest.java b/kafka-ui-api/src/test/java/com/provectus/kafka/ui/service/RecordEmitterTest.java index 2a9fa76f136..9c26e78f2a9 100644 --- a/kafka-ui-api/src/test/java/com/provectus/kafka/ui/service/RecordEmitterTest.java +++ b/kafka-ui-api/src/test/java/com/provectus/kafka/ui/service/RecordEmitterTest.java @@ -1,13 +1,16 @@ package com.provectus.kafka.ui.service; -import static com.provectus.kafka.ui.model.SeekTypeDTO.BEGINNING; -import static com.provectus.kafka.ui.model.SeekTypeDTO.LATEST; -import static com.provectus.kafka.ui.model.SeekTypeDTO.OFFSET; -import static com.provectus.kafka.ui.model.SeekTypeDTO.TIMESTAMP; +import static com.provectus.kafka.ui.model.PollingModeDTO.EARLIEST; +import static com.provectus.kafka.ui.model.PollingModeDTO.FROM_OFFSET; +import static 
com.provectus.kafka.ui.model.PollingModeDTO.FROM_TIMESTAMP; +import static com.provectus.kafka.ui.model.PollingModeDTO.LATEST; +import static com.provectus.kafka.ui.model.PollingModeDTO.TO_OFFSET; +import static com.provectus.kafka.ui.model.PollingModeDTO.TO_TIMESTAMP; import static org.assertj.core.api.Assertions.assertThat; import com.provectus.kafka.ui.AbstractIntegrationTest; import com.provectus.kafka.ui.emitter.BackwardEmitter; +import com.provectus.kafka.ui.emitter.Cursor; import com.provectus.kafka.ui.emitter.EnhancedConsumer; import com.provectus.kafka.ui.emitter.ForwardEmitter; import com.provectus.kafka.ui.emitter.PollingSettings; @@ -43,6 +46,7 @@ import org.junit.jupiter.api.AfterAll; import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.Test; +import org.mockito.Mockito; import reactor.core.publisher.Flux; import reactor.core.publisher.FluxSink; import reactor.test.StepVerifier; @@ -57,16 +61,18 @@ class RecordEmitterTest extends AbstractIntegrationTest { static final String EMPTY_TOPIC = TOPIC + "_empty"; static final List SENT_RECORDS = new ArrayList<>(); static final ConsumerRecordDeserializer RECORD_DESERIALIZER = createRecordsDeserializer(); + static final Cursor.Tracking CURSOR_MOCK = Mockito.mock(Cursor.Tracking.class); static final Predicate NOOP_FILTER = m -> true; @BeforeAll static void generateMsgs() throws Exception { createTopic(new NewTopic(TOPIC, PARTITIONS, (short) 1)); createTopic(new NewTopic(EMPTY_TOPIC, PARTITIONS, (short) 1)); + long startTs = System.currentTimeMillis(); try (var producer = KafkaTestProducer.forKafka(kafka)) { for (int partition = 0; partition < PARTITIONS; partition++) { for (int i = 0; i < MSGS_PER_PARTITION; i++) { - long ts = System.currentTimeMillis() + i; + long ts = (startTs += 100); var value = "msg_" + partition + "_" + i; var metadata = producer.send( new ProducerRecord<>( @@ -115,20 +121,22 @@ private static ConsumerRecordDeserializer createRecordsDeserializer() { void pollNothingOnEmptyTopic() { var forwardEmitter = new ForwardEmitter( this::createConsumer, - new ConsumerPosition(BEGINNING, EMPTY_TOPIC, null), + new ConsumerPosition(EARLIEST, EMPTY_TOPIC, List.of(), null, null), 100, RECORD_DESERIALIZER, NOOP_FILTER, - PollingSettings.createDefault() + PollingSettings.createDefault(), + CURSOR_MOCK ); var backwardEmitter = new BackwardEmitter( this::createConsumer, - new ConsumerPosition(BEGINNING, EMPTY_TOPIC, null), + new ConsumerPosition(EARLIEST, EMPTY_TOPIC, List.of(), null, null), 100, RECORD_DESERIALIZER, NOOP_FILTER, - PollingSettings.createDefault() + PollingSettings.createDefault(), + CURSOR_MOCK ); StepVerifier.create(Flux.create(forwardEmitter)) @@ -148,20 +156,22 @@ void pollNothingOnEmptyTopic() { void pollFullTopicFromBeginning() { var forwardEmitter = new ForwardEmitter( this::createConsumer, - new ConsumerPosition(BEGINNING, TOPIC, null), + new ConsumerPosition(EARLIEST, TOPIC, List.of(), null, null), PARTITIONS * MSGS_PER_PARTITION, RECORD_DESERIALIZER, NOOP_FILTER, - PollingSettings.createDefault() + PollingSettings.createDefault(), + CURSOR_MOCK ); var backwardEmitter = new BackwardEmitter( this::createConsumer, - new ConsumerPosition(LATEST, TOPIC, null), + new ConsumerPosition(LATEST, TOPIC, List.of(), null, null), PARTITIONS * MSGS_PER_PARTITION, RECORD_DESERIALIZER, NOOP_FILTER, - PollingSettings.createDefault() + PollingSettings.createDefault(), + CURSOR_MOCK ); List expectedValues = SENT_RECORDS.stream().map(Record::getValue).collect(Collectors.toList()); @@ -180,20 +190,24 @@ void 
pollWithOffsets() { var forwardEmitter = new ForwardEmitter( this::createConsumer, - new ConsumerPosition(OFFSET, TOPIC, targetOffsets), + new ConsumerPosition(FROM_OFFSET, TOPIC, List.copyOf(targetOffsets.keySet()), null, + new ConsumerPosition.Offsets(null, targetOffsets)), PARTITIONS * MSGS_PER_PARTITION, RECORD_DESERIALIZER, NOOP_FILTER, - PollingSettings.createDefault() + PollingSettings.createDefault(), + CURSOR_MOCK ); var backwardEmitter = new BackwardEmitter( this::createConsumer, - new ConsumerPosition(OFFSET, TOPIC, targetOffsets), + new ConsumerPosition(TO_OFFSET, TOPIC, List.copyOf(targetOffsets.keySet()), null, + new ConsumerPosition.Offsets(null, targetOffsets)), PARTITIONS * MSGS_PER_PARTITION, RECORD_DESERIALIZER, NOOP_FILTER, - PollingSettings.createDefault() + PollingSettings.createDefault(), + CURSOR_MOCK ); var expectedValues = SENT_RECORDS.stream() @@ -213,50 +227,45 @@ void pollWithOffsets() { @Test void pollWithTimestamps() { - Map targetTimestamps = new HashMap<>(); - final Map> perPartition = - SENT_RECORDS.stream().collect(Collectors.groupingBy((r) -> r.tp)); - for (int i = 0; i < PARTITIONS; i++) { - final List records = perPartition.get(new TopicPartition(TOPIC, i)); - int randRecordIdx = ThreadLocalRandom.current().nextInt(records.size()); - log.info("partition: {} position: {}", i, randRecordIdx); - targetTimestamps.put( - new TopicPartition(TOPIC, i), - records.get(randRecordIdx).getTimestamp() - ); - } + var tsStats = SENT_RECORDS.stream().mapToLong(Record::getTimestamp).summaryStatistics(); + //choosing ts in the middle + long targetTimestamp = tsStats.getMin() + ((tsStats.getMax() - tsStats.getMin()) / 2); var forwardEmitter = new ForwardEmitter( this::createConsumer, - new ConsumerPosition(TIMESTAMP, TOPIC, targetTimestamps), + new ConsumerPosition(FROM_TIMESTAMP, TOPIC, List.of(), targetTimestamp, null), PARTITIONS * MSGS_PER_PARTITION, RECORD_DESERIALIZER, NOOP_FILTER, - PollingSettings.createDefault() + PollingSettings.createDefault(), + CURSOR_MOCK + ); + + expectEmitter( + forwardEmitter, + SENT_RECORDS.stream() + .filter(r -> r.getTimestamp() >= targetTimestamp) + .map(Record::getValue) + .collect(Collectors.toList()) ); var backwardEmitter = new BackwardEmitter( this::createConsumer, - new ConsumerPosition(TIMESTAMP, TOPIC, targetTimestamps), + new ConsumerPosition(TO_TIMESTAMP, TOPIC, List.of(), targetTimestamp, null), PARTITIONS * MSGS_PER_PARTITION, RECORD_DESERIALIZER, NOOP_FILTER, - PollingSettings.createDefault() + PollingSettings.createDefault(), + CURSOR_MOCK ); - var expectedValues = SENT_RECORDS.stream() - .filter(r -> r.getTimestamp() >= targetTimestamps.get(r.getTp())) - .map(Record::getValue) - .collect(Collectors.toList()); - - expectEmitter(forwardEmitter, expectedValues); - - expectedValues = SENT_RECORDS.stream() - .filter(r -> r.getTimestamp() < targetTimestamps.get(r.getTp())) - .map(Record::getValue) - .collect(Collectors.toList()); - - expectEmitter(backwardEmitter, expectedValues); + expectEmitter( + backwardEmitter, + SENT_RECORDS.stream() + .filter(r -> r.getTimestamp() < targetTimestamp) + .map(Record::getValue) + .collect(Collectors.toList()) + ); } @Test @@ -269,11 +278,13 @@ void backwardEmitterSeekToEnd() { var backwardEmitter = new BackwardEmitter( this::createConsumer, - new ConsumerPosition(OFFSET, TOPIC, targetOffsets), + new ConsumerPosition(TO_OFFSET, TOPIC, List.copyOf(targetOffsets.keySet()), null, + new ConsumerPosition.Offsets(null, targetOffsets)), numMessages, RECORD_DESERIALIZER, NOOP_FILTER, - 
PollingSettings.createDefault() + PollingSettings.createDefault(), + CURSOR_MOCK ); var expectedValues = SENT_RECORDS.stream() @@ -296,11 +307,13 @@ void backwardEmitterSeekToBegin() { var backwardEmitter = new BackwardEmitter( this::createConsumer, - new ConsumerPosition(OFFSET, TOPIC, offsets), + new ConsumerPosition(TO_OFFSET, TOPIC, List.copyOf(offsets.keySet()), null, + new ConsumerPosition.Offsets(null, offsets)), 100, RECORD_DESERIALIZER, NOOP_FILTER, - PollingSettings.createDefault() + PollingSettings.createDefault(), + CURSOR_MOCK ); expectEmitter(backwardEmitter, diff --git a/kafka-ui-api/src/test/java/com/provectus/kafka/ui/service/SendAndReadTests.java b/kafka-ui-api/src/test/java/com/provectus/kafka/ui/service/SendAndReadTests.java index 78c111cdd19..a9639ca3893 100644 --- a/kafka-ui-api/src/test/java/com/provectus/kafka/ui/service/SendAndReadTests.java +++ b/kafka-ui-api/src/test/java/com/provectus/kafka/ui/service/SendAndReadTests.java @@ -7,8 +7,7 @@ import com.provectus.kafka.ui.model.ConsumerPosition; import com.provectus.kafka.ui.model.CreateTopicMessageDTO; import com.provectus.kafka.ui.model.KafkaCluster; -import com.provectus.kafka.ui.model.SeekDirectionDTO; -import com.provectus.kafka.ui.model.SeekTypeDTO; +import com.provectus.kafka.ui.model.PollingModeDTO; import com.provectus.kafka.ui.model.TopicMessageDTO; import com.provectus.kafka.ui.model.TopicMessageEventDTO; import com.provectus.kafka.ui.serdes.builtin.Int32Serde; @@ -20,6 +19,7 @@ import io.confluent.kafka.schemaregistry.json.JsonSchema; import io.confluent.kafka.schemaregistry.protobuf.ProtobufSchema; import java.time.Duration; +import java.util.List; import java.util.Map; import java.util.Objects; import java.util.UUID; @@ -500,15 +500,10 @@ public void doAssert(Consumer msgAssert) { TopicMessageDTO polled = messagesService.loadMessages( targetCluster, topic, - new ConsumerPosition( - SeekTypeDTO.BEGINNING, - topic, - Map.of(new TopicPartition(topic, 0), 0L) - ), + new ConsumerPosition(PollingModeDTO.EARLIEST, topic, List.of(), null, null), null, null, 1, - SeekDirectionDTO.FORWARD, msgToSend.getKeySerde().get(), msgToSend.getValueSerde().get() ).filter(e -> e.getType().equals(TopicMessageEventDTO.TypeEnum.MESSAGE)) diff --git a/kafka-ui-contract/src/main/resources/swagger/kafka-ui-api.yaml b/kafka-ui-contract/src/main/resources/swagger/kafka-ui-api.yaml index ae51d31568f..2a2280d03e1 100644 --- a/kafka-ui-contract/src/main/resources/swagger/kafka-ui-api.yaml +++ b/kafka-ui-contract/src/main/resources/swagger/kafka-ui-api.yaml @@ -763,6 +763,119 @@ paths: 404: description: Not found + /api/clusters/{clusterName}/topics/{topicName}/smartfilters: + post: + tags: + - Messages + summary: registerFilter + operationId: registerFilter + parameters: + - name: clusterName + in: path + required: true + schema: + type: string + - name: topicName + in: path + required: true + schema: + type: string + requestBody: + content: + application/json: + schema: + $ref: '#/components/schemas/MessageFilterRegistration' + responses: + 200: + description: OK + content: + application/json: + schema: + $ref: '#/components/schemas/MessageFilterId' + + + /api/clusters/{clusterName}/topics/{topicName}/messages/v2: + get: + tags: + - Messages + summary: getTopicMessagesV2 + operationId: getTopicMessagesV2 + parameters: + - name: clusterName + in: path + required: true + schema: + type: string + - name: topicName + in: path + required: true + schema: + type: string + - name: mode + in: query + description: Messages polling mode + 
required: true + schema: + $ref: "#/components/schemas/PollingMode" + - name: partitions + in: query + schema: + type: array + description: List of target partitions (all partitions if not provided) + items: + type: integer + - name: limit + in: query + description: Max number of messages that can be returned + schema: + type: integer + - name: stringFilter + in: query + description: query string for 'contains' filtering + schema: + type: string + - name: smartFilterId + in: query + description: filter id that was registered beforehand + schema: + type: string + - name: offset + in: query + description: message offset to read from / to + schema: + type: integer + format: int64 + - name: timestamp + in: query + description: timestamp (in ms) to read from / to + schema: + type: integer + format: int64 + - name: keySerde + in: query + description: "Serde that should be used for deserialization. Will be chosen automatically if not set." + schema: + type: string + - name: valueSerde + in: query + description: "Serde that should be used for deserialization. Will be chosen automatically if not set." + schema: + type: string + - name: cursor + in: query + description: "id of the cursor for pagination" + schema: + type: string + responses: + 200: + description: OK + content: + text/event-stream: + schema: + type: array + items: + $ref: '#/components/schemas/TopicMessageEvent' + /api/clusters/{clusterName}/topics/{topicName}/activeproducers: get: tags: @@ -2783,13 +2896,14 @@ components: - MESSAGE - CONSUMING - DONE - - EMIT_THROTTLING message: $ref: "#/components/schemas/TopicMessage" phase: $ref: "#/components/schemas/TopicMessagePhase" consuming: $ref: "#/components/schemas/TopicMessageConsuming" + cursor: + $ref: "#/components/schemas/TopicMessageNextPageCursor" TopicMessagePhase: type: object @@ -2819,6 +2933,11 @@ components: filterApplyErrors: type: integer + TopicMessageNextPageCursor: + type: object + properties: + id: + type: string TopicMessage: type: object @@ -2891,6 +3010,29 @@ components: - TIMESTAMP - LATEST + MessageFilterRegistration: + type: object + properties: + filterCode: + type: string + + MessageFilterId: + type: object + properties: + id: + type: string + + PollingMode: + type: string + enum: + - FROM_OFFSET + - TO_OFFSET + - FROM_TIMESTAMP + - TO_TIMESTAMP + - LATEST + - EARLIEST + - TAILING + MessageFilterType: type: string enum: @@ -3968,7 +4110,7 @@ components: keystoreLocation: type: string keystorePassword: - type: string + type: string ksqldbServerAuth: type: object properties: diff --git a/kafka-ui-react-app/package.json b/kafka-ui-react-app/package.json index 172ec4466ab..491e3abacb5 100644 --- a/kafka-ui-react-app/package.json +++ b/kafka-ui-react-app/package.json @@ -21,7 +21,7 @@ "fetch-mock": "^9.11.0", "jest": "^29.4.3", "jest-watch-typeahead": "^2.2.2", - "json-schema-faker": "^0.5.0-rcv.44", + "json-schema-faker": "^0.5.6", "jsonpath-plus": "^7.2.0", "lodash": "^4.17.21", "lossless-json": "^2.0.8", @@ -109,4 +109,4 @@ "node": "v18.17.1", "pnpm": "^8.6.12" } -} +} \ No newline at end of file diff --git a/kafka-ui-react-app/pnpm-lock.yaml b/kafka-ui-react-app/pnpm-lock.yaml index 01862dd3bbe..eb396184bf9 100644 --- a/kafka-ui-react-app/pnpm-lock.yaml +++ b/kafka-ui-react-app/pnpm-lock.yaml @@ -57,8 +57,8 @@ dependencies: specifier: ^2.2.2 version: 2.2.2(jest@29.6.4) json-schema-faker: - specifier: ^0.5.0-rcv.44 - version: 0.5.3 + specifier: ^0.5.6 + version: 0.5.6 jsonpath-plus: specifier: ^7.2.0 version: 7.2.0 @@ -91,7 +91,7 @@ dependencies: version:
7.43.1(react@18.2.0) react-hot-toast: specifier: ^2.4.0 - version: 2.4.1(csstype@3.1.2)(react-dom@18.1.0)(react@18.2.0) + version: 2.4.1(csstype@3.1.3)(react-dom@18.1.0)(react@18.2.0) react-is: specifier: ^18.2.0 version: 18.2.0 @@ -2606,7 +2606,7 @@ packages: normalize-path: 3.0.0 readdirp: 3.6.0 optionalDependencies: - fsevents: 2.3.2 + fsevents: 2.3.3 /ci-info@3.3.1: resolution: {integrity: sha512-SXgeMX9VwDe7iFFaEWkA5AstuER9YKqy4EhHqr4DVqkwmD9rpVimkMKWHdjn30Ja45txyjhSn63lVX69eVCckg==} @@ -2808,6 +2808,10 @@ packages: /csstype@3.1.2: resolution: {integrity: sha512-I7K1Uu0MBPzaFKg4nI5Q7Vs2t+3gWWW648spaF+Rg7pI9ds18Ugn+lvg4SHczUdKlHI5LWBXyqfS8+DufyBsgQ==} + /csstype@3.1.3: + resolution: {integrity: sha512-M1uQkMl8rQK/szD0LNhtqxIPLpimGm8sOBwU7lLnCpSbTyY3yeU1Vc7l4KT5zT4s/yOxHH5O7tIuuLOCnLADRw==} + dev: false + /damerau-levenshtein@1.0.8: resolution: {integrity: sha512-sdQSFB7+llfUcQHUQO3+B8ERRj0Oa4w9POWMI/puGtuf7gFywGmkaLCElnudfTiKZV+NvHqL0ifzdrI8Ro7ESA==} dev: true @@ -3741,8 +3745,8 @@ packages: /fs.realpath@1.0.0: resolution: {integrity: sha512-OO0pH2lK6a0hZnAdau5ItzHPI6pUlvI7jMVnxUQRtw4owF2wk8lOSabtGDCTP4Ggrg2MbGnWO9X8K1t4+fGMDw==} - /fsevents@2.3.2: - resolution: {integrity: sha512-xiqMQR4xAeHTuB9uWm+fFRcIOgKBMiOBP+eXiyT7jsgVCq1bkVygt00oASowB7EdtpOHaaPgKt812P9ab+DDKA==} + /fsevents@2.3.3: + resolution: {integrity: sha512-5xoDfX+fL7faATnagmWPpbFtwh/R77WmMMqqHGS65C3vvB0YHrgF+B1YmZ3441tMj5n63k0212XNoJwzlhffQw==} engines: {node: ^8.16.0 || ^10.6.0 || >=11.0.0} os: [darwin] requiresBuild: true @@ -3903,12 +3907,12 @@ packages: /globrex@0.1.2: resolution: {integrity: sha512-uHJgbwAMwNFf5mLst7IWLNg14x1CkeqglJb/K3doi4dw6q2IvAAmM/Y81kevy83wP+Sst+nutFTYOGg3d1lsxg==} - /goober@2.1.10(csstype@3.1.2): + /goober@2.1.10(csstype@3.1.3): resolution: {integrity: sha512-7PpuQMH10jaTWm33sQgBQvz45pHR8N4l3Cu3WMGEWmHShAcTuuP7I+5/DwKo39fwti5A80WAjvqgz6SSlgWmGA==} peerDependencies: csstype: ^3.0.10 dependencies: - csstype: 3.1.2 + csstype: 3.1.3 dev: false /gopd@1.0.1: @@ -4544,7 +4548,7 @@ packages: micromatch: 4.0.5 walker: 1.0.8 optionalDependencies: - fsevents: 2.3.2 + fsevents: 2.3.3 dev: false /jest-leak-detector@29.6.3: @@ -4903,8 +4907,8 @@ packages: /json-parse-even-better-errors@2.3.1: resolution: {integrity: sha512-xyFwyhro/JEof6Ghe2iz2NcXoj2sloNsWr/XsERDK/oiPCfaNhl5ONfp+jQdAZRQQ0IJWNzH9zIZF7li91kh2w==} - /json-schema-faker@0.5.3: - resolution: {integrity: sha512-BeIrR0+YSrTbAR9dOMnjbFl1MvHyXnq+Wpdw1FpWZDHWKLzK229hZ5huyPcmzFUfVq1ODwf40WdGVoE266UBUg==} + /json-schema-faker@0.5.6: + resolution: {integrity: sha512-u/cFC26/GDxh2vPiAC8B8xVvpXAW+QYtG2mijEbKrimCk8IHtiwQBjCE8TwvowdhALWq9IcdIWZ+/8ocXvdL3Q==} hasBin: true dependencies: json-schema-ref-parser: 6.1.0 @@ -5711,14 +5715,14 @@ packages: react: 18.2.0 dev: false - /react-hot-toast@2.4.1(csstype@3.1.2)(react-dom@18.1.0)(react@18.2.0): + /react-hot-toast@2.4.1(csstype@3.1.3)(react-dom@18.1.0)(react@18.2.0): resolution: {integrity: sha512-j8z+cQbWIM5LY37pR6uZR6D4LfseplqnuAO4co4u8917hBUvXlEqyP1ZzqVLcqoyUesZZv/ImreoCeHVDpE5pQ==} engines: {node: '>=10'} peerDependencies: react: '>=16' react-dom: '>=16' dependencies: - goober: 2.1.10(csstype@3.1.2) + goober: 2.1.10(csstype@3.1.3) react: 18.2.0 react-dom: 18.1.0(react@18.2.0) transitivePeerDependencies: @@ -6022,7 +6026,7 @@ packages: engines: {node: '>=14.18.0', npm: '>=8.0.0'} hasBin: true optionalDependencies: - fsevents: 2.3.2 + fsevents: 2.3.3 /run-async@2.4.1: resolution: {integrity: sha512-tvVnVv01b8c1RrA6Ep7JkStj85Guv/YrMcwqYQnwjsAS2cTmmPGBBjAjpCW7RrSodNSoE2/qg9O4bceNvUuDgQ==} @@ -6755,7 +6759,7 @@ 
packages: rollup: 3.7.3 sass: 1.66.1 optionalDependencies: - fsevents: 2.3.2 + fsevents: 2.3.3 /w3c-hr-time@1.0.2: resolution: {integrity: sha512-z8P5DvDNjKDoFIHK7q8r8lackT6l+jo/Ye3HOle7l9nICP9lf1Ci25fy9vHd0JOWewkIFzXIEig3TdKT7JQ5fQ==} diff --git a/kafka-ui-react-app/src/components/Topics/Topic/Messages/Filters/Filters.styled.ts b/kafka-ui-react-app/src/components/Topics/Topic/Messages/Filters/Filters.styled.ts index 7ec8fbde072..f4a9110ae34 100644 --- a/kafka-ui-react-app/src/components/Topics/Topic/Messages/Filters/Filters.styled.ts +++ b/kafka-ui-react-app/src/components/Topics/Topic/Messages/Filters/Filters.styled.ts @@ -1,9 +1,11 @@ import Input from 'components/common/Input/Input'; import Select from 'components/common/Select/Select'; import styled, { css } from 'styled-components'; -import DatePicker from 'react-datepicker'; +// import DatePicker from 'react-datepicker'; import EditIcon from 'components/common/Icons/EditIcon'; import closeIcon from 'components/common/Icons/CloseIcon'; +import DatePicker from 'components/common/DatePicker/DatePicker'; +import ReactDatePicker from 'react-datepicker'; interface SavedFilterProps { selected: boolean; @@ -21,6 +23,7 @@ export const FiltersWrapper = styled.div` flex-direction: column; padding-left: 16px; padding-right: 16px; + padding-top: 8px; & > div:first-child { display: flex; @@ -38,10 +41,10 @@ export const FilterInputs = styled.div` flex-wrap: wrap; `; -export const SeekTypeSelectorWrapper = styled.div` +export const PollingModeSelectorWrapper = styled.div` display: flex; & .select-wrapper { - width: 40% !important; + width: 60% !important; & > select { border-radius: 4px 0 0 4px !important; } @@ -55,6 +58,34 @@ export const OffsetSelector = styled(Input)` } `; +export const TestDatePickerInput = styled(ReactDatePicker)` + height: 32px; + border: 1px ${({ theme }) => theme.select.borderColor.normal} solid; + border-left: none; + border-radius: 0 4px 4px 0; + font-size: 14px; + width: 100%; + padding-left: 12px; + background-color: ${({ theme }) => theme.input.backgroundColor.normal}; + color: ${({ theme }) => theme.input.color.normal}; + &::placeholder { + color: ${({ theme }) => theme.input.color.normal}; + } + + background-image: url('data:image/svg+xml,%3Csvg width="10" height="6" viewBox="0 0 10 6" fill="none" xmlns="http://www.w3.org/2000/svg"%3E%3Cpath d="M1 1L5 5L9 1" stroke="%23454F54"/%3E%3C/svg%3E%0A') !important; + background-repeat: no-repeat !important; + background-position-x: 96% !important; + background-position-y: 55% !important; + appearance: none !important; + + &:hover { + cursor: pointer; + } + &:focus { + outline: none; + } +`; + export const DatePickerInput = styled(DatePicker)` height: 32px; border: 1px ${({ theme }) => theme.select.borderColor.normal} solid; @@ -90,6 +121,7 @@ export const FiltersMetrics = styled.div` gap: 22px; padding-top: 16px; padding-bottom: 16px; + width: 600px; `; export const Message = styled.div` font-size: 14px; @@ -240,6 +272,7 @@ export const ActiveSmartFilterWrapper = styled.div` gap: 10px; align-items: center; justify-content: flex-start; + width: 90%; `; export const DeleteSavedFilter = styled.div.attrs({ role: 'deleteIcon' })` @@ -357,15 +390,15 @@ export const MessageLoading = styled.div.attrs({ role: 'contentLoader', })` color: ${({ theme }) => theme.heading.h3.color}; - font-size: ${({ theme }) => theme.heading.h3.fontSize}; + font-size: ${({ theme }) => theme.heading.variants[5].fontSize}; display: ${({ isLive }) => (isLive ? 
'flex' : 'none')}; justify-content: space-around; - width: 250px; + width: 150px; `; export const StopLoading = styled.div` color: ${({ theme }) => theme.pageLoader.borderColor}; - font-size: ${({ theme }) => theme.heading.h3.fontSize}; + font-size: ${({ theme }) => theme.heading.variants[5].fontSize}; cursor: pointer; `; @@ -413,7 +446,7 @@ export const BackToCustomText = styled.div` cursor: pointer; `; -export const SeekTypeSelect = styled(Select)` +export const PollingModeSelect = styled(Select)` border-top-right-radius: 0; border-bottom-right-radius: 0; user-select: none; @@ -424,3 +457,15 @@ export const Serdes = styled.div` gap: 24px; padding: 8px 0; `; + +export const RefreshIconContainer = styled.button` + cursor: pointer; + padding: 0; + background: none; + border: none; + align-self: center; + height: 24px; + svg { + padding: 2px; + } +`; diff --git a/kafka-ui-react-app/src/components/Topics/Topic/Messages/Filters/Filters.tsx b/kafka-ui-react-app/src/components/Topics/Topic/Messages/Filters/Filters.tsx index 347623d2226..fc7c1de9515 100644 --- a/kafka-ui-react-app/src/components/Topics/Topic/Messages/Filters/Filters.tsx +++ b/kafka-ui-react-app/src/components/Topics/Topic/Messages/Filters/Filters.tsx @@ -2,14 +2,12 @@ import 'react-datepicker/dist/react-datepicker.css'; import { MessageFilterType, - Partition, - SeekDirection, - SeekType, - SerdeUsage, + PollingMode, TopicMessage, TopicMessageConsuming, TopicMessageEvent, TopicMessageEventTypeEnum, + TopicMessageNextPageCursor, } from 'generated-sources'; import React, { useContext } from 'react'; import omitBy from 'lodash/omitBy'; @@ -17,14 +15,17 @@ import { useNavigate, useLocation, useSearchParams } from 'react-router-dom'; import MultiSelect from 'components/common/MultiSelect/MultiSelect.styled'; import { Option } from 'react-multi-select-component'; import BytesFormatted from 'components/common/BytesFormatted/BytesFormatted'; -import { BASE_PARAMS } from 'lib/constants'; -import Select from 'components/common/Select/Select'; +import { + BASE_PARAMS, + PollingModeOptions, + PollingModeOptionsObj, +} from 'lib/constants'; +import { SelectOption } from 'components/common/Select/Select'; import { Button } from 'components/common/Button/Button'; import Search from 'components/common/Search/Search'; import FilterModal, { FilterEdit, } from 'components/Topics/Topic/Messages/Filters/FilterModal'; -import { SeekDirectionOptions } from 'components/Topics/Topic/Messages/Messages'; import TopicMessagesContext from 'components/contexts/TopicMessagesContext'; import useBoolean from 'lib/hooks/useBoolean'; import { RouteParamsClusterTopic } from 'lib/paths'; @@ -35,10 +36,14 @@ import CloseIcon from 'components/common/Icons/CloseIcon'; import ClockIcon from 'components/common/Icons/ClockIcon'; import ArrowDownIcon from 'components/common/Icons/ArrowDownIcon'; import FileIcon from 'components/common/Icons/FileIcon'; -import { useTopicDetails } from 'lib/hooks/api/topics'; -import { InputLabel } from 'components/common/Input/InputLabel.styled'; -import { getSerdeOptions } from 'components/Topics/Topic/SendMessage/utils'; -import { useSerdes } from 'lib/hooks/api/topicMessages'; +import { useRegisterFilter, useTopicDetails } from 'lib/hooks/api/topics'; +import { getTopicMessgesLastLoadedPage } from 'redux/reducers/topicMessages/selectors'; +import { useAppSelector } from 'lib/hooks/redux'; +import { showAlert } from 'lib/errorHandling'; +import RefreshIcon from 'components/common/Icons/RefreshIcon'; +import Input from 
'components/common/Input/Input'; +import DatePicker from 'components/common/DatePicker/DatePicker'; +import { SelectSubFormProps } from 'components/common/Select/SelectSubForm'; import * as S from './Filters.styled'; import { @@ -55,12 +60,18 @@ export interface FiltersProps { meta: TopicMessageConsuming; isFetching: boolean; messageEventType?: string; + cursor?: TopicMessageNextPageCursor; + currentPage: number; addMessage(content: { message: TopicMessage; prepend: boolean }): void; resetMessages(): void; updatePhase(phase: string): void; updateMeta(meta: TopicMessageConsuming): void; setIsFetching(status: boolean): void; setMessageType(messageType: string): void; + updateCursor(cursor?: TopicMessageNextPageCursor): void; + setCurrentPage(page: number): void; + setLastLoadedPage(page: number): void; + resetAllMessages(): void; } export interface MessageFilters { @@ -76,15 +87,11 @@ export interface ActiveMessageFilter { const PER_PAGE = 100; -export const SeekTypeOptions = [ - { value: SeekType.OFFSET, label: 'Offset' }, - { value: SeekType.TIMESTAMP, label: 'Timestamp' }, -]; - const Filters: React.FC = ({ phaseMessage, meta: { elapsedMs, bytesConsumed, messagesConsumed, filterApplyErrors }, isFetching, + currentPage, addMessage, resetMessages, updatePhase, @@ -92,20 +99,33 @@ const Filters: React.FC = ({ setIsFetching, setMessageType, messageEventType, + updateCursor, + setCurrentPage, + setLastLoadedPage, + resetAllMessages, }) => { const { clusterName, topicName } = useAppParams(); const location = useLocation(); const navigate = useNavigate(); const [searchParams] = useSearchParams(); - const page = searchParams.get('page'); - const { data: topic } = useTopicDetails({ clusterName, topicName }); + const registerFilter = useRegisterFilter({ clusterName, topicName }); + + const lastLoadedPage = useAppSelector(getTopicMessgesLastLoadedPage); + const partitions = topic?.partitions || []; - const { seekDirection, isLive, changeSeekDirection } = - useContext(TopicMessagesContext); + const { + pollingMode, + isLive, + changePollingMode, + page, + setPage, + keySerde, + valueSerde, + } = useContext(TopicMessagesContext); const { value: isOpen, toggle } = useBoolean(); @@ -116,14 +136,6 @@ const Filters: React.FC = ({ const [selectedPartitions, setSelectedPartitions] = React.useState( getSelectedPartitionsFromSeekToParam(searchParams, partitions) ); - - const [currentSeekType, setCurrentSeekType] = React.useState( - SeekTypeOptions.find( - (ele) => ele.value === (searchParams.get('seekType') as SeekType) - ) !== undefined - ? (searchParams.get('seekType') as SeekType) - : SeekType.OFFSET - ); const [offset, setOffset] = React.useState( getOffsetFromSeekToParam(searchParams) ); @@ -131,12 +143,6 @@ const Filters: React.FC = ({ const [timestamp, setTimestamp] = React.useState( getTimestampFromSeekToParam(searchParams) ); - const [keySerde, setKeySerde] = React.useState( - searchParams.get('keySerde') || '' - ); - const [valueSerde, setValueSerde] = React.useState( - searchParams.get('valueSerde') || '' - ); const [savedFilters, setSavedFilters] = React.useState( JSON.parse(localStorage.getItem('savedFilters') ?? '[]') @@ -155,112 +161,64 @@ const Filters: React.FC = ({ ? 
MessageFilterType.GROOVY_SCRIPT : MessageFilterType.STRING_CONTAINS ); - const [query, setQuery] = React.useState(searchParams.get('q') || ''); + const [stringFilter, setStringFilter] = React.useState(''); const [isTailing, setIsTailing] = React.useState(isLive); - const isSeekTypeControlVisible = React.useMemo( + const isPollingModeControlVisible = React.useMemo( () => selectedPartitions.length > 0, [selectedPartitions] ); const isSubmitDisabled = React.useMemo(() => { - if (isSeekTypeControlVisible) { + if (isPollingModeControlVisible) { return ( - (currentSeekType === SeekType.TIMESTAMP && !timestamp) || isTailing + ((pollingMode === PollingMode.FROM_TIMESTAMP || + pollingMode === PollingMode.TO_TIMESTAMP) && + !timestamp) || + isTailing ); } return false; - }, [isSeekTypeControlVisible, currentSeekType, timestamp, isTailing]); - - const partitionMap = React.useMemo( - () => - partitions.reduce>( - (acc, partition) => ({ - ...acc, - [partition.partition]: partition, - }), - {} - ), - [partitions] - ); - - const handleClearAllFilters = () => { - setCurrentSeekType(SeekType.OFFSET); - setOffset(''); - setTimestamp(null); - setQuery(''); - changeSeekDirection(SeekDirection.FORWARD); - getSelectedPartitionsFromSeekToParam(searchParams, partitions); - setSelectedPartitions( - partitions.map((partition: Partition) => { - return { - value: partition.partition, - label: `Partition #${partition.partition.toString()}`, - }; - }) - ); + }, [isPollingModeControlVisible, pollingMode, timestamp, isTailing]); + + const getSmartFilterId = async (code: string) => { + try { + const filterId = await registerFilter.mutateAsync({ + filterCode: code, + }); + return filterId; + } catch (e) { + showAlert('error', { + message: 'Error occurred while registering smart filter', + }); + return ''; + } }; - const handleFiltersSubmit = (currentOffset: string) => { - const nextAttempt = Number(searchParams.get('attempt') || 0) + 1; + const handleFiltersSubmit = async (cursor?: TopicMessageNextPageCursor) => { + if (!keySerde || !valueSerde) return; const props: Query = { - q: - queryType === MessageFilterType.GROOVY_SCRIPT ?
activeFilter.code - : query, - filterQueryType: queryType, - attempt: nextAttempt, + mode: pollingMode, limit: PER_PAGE, - page: page || 0, - seekDirection, + stringFilter, + offset, + timestamp: timestamp?.getTime() || 0, keySerde: keySerde || searchParams.get('keySerde') || '', valueSerde: valueSerde || searchParams.get('valueSerde') || '', }; - if (isSeekTypeControlVisible) { - switch (seekDirection) { - case SeekDirection.FORWARD: - props.seekType = SeekType.BEGINNING; - break; - case SeekDirection.BACKWARD: - case SeekDirection.TAILING: - props.seekType = SeekType.LATEST; - break; - default: - props.seekType = currentSeekType; - } + if (props.mode === PollingMode.TAILING) setIsTailing(true); - if (offset && currentSeekType === SeekType.OFFSET) { - props.seekType = SeekType.OFFSET; - } + if (cursor?.id) props.cursor = cursor?.id; - if (timestamp && currentSeekType === SeekType.TIMESTAMP) { - props.seekType = SeekType.TIMESTAMP; - } - - const isSeekTypeWithSeekTo = - props.seekType === SeekType.TIMESTAMP || - props.seekType === SeekType.OFFSET; + if (selectedPartitions.length !== partitions.length) { + props.partitions = selectedPartitions.map((p) => p.value); + } - if ( - selectedPartitions.length !== partitions.length || - isSeekTypeWithSeekTo - ) { - // not everything in the partition is selected - props.seekTo = selectedPartitions.map(({ value }) => { - const offsetProperty = - seekDirection === SeekDirection.FORWARD ? 'offsetMin' : 'offsetMax'; - const offsetBasedSeekTo = - currentOffset || partitionMap[value][offsetProperty]; - const seekToOffset = - currentSeekType === SeekType.OFFSET - ? offsetBasedSeekTo - : timestamp?.getTime(); - - return `${value}::${seekToOffset || '0'}`; - }); - } + if (queryType === MessageFilterType.GROOVY_SCRIPT) { + props.smartFilterId = + (await getSmartFilterId(activeFilter.code))?.id || ''; } const newProps = omitBy(props, (v) => v === undefined || v === ''); @@ -272,6 +230,12 @@ const Filters: React.FC = ({ }); }; + const handleSubmit = async () => { + setPage(1); + resetAllMessages(); + handleFiltersSubmit(); + }; + const handleSSECancel = () => { if (!source.current) return; setIsFetching(false); @@ -342,12 +306,43 @@ const Filters: React.FC = ({ editSavedFilter(filter); } }; + + const handlePollingModeSelect = ( + pollingModeVal: PollingMode, + value?: string | Date | null + ) => { + changePollingMode(pollingModeVal); + if ( + (pollingModeVal === PollingMode.FROM_OFFSET || + pollingModeVal === PollingMode.TO_OFFSET) && + value + ) { + setOffset(value as string); + } + if ( + (pollingModeVal === PollingMode.FROM_TIMESTAMP || + pollingModeVal === PollingMode.TO_TIMESTAMP) && + value + ) { + setTimestamp(value as Date | null); + } + // setPage(1); + // resetAllMessages(); + // handleFiltersSubmit(); + }; + // eslint-disable-next-line consistent-return React.useEffect(() => { if (location.search?.length !== 0) { + if (page === currentPage) return () => {}; + if (page <= lastLoadedPage) { + setCurrentPage(page); + return () => {}; + } + const url = `${BASE_PARAMS.basePath}/api/clusters/${encodeURIComponent( clusterName - )}/topics/${topicName}/messages${location.search}`; + )}/topics/${topicName}/messages/v2${location.search}`; const sse = new EventSource(url); source.current = sse; @@ -358,7 +353,7 @@ const Filters: React.FC = ({ setIsFetching(true); }; sse.onmessage = ({ data }) => { - const { type, message, phase, consuming }: TopicMessageEvent = + const { type, message, phase, consuming, cursor }: TopicMessageEvent = JSON.parse(data); switch 
(type) { case TopicMessageEventTypeEnum.MESSAGE: @@ -381,6 +376,10 @@ const Filters: React.FC = ({ if (consuming && type) { setMessageType(type); updateMeta(consuming); + updateCursor(cursor); + setCurrentPage(page); + setLastLoadedPage(page); + handleFiltersSubmit(cursor); } break; default: @@ -400,68 +399,129 @@ const Filters: React.FC = ({ }, [ clusterName, topicName, - seekDirection, + pollingMode, location, addMessage, resetMessages, setIsFetching, updateMeta, updatePhase, + updateCursor, + setLastLoadedPage, ]); + React.useEffect(() => { if (location.search?.length === 0) { - handleFiltersSubmit(offset); + setPage(1); + resetAllMessages(); + handleFiltersSubmit(); } - }, [ - seekDirection, - queryType, - activeFilter, - currentSeekType, - timestamp, - query, - location, - ]); + }, [queryType, activeFilter, pollingMode, timestamp, stringFilter, location]); + React.useEffect(() => { - handleFiltersSubmit(offset); - }, [ - seekDirection, - queryType, - activeFilter, - currentSeekType, - timestamp, - query, - seekDirection, - page, - ]); + setPage(1); + resetAllMessages(); + handleFiltersSubmit(); + }, [pollingMode, queryType, keySerde, valueSerde]); + + React.useEffect(() => { + setPage(1); + resetAllMessages(); + }, [selectedPartitions, stringFilter, activeFilter]); React.useEffect(() => { setIsTailing(isLive); }, [isLive]); - const { data: serdes = {} } = useSerdes({ - clusterName, - topicName, - use: SerdeUsage.DESERIALIZE, - }); + const formatElapsedTime = (elapsedTimeMs: number): string => { + let timeMs = elapsedTimeMs; + // Convert milliseconds to hours, minutes, and seconds + const hours = Math.floor(timeMs / 3600000); + + // Format the time components into a string + if (hours > 0) { + return `${hours}h `; + } + timeMs %= 3600000; + const minutes = Math.floor(timeMs / 60000); + if (minutes > 0 || hours > 0) { + return `${minutes}m `; + } + timeMs %= 60000; + const seconds = Math.floor(timeMs / 1000); + if (seconds > 0 || minutes > 0 || hours > 0) { + return `${seconds}s `; + } + timeMs %= 1000; + return `${timeMs}ms`; + }; + + const pollingModeOptions: SelectOption[] = PollingModeOptions.map( + (option) => { + let subFormProps: SelectSubFormProps | undefined; + if ( + option === PollingModeOptionsObj.TO_OFFSET || + option === PollingModeOptionsObj.FROM_OFFSET + ) + subFormProps = { + inputType: Input, + inputProps: { + id: 'offset', + type: 'text', + label: 'Offset', + inputSize: 'M', + placeholder: '', + integerOnly: true, + positiveOnly: true, + }, + }; + if ( + option === PollingModeOptionsObj.TO_TIMESTAMP || + option === PollingModeOptionsObj.FROM_TIMESTAMP + ) + subFormProps = { + inputType: DatePicker, + inputProps: { + showTimeInput: true, + timeInputLabel: 'Time:', + dateFormat: 'MMM d, yyyy HH:mm', + placeholderText: 'Select timestamp', + inline: true, + maxDate: new Date(Date.now()), + }, + }; + + return { + ...option, + subFormProps, + }; + } + ); + + // const [pollingModeOptions, setPollingModeOptions] = React.useState(getPollingModeOptions()); return (
- Seek Type - - setCurrentSeekType(option as SeekType)} - value={currentSeekType} + + + handlePollingModeSelect(option as PollingMode, value) + } + value={pollingMode} selectSize="M" - minWidth="100px" - options={SeekTypeOptions} + minWidth="128px" + optionsMaxHeight="400px" + options={pollingModeOptions} disabled={isTailing} + isLive={isLive} /> - {currentSeekType === SeekType.OFFSET ? ( + {(pollingMode === PollingMode.FROM_OFFSET || + pollingMode === PollingMode.TO_OFFSET) && ( = ({ onChange={({ target: { value } }) => setOffset(value)} disabled={isTailing} /> - ) : ( + )} + {(pollingMode === PollingMode.FROM_TIMESTAMP || + pollingMode === PollingMode.TO_TIMESTAMP) && ( setTimestamp(date)} @@ -482,10 +544,9 @@ const Filters: React.FC = ({ disabled={isTailing} /> )} - +
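// --- Editor's aside (not part of the patch): a minimal sketch of how the
// polling-mode controls above translate into getTopicMessagesV2 query
// parameters, following the swagger contract added earlier in this diff.
// `buildV2Query` and `FilterState` are hypothetical names used only for
// illustration; partition serialization may differ in the generated client.
import { PollingMode } from 'generated-sources';

interface FilterState {
  mode: PollingMode;
  offset?: string; // used with FROM_OFFSET / TO_OFFSET
  timestamp?: Date | null; // used with FROM_TIMESTAMP / TO_TIMESTAMP
  partitions: number[]; // omit the param entirely to poll all partitions
  stringFilter?: string;
  smartFilterId?: string; // id returned by the smartfilters endpoint
  cursor?: string; // id carried by the DONE event of the previous page
}

function buildV2Query(state: FilterState, limit = 100): URLSearchParams {
  const params = new URLSearchParams({ mode: state.mode, limit: String(limit) });
  if (state.offset) params.set('offset', state.offset);
  if (state.timestamp) params.set('timestamp', String(state.timestamp.getTime()));
  if (state.partitions.length > 0) params.set('partitions', state.partitions.join(','));
  if (state.stringFilter) params.set('stringFilter', state.stringFilter);
  if (state.smartFilterId) params.set('smartFilterId', state.smartFilterId);
  if (state.cursor) params.set('cursor', state.cursor);
  return params;
}
// --- end of editor's aside ---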
- Partitions ({ label: `Partition #${p.partition.toString()}`, @@ -498,77 +559,84 @@ const Filters: React.FC = ({ disabled={isTailing} />
-
- Key Serde - setValueSerde(option as string)} - options={getSerdeOptions(serdes.value || [])} - value={searchParams.get('valueSerde') as string} - minWidth="170px" - selectSize="M" - disabled={isTailing} - /> -
- Clear all - + +
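// --- Editor's aside (not part of the patch): a sketch of the SSE loop that the
// effect in this component drives against /messages/v2. MESSAGE frames are
// appended to the page; the DONE frame may carry a cursor whose id is sent back
// as the `cursor` query parameter to continue polling from where this page
// stopped. `openMessagesStream` is a hypothetical name for illustration only.
import { TopicMessageEvent, TopicMessageEventTypeEnum } from 'generated-sources';

function openMessagesStream(
  url: string,
  onMessage: (evt: TopicMessageEvent) => void,
  onDone: (nextCursorId?: string) => void
): EventSource {
  const sse = new EventSource(url);
  sse.onmessage = ({ data }) => {
    const event: TopicMessageEvent = JSON.parse(data);
    if (event.type === TopicMessageEventTypeEnum.MESSAGE) {
      onMessage(event); // one polled record
    } else if (event.type === TopicMessageEventTypeEnum.DONE) {
      sse.close();
      onDone(event.cursor?.id); // cursor is absent when the topic is exhausted
    }
  };
  sse.onerror = () => sse.close();
  return sse;
}
// --- end of editor's aside ---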
- onChange && onChange(val)} + /> + ); + if (typeof DatePicker === typeof inputType) + return ( + onChange && onChange(date as Date)} + /> + ); + return null; +}; + +export default SelectSubForm; diff --git a/kafka-ui-react-app/src/components/common/table/TableHeaderCell/TableHeaderCell.styled.ts b/kafka-ui-react-app/src/components/common/table/TableHeaderCell/TableHeaderCell.styled.ts index 07b0d158ac5..b671fde2962 100644 --- a/kafka-ui-react-app/src/components/common/table/TableHeaderCell/TableHeaderCell.styled.ts +++ b/kafka-ui-react-app/src/components/common/table/TableHeaderCell/TableHeaderCell.styled.ts @@ -70,10 +70,10 @@ const DESCMixin = css( export const Title = styled.span( ({ isOrderable, isOrdered, sortOrder, theme: { table } }) => css` font-family: Inter, sans-serif; - font-size: 12px; + font-size: 14px; font-style: normal; font-weight: 400; - line-height: 16px; + line-height: 20px; letter-spacing: 0; text-align: left; display: inline-block; @@ -96,7 +96,7 @@ export const Preview = styled.span` font-family: Inter, sans-serif; font-style: normal; font-weight: 400; - line-height: 16px; + line-height: 20px; letter-spacing: 0; text-align: left; background: ${({ theme }) => theme.table.th.backgroundColor.normal}; diff --git a/kafka-ui-react-app/src/components/common/table/TableHeaderCell/TableHeaderCell.tsx b/kafka-ui-react-app/src/components/common/table/TableHeaderCell/TableHeaderCell.tsx index 341ab6c8814..23e27660447 100644 --- a/kafka-ui-react-app/src/components/common/table/TableHeaderCell/TableHeaderCell.tsx +++ b/kafka-ui-react-app/src/components/common/table/TableHeaderCell/TableHeaderCell.tsx @@ -10,6 +10,7 @@ export interface TableHeaderCellProps { sortOrder?: SortOrder; orderValue?: string; handleOrderBy?: (orderBy: string | null) => void; + style?: React.CSSProperties; } const TableHeaderCell: React.FC> = ( @@ -23,6 +24,7 @@ const TableHeaderCell: React.FC> = ( sortOrder, orderValue, handleOrderBy, + children, ...restProps } = props; @@ -64,6 +66,7 @@ const TableHeaderCell: React.FC> = ( {previewText} )} + {children} ); }; diff --git a/kafka-ui-react-app/src/components/contexts/TopicMessagesContext.ts b/kafka-ui-react-app/src/components/contexts/TopicMessagesContext.ts index 3ca2ca65521..568f2f84da6 100644 --- a/kafka-ui-react-app/src/components/contexts/TopicMessagesContext.ts +++ b/kafka-ui-react-app/src/components/contexts/TopicMessagesContext.ts @@ -1,10 +1,17 @@ import React from 'react'; -import { SeekDirection } from 'generated-sources'; +import { PollingMode, TopicSerdeSuggestion } from 'generated-sources'; export interface ContextProps { - seekDirection: SeekDirection; - changeSeekDirection(val: string): void; + pollingMode: PollingMode; + changePollingMode(val: string): void; + page: number; + setPage(page: number): void; isLive: boolean; + keySerde: string; + setKeySerde(val: string): void; + valueSerde: string; + setValueSerde(val: string): void; + serdes: TopicSerdeSuggestion; } const TopicMessagesContext = React.createContext( diff --git a/kafka-ui-react-app/src/lib/constants.ts b/kafka-ui-react-app/src/lib/constants.ts index a3e622550b8..78a8d863724 100644 --- a/kafka-ui-react-app/src/lib/constants.ts +++ b/kafka-ui-react-app/src/lib/constants.ts @@ -1,5 +1,9 @@ import { SelectOption } from 'components/common/Select/Select'; -import { ConfigurationParameters, ConsumerGroupState } from 'generated-sources'; +import { + ConfigurationParameters, + ConsumerGroupState, + PollingMode, +} from 'generated-sources'; declare global { interface Window { @@ -107,3 
+111,43 @@ export const CONSUMER_GROUP_STATE_TOOLTIPS: Record = DEAD: 'The group is going to be removed. It might be due to the inactivity, or the group is being migrated to different group coordinator.', UNKNOWN: '', } as const; + +export const PollingModeOptionsObj = { + [PollingMode.LATEST]: { + value: PollingMode.LATEST, + label: 'Newest', + isLive: false, + }, + [PollingMode.EARLIEST]: { + value: PollingMode.EARLIEST, + label: 'Oldest', + isLive: false, + }, + [PollingMode.TAILING]: { + value: PollingMode.TAILING, + label: 'Live Mode', + isLive: true, + }, + [PollingMode.FROM_OFFSET]: { + value: PollingMode.FROM_OFFSET, + label: 'From Offset', + isLive: false, + }, + [PollingMode.TO_OFFSET]: { + value: PollingMode.TO_OFFSET, + label: 'To Offset', + isLive: false, + }, + [PollingMode.FROM_TIMESTAMP]: { + value: PollingMode.FROM_TIMESTAMP, + label: 'From Time', + isLive: false, + }, + [PollingMode.TO_TIMESTAMP]: { + value: PollingMode.TO_TIMESTAMP, + label: 'To Time', + isLive: false, + }, +}; + +export const PollingModeOptions = Object.values(PollingModeOptionsObj); diff --git a/kafka-ui-react-app/src/lib/fixtures/filter.ts b/kafka-ui-react-app/src/lib/fixtures/filter.ts new file mode 100644 index 00000000000..1facaa0b914 --- /dev/null +++ b/kafka-ui-react-app/src/lib/fixtures/filter.ts @@ -0,0 +1,3 @@ +import { MessageFilterId } from 'generated-sources'; + +export const filterRegistrationPayload: MessageFilterId = { id: 'xrysu' }; diff --git a/kafka-ui-react-app/src/lib/hooks/api/topics.ts b/kafka-ui-react-app/src/lib/hooks/api/topics.ts index 00d08bc66b8..b589beff3b8 100644 --- a/kafka-ui-react-app/src/lib/hooks/api/topics.ts +++ b/kafka-ui-react-app/src/lib/hooks/api/topics.ts @@ -15,6 +15,7 @@ import { CreateTopicMessage, GetTopicDetailsRequest, GetTopicsRequest, + MessageFilterRegistration, Topic, TopicConfig, TopicCreation, @@ -39,6 +40,8 @@ export const topicKeys = { [...topicKeys.details(props), 'consumerGroups'] as const, statistics: (props: GetTopicDetailsRequest) => [...topicKeys.details(props), 'statistics'] as const, + filter: (props: GetTopicDetailsRequest) => + [...topicKeys.details(props), 'messageFilterRegistration'] as const, }; export function useTopics(props: GetTopicsRequest) { @@ -329,3 +332,25 @@ export function useCancelTopicAnalysis(props: GetTopicDetailsRequest) { }, }); } + +export function useRegisterFilter(props: GetTopicDetailsRequest) { + const client = useQueryClient(); + return useMutation( + (filter: MessageFilterRegistration) => + messagesApi.registerFilter({ + ...props, + messageFilterRegistration: filter, + }), + { + onSuccess: () => { + showSuccessAlert({ + message: `Filter successfully registered.`, + }); + client.invalidateQueries(topicKeys.filter(props)); + }, + onError: (e) => { + showServerError(e as Response); + }, + } + ); +} diff --git a/kafka-ui-react-app/src/redux/interfaces/topic.ts b/kafka-ui-react-app/src/redux/interfaces/topic.ts index bdc25ee0c60..d9d9719c18a 100644 --- a/kafka-ui-react-app/src/redux/interfaces/topic.ts +++ b/kafka-ui-react-app/src/redux/interfaces/topic.ts @@ -4,6 +4,7 @@ import { TopicCreation, TopicMessage, TopicMessageConsuming, + TopicMessageNextPageCursor, } from 'generated-sources'; export type TopicName = Topic['name']; @@ -52,9 +53,13 @@ export interface TopicFormData { } export interface TopicMessagesState { + allMessages: TopicMessage[]; messages: TopicMessage[]; phase?: string; meta: TopicMessageConsuming; messageEventType?: string; isFetching: boolean; + cursor?: TopicMessageNextPageCursor; + 
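// --- Editor's aside (not part of the patch): the two-step smart-filter flow
// that useRegisterFilter above enables. The Groovy code is registered once via
// POST .../smartfilters, and the returned MessageFilterId.id is then passed as
// the smartFilterId query parameter of getTopicMessagesV2. The import path of
// the generated client is assumed here; `registerGroovyFilter` is a
// hypothetical wrapper for illustration only.
import { MessageFilterId } from 'generated-sources';
import { messagesApiClient as messagesApi } from 'lib/api'; // path assumed

async function registerGroovyFilter(
  clusterName: string,
  topicName: string,
  filterCode: string
): Promise<MessageFilterId> {
  // registerFilter is the operation added to the swagger contract in this PR
  return messagesApi.registerFilter({
    clusterName,
    topicName,
    messageFilterRegistration: { filterCode },
  });
}
// --- end of editor's aside ---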
currentPage: number; + lastLoadedPage: number; } diff --git a/kafka-ui-react-app/src/redux/reducers/topicMessages/selectors.ts b/kafka-ui-react-app/src/redux/reducers/topicMessages/selectors.ts index b2636cdf2ad..bec0a173e34 100644 --- a/kafka-ui-react-app/src/redux/reducers/topicMessages/selectors.ts +++ b/kafka-ui-react-app/src/redux/reducers/topicMessages/selectors.ts @@ -19,6 +19,21 @@ export const getTopicMessgesMeta = createSelector( ({ meta }) => meta ); +export const getTopicMessgesCursor = createSelector( + topicMessagesState, + ({ cursor }) => cursor +); + +export const getTopicMessgesCurrentPage = createSelector( + topicMessagesState, + ({ currentPage }) => currentPage +); + +export const getTopicMessgesLastLoadedPage = createSelector( + topicMessagesState, + ({ lastLoadedPage }) => lastLoadedPage +); + export const getIsTopicMessagesFetching = createSelector( topicMessagesState, ({ isFetching }) => isFetching diff --git a/kafka-ui-react-app/src/redux/reducers/topicMessages/topicMessagesSlice.ts b/kafka-ui-react-app/src/redux/reducers/topicMessages/topicMessagesSlice.ts index 530a3781140..b82c27fbcca 100644 --- a/kafka-ui-react-app/src/redux/reducers/topicMessages/topicMessagesSlice.ts +++ b/kafka-ui-react-app/src/redux/reducers/topicMessages/topicMessagesSlice.ts @@ -2,7 +2,10 @@ import { createSlice } from '@reduxjs/toolkit'; import { TopicMessagesState } from 'redux/interfaces'; import { TopicMessage } from 'generated-sources'; +const PER_PAGE = 100; + export const initialState: TopicMessagesState = { + allMessages: [], messages: [], meta: { bytesConsumed: 0, @@ -12,6 +15,8 @@ export const initialState: TopicMessagesState = { }, messageEventType: '', isFetching: false, + currentPage: 0, + lastLoadedPage: 0, }; const topicMessagesSlice = createSlice({ @@ -19,16 +24,28 @@ const topicMessagesSlice = createSlice({ initialState, reducers: { addTopicMessage: (state, action) => { + const allmessages: TopicMessage[] = action.payload.prepend + ? [action.payload.message, ...state.allMessages] + : [...state.allMessages, action.payload.message]; + const messages: TopicMessage[] = action.payload.prepend ? 
[action.payload.message, ...state.messages] : [...state.messages, action.payload.message]; return { ...state, + allMessages: allmessages, messages, }; }, - resetTopicMessages: () => initialState, + resetTopicMessages: (state) => { + return { + ...initialState, + currentPage: state.currentPage, + allMessages: state.allMessages, + }; + }, + resetAllTopicMessages: () => initialState, updateTopicMessagesPhase: (state, action) => { state.phase = action.payload; }, @@ -42,6 +59,28 @@ const topicMessagesSlice = createSlice({ setMessageEventType: (state, action) => { state.messageEventType = action.payload; }, + updateTopicMessagesCursor: (state, action) => { + state.cursor = action.payload; + }, + setTopicMessagesCurrentPage: (state, action) => { + if (state.currentPage !== action.payload) { + const messages: TopicMessage[] = state.allMessages.slice( + (action.payload - 1) * PER_PAGE, + (action.payload - 1) * PER_PAGE + PER_PAGE + ); + return { + ...state, + currentPage: action.payload, + messages, + }; + } + return { + ...state, + }; + }, + setTopicMessagesLastLoadedPage: (state, action) => { + state.lastLoadedPage = action.payload; + }, }, }); @@ -52,6 +91,10 @@ export const { updateTopicMessagesMeta, setTopicMessagesFetchingStatus, setMessageEventType, + updateTopicMessagesCursor, + setTopicMessagesCurrentPage, + setTopicMessagesLastLoadedPage, + resetAllTopicMessages, } = topicMessagesSlice.actions; export default topicMessagesSlice.reducer; diff --git a/kafka-ui-react-app/src/theme/theme.ts b/kafka-ui-react-app/src/theme/theme.ts index 8426d3c2fcd..6597732282d 100644 --- a/kafka-ui-react-app/src/theme/theme.ts +++ b/kafka-ui-react-app/src/theme/theme.ts @@ -212,6 +212,7 @@ const baseTheme = { hover: Colors.blue[45], active: Colors.brand[15], }, + refreshIcon: Colors.brand[50], }, textArea: { borderColor: { @@ -750,6 +751,26 @@ export const theme = { }, }, }, + datePicker: { + color: { + normal: { + background: Colors.neutral[0], + text: Colors.neutral[90], + }, + active: { + background: Colors.brand[50], + text: Colors.neutral[0], + }, + gray: Colors.neutral[30], + }, + borderColor: { + normal: Colors.neutral[30], + active: Colors.neutral[70], + }, + navigationIcon: { + color: Colors.brand[50], + }, + }, }; export type ThemeType = typeof theme; @@ -1205,6 +1226,7 @@ export const darkTheme: ThemeType = { ...baseTheme.icons.discord, normal: Colors.neutral[30], }, + refreshIcon: Colors.brand[50], }, textArea: { ...baseTheme.textArea, @@ -1262,4 +1284,24 @@ export const darkTheme: ThemeType = { }, }, }, + datePicker: { + color: { + normal: { + background: Colors.neutral[85], + text: Colors.neutral[0], + }, + active: { + background: Colors.brand[30], + text: Colors.neutral[0], + }, + gray: Colors.neutral[30], + }, + borderColor: { + normal: Colors.neutral[30], + active: Colors.neutral[70], + }, + navigationIcon: { + color: Colors.brand[50], + }, + }, }; diff --git a/pom.xml b/pom.xml index aa02a56f0e9..d5efdbfa57a 100644 --- a/pom.xml +++ b/pom.xml @@ -24,23 +24,23 @@ 4.12.0 2.11.1 3.19.0 - 1.11.1 + 1.11.3 1.12.19 - 7.4.0 + 7.5.1 3.1.0 3.0.13 2.14.0 - 3.5.0 + 3.6.1 1.5.5.Final 1.18.24 3.23.3 2.13.9 2.0 - 3.1.3 + 3.2.3 1.0.0 0.1.17 0.1.26 - 20230227 + 20240205 5.9.1
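// --- Editor's aside (not part of the patch): the paging model implemented by
// topicMessagesSlice above, reduced to a pure function. Pages that were already
// streamed are served from the accumulated allMessages cache; only pages beyond
// lastLoadedPage require another /messages/v2 request with the saved cursor id.
// `pageFromCache` is a hypothetical name used only for illustration.
import { TopicMessage } from 'generated-sources';

const PER_PAGE = 100; // matches the constant in the slice and in Filters.tsx

function pageFromCache(
  allMessages: TopicMessage[],
  page: number,
  lastLoadedPage: number
): TopicMessage[] | 'needs-fetch' {
  if (page > lastLoadedPage) return 'needs-fetch'; // continue via the cursor
  const start = (page - 1) * PER_PAGE;
  return allMessages.slice(start, start + PER_PAGE);
}
// --- end of editor's aside ---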