Feature/disable GitHub actions #4396

Closed · wants to merge 31 commits
42 changes: 21 additions & 21 deletions .github/workflows/e2e-automation.yml
@@ -23,12 +23,12 @@ jobs:
- uses: actions/checkout@v3
with:
ref: ${{ github.sha }}
- name: Configure AWS credentials
uses: aws-actions/configure-aws-credentials@v3
with:
aws-access-key-id: ${{ secrets.AWS_ACCESS_KEY_ID }}
aws-secret-access-key: ${{ secrets.AWS_SECRET_ACCESS_KEY }}
aws-region: eu-central-1
# - name: Configure AWS credentials
# uses: aws-actions/configure-aws-credentials@v3
# with:
# aws-access-key-id: ${{ secrets.AWS_ACCESS_KEY_ID }}
# aws-secret-access-key: ${{ secrets.AWS_SECRET_ACCESS_KEY }}
# aws-region: eu-central-1
- name: Set up environment
id: set_env_values
run: |
@@ -68,21 +68,21 @@ jobs:
allure_report: allure-report
subfolder: allure-results
report_url: "http://kafkaui-allure-reports.s3-website.eu-central-1.amazonaws.com"
- uses: jakejarvis/s3-sync-action@master
if: always()
env:
AWS_S3_BUCKET: 'kafkaui-allure-reports'
AWS_REGION: 'eu-central-1'
SOURCE_DIR: 'allure-history/allure-results'
- name: Deploy report to Amazon S3
if: always()
uses: Sibz/[email protected]
with:
authToken: ${{secrets.GITHUB_TOKEN}}
context: "Click Details button to open Allure report"
state: "success"
sha: ${{ github.sha }}
target_url: http://kafkaui-allure-reports.s3-website.eu-central-1.amazonaws.com/${{ github.run_number }}
# - uses: jakejarvis/s3-sync-action@master
# if: always()
# env:
# AWS_S3_BUCKET: 'kafkaui-allure-reports'
# AWS_REGION: 'eu-central-1'
# SOURCE_DIR: 'allure-history/allure-results'
# - name: Deploy report to Amazon S3
# if: always()
# uses: Sibz/[email protected]
# with:
# authToken: ${{secrets.GITHUB_TOKEN}}
# context: "Click Details button to open Allure report"
# state: "success"
# sha: ${{ github.sha }}
# target_url: http://kafkaui-allure-reports.s3-website.eu-central-1.amazonaws.com/${{ github.run_number }}
- name: Dump Docker logs on failure
if: failure()
uses: jwalton/[email protected]
16 changes: 8 additions & 8 deletions .github/workflows/frontend.yaml
@@ -48,11 +48,11 @@ jobs:
run: |
cd kafka-ui-react-app/
pnpm test:CI
- name: SonarCloud Scan
uses: sonarsource/sonarcloud-github-action@master
with:
projectBaseDir: ./kafka-ui-react-app
args: -Dsonar.pullrequest.key=${{ github.event.pull_request.number }} -Dsonar.pullrequest.branch=${{ github.head_ref }} -Dsonar.pullrequest.base=${{ github.base_ref }}
env:
GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
SONAR_TOKEN: ${{ secrets.SONAR_TOKEN_FRONTEND }}
# - name: SonarCloud Scan
# uses: sonarsource/sonarcloud-github-action@master
# with:
# projectBaseDir: ./kafka-ui-react-app
# args: -Dsonar.pullrequest.key=${{ github.event.pull_request.number }} -Dsonar.pullrequest.branch=${{ github.head_ref }} -Dsonar.pullrequest.base=${{ github.base_ref }}
# env:
# GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
# SONAR_TOKEN: ${{ secrets.SONAR_TOKEN_FRONTEND }}
4 changes: 2 additions & 2 deletions kafka-ui-api/pom.xml
@@ -91,7 +91,7 @@
<dependency>
<groupId>software.amazon.msk</groupId>
<artifactId>aws-msk-iam-auth</artifactId>
<version>1.1.7</version>
<version>2.0.3</version>
</dependency>

<dependency>
@@ -485,7 +485,7 @@
<verbose>true</verbose>
<images>
<image>
<name>provectuslabs/kafka-ui:${git.revision}</name>
<name>gimral/kafka-ui:${git.revision}</name>
<build>
<contextDir>${project.basedir}</contextDir>
<args>
@@ -5,13 +5,14 @@
import static com.provectus.kafka.ui.model.rbac.permission.TopicAction.MESSAGES_READ;
import static com.provectus.kafka.ui.serde.api.Serde.Target.KEY;
import static com.provectus.kafka.ui.serde.api.Serde.Target.VALUE;
import static java.util.stream.Collectors.toMap;

import com.provectus.kafka.ui.api.MessagesApi;
import com.provectus.kafka.ui.exception.ValidationException;
import com.provectus.kafka.ui.model.ConsumerPosition;
import com.provectus.kafka.ui.model.CreateTopicMessageDTO;
import com.provectus.kafka.ui.model.MessageFilterIdDTO;
import com.provectus.kafka.ui.model.MessageFilterRegistrationDTO;
import com.provectus.kafka.ui.model.MessageFilterTypeDTO;
import com.provectus.kafka.ui.model.PollingModeDTO;
import com.provectus.kafka.ui.model.SeekDirectionDTO;
import com.provectus.kafka.ui.model.SeekTypeDTO;
import com.provectus.kafka.ui.model.SerdeUsageDTO;
@@ -25,14 +26,11 @@
import com.provectus.kafka.ui.service.DeserializationService;
import com.provectus.kafka.ui.service.MessagesService;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import javax.annotation.Nullable;
import javax.validation.Valid;
import javax.validation.ValidationException;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.kafka.common.TopicPartition;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.RestController;
import org.springframework.web.server.ServerWebExchange;
@@ -76,6 +74,7 @@ public Mono<ResponseEntity<SmartFilterTestExecutionResultDTO>> executeSmartFilte
.map(ResponseEntity::ok);
}

@Deprecated
@Override
public Mono<ResponseEntity<Flux<TopicMessageEventDTO>>> getTopicMessages(String clusterName,
String topicName,
@@ -88,6 +87,23 @@ public Mono<ResponseEntity<Flux<TopicMessageEventDTO>>> getTopicMessages(String
String keySerde,
String valueSerde,
ServerWebExchange exchange) {
throw new ValidationException("Not supported");
}


@Override
public Mono<ResponseEntity<Flux<TopicMessageEventDTO>>> getTopicMessagesV2(String clusterName, String topicName,
PollingModeDTO mode,
List<Integer> partitions,
Integer limit,
String stringFilter,
String smartFilterId,
Long offset,
Long timestamp,
String keySerde,
String valueSerde,
String cursor,
ServerWebExchange exchange) {
var contextBuilder = AccessContext.builder()
.cluster(clusterName)
.topic(topicName)
@@ -98,27 +114,26 @@ public Mono<ResponseEntity<Flux<TopicMessageEventDTO>>> getTopicMessages(String
contextBuilder.auditActions(AuditAction.VIEW);
}

seekType = seekType != null ? seekType : SeekTypeDTO.BEGINNING;
seekDirection = seekDirection != null ? seekDirection : SeekDirectionDTO.FORWARD;
filterQueryType = filterQueryType != null ? filterQueryType : MessageFilterTypeDTO.STRING_CONTAINS;

var positions = new ConsumerPosition(
seekType,
topicName,
parseSeekTo(topicName, seekType, seekTo)
);
Mono<ResponseEntity<Flux<TopicMessageEventDTO>>> job = Mono.just(
ResponseEntity.ok(
messagesService.loadMessages(
getCluster(clusterName), topicName, positions, q, filterQueryType,
limit, seekDirection, keySerde, valueSerde)
)
);

var context = contextBuilder.build();
return validateAccess(context)
.then(job)
.doOnEach(sig -> audit(context, sig));
var accessContext = contextBuilder.build();

Flux<TopicMessageEventDTO> messagesFlux;
if (cursor != null) {
messagesFlux = messagesService.loadMessages(getCluster(clusterName), topicName, cursor);
} else {
messagesFlux = messagesService.loadMessages(
getCluster(clusterName),
topicName,
ConsumerPosition.create(mode, topicName, partitions, timestamp, offset),
stringFilter,
smartFilterId,
limit,
keySerde,
valueSerde
);
}
return accessControlService.validateAccess(accessContext)
.then(Mono.just(ResponseEntity.ok(messagesFlux)))
.doOnEach(sig -> auditService.audit(accessContext, sig));
}

@Override
@@ -140,34 +155,6 @@ public Mono<ResponseEntity<Void>> sendTopicMessages(
).doOnEach(sig -> audit(context, sig));
}

/**
* The format is [partition]::[offset] for specifying offsets
* or [partition]::[timestamp in millis] for specifying timestamps.
*/
@Nullable
private Map<TopicPartition, Long> parseSeekTo(String topic, SeekTypeDTO seekType, List<String> seekTo) {
if (seekTo == null || seekTo.isEmpty()) {
if (seekType == SeekTypeDTO.LATEST || seekType == SeekTypeDTO.BEGINNING) {
return null;
}
throw new ValidationException("seekTo should be set if seekType is " + seekType);
}
return seekTo.stream()
.map(p -> {
String[] split = p.split("::");
if (split.length != 2) {
throw new IllegalArgumentException(
"Wrong seekTo argument format. See API docs for details");
}

return Pair.of(
new TopicPartition(topic, Integer.parseInt(split[0])),
Long.parseLong(split[1])
);
})
.collect(toMap(Pair::getKey, Pair::getValue));
}

@Override
public Mono<ResponseEntity<TopicSerdeSuggestionDTO>> getSerdes(String clusterName,
String topicName,
Expand Down Expand Up @@ -195,7 +182,20 @@ public Mono<ResponseEntity<TopicSerdeSuggestionDTO>> getSerdes(String clusterNam
);
}

@Override
public Mono<ResponseEntity<MessageFilterIdDTO>> registerFilter(String clusterName,
String topicName,
Mono<MessageFilterRegistrationDTO> registration,
ServerWebExchange exchange) {

final Mono<Void> validateAccess = accessControlService.validateAccess(AccessContext.builder()
.cluster(clusterName)
.topic(topicName)
.topicActions(MESSAGES_READ)
.build());


return validateAccess.then(registration)
.map(reg -> messagesService.registerMessageFilter(reg.getFilterCode()))
.map(id -> ResponseEntity.ok(new MessageFilterIdDTO().id(id)));
}
}
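
The new `getTopicMessagesV2` branch resumes polling from a server-side cursor id when one is supplied, and `registerFilter` similarly hands back an id for a stored filter. Below is a minimal sketch of the kind of in-memory registry such ids could be backed by; the class and method names are illustrative assumptions, not the actual kafka-ui implementation.

```java
// Hypothetical sketch (not part of this PR): an id-keyed registry similar in spirit
// to what the cursor / smart-filter ids above could be backed by.
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;

final class PageCursorRegistry<T> {

  private final Map<String, T> cursors = new ConcurrentHashMap<>();

  /** Stores the cursor state and returns the id that is handed back to the client. */
  String register(T cursorState) {
    String id = UUID.randomUUID().toString();
    cursors.put(id, cursorState);
    return id;
  }

  /** Resolves a previously issued id so polling can resume from the stored position. */
  T resolve(String id) {
    T state = cursors.get(id);
    if (state == null) {
      throw new IllegalArgumentException("Unknown or expired cursor id: " + id);
    }
    return state;
  }
}
```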
@@ -1,6 +1,7 @@
package com.provectus.kafka.ui.emitter;

import com.provectus.kafka.ui.model.TopicMessageEventDTO;
import jakarta.annotation.Nullable;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.utils.Bytes;
import reactor.core.publisher.FluxSink;
@@ -21,12 +22,14 @@ protected PolledRecords poll(FluxSink<TopicMessageEventDTO> sink, EnhancedConsum
return records;
}

protected boolean sendLimitReached() {
protected boolean isSendLimitReached() {
return messagesProcessing.limitReached();
}

protected void send(FluxSink<TopicMessageEventDTO> sink, Iterable<ConsumerRecord<Bytes, Bytes>> records) {
messagesProcessing.send(sink, records);
protected void send(FluxSink<TopicMessageEventDTO> sink,
Iterable<ConsumerRecord<Bytes, Bytes>> records,
@Nullable Cursor.Tracking cursor) {
messagesProcessing.send(sink, records, cursor);
}

protected void sendPhase(FluxSink<TopicMessageEventDTO> sink, String name) {
@@ -37,8 +40,9 @@ protected void sendConsuming(FluxSink<TopicMessageEventDTO> sink, PolledRecords
messagesProcessing.sentConsumingInfo(sink, records);
}

protected void sendFinishStatsAndCompleteSink(FluxSink<TopicMessageEventDTO> sink) {
messagesProcessing.sendFinishEvent(sink);
// cursor is null if target partitions were fully polled (no need to do paging)
protected void sendFinishStatsAndCompleteSink(FluxSink<TopicMessageEventDTO> sink, @Nullable Cursor.Tracking cursor) {
messagesProcessing.sendFinishEvents(sink, cursor);
sink.complete();
}
}
@@ -18,18 +18,15 @@ public BackwardEmitter(Supplier<EnhancedConsumer> consumerSupplier,
int messagesPerPage,
ConsumerRecordDeserializer deserializer,
Predicate<TopicMessageDTO> filter,
PollingSettings pollingSettings) {
PollingSettings pollingSettings,
Cursor.Tracking cursor) {
super(
consumerSupplier,
consumerPosition,
messagesPerPage,
new MessagesProcessing(
deserializer,
filter,
false,
messagesPerPage
),
pollingSettings
new MessagesProcessing(deserializer, filter, false, messagesPerPage),
pollingSettings,
cursor
);
}

@@ -2,6 +2,8 @@

import com.provectus.kafka.ui.model.TopicMessageConsumingDTO;
import com.provectus.kafka.ui.model.TopicMessageEventDTO;
import com.provectus.kafka.ui.model.TopicMessageNextPageCursorDTO;
import javax.annotation.Nullable;
import reactor.core.publisher.FluxSink;

class ConsumingStats {
@@ -26,10 +28,15 @@ void incFilterApplyError() {
filterApplyErrors++;
}

void sendFinishEvent(FluxSink<TopicMessageEventDTO> sink) {
void sendFinishEvent(FluxSink<TopicMessageEventDTO> sink, @Nullable Cursor.Tracking cursor) {
sink.next(
new TopicMessageEventDTO()
.type(TopicMessageEventDTO.TypeEnum.DONE)
.cursor(
cursor != null
? new TopicMessageNextPageCursorDTO().id(cursor.registerCursor())
: null
)
.consuming(createConsumingStats())
);
}
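
For context on the DONE event change above: a non-null cursor means polling stopped at the page limit and the returned id can be sent back to fetch the next page, while a null cursor means the target partitions were fully polled. A rough client-side sketch of that loop, using a hypothetical fetch function standing in for a call to `getTopicMessagesV2` (all names here are illustrative):

```java
// Hypothetical sketch: follow the next-page cursor id until the server reports
// the partitions as fully polled (null cursor on the DONE event).
import java.util.List;
import java.util.function.Function;

record Page(List<String> messages, String nextCursorId) { }

final class PagedReader {

  static void readAll(Function<String, Page> fetchPage) {
    String cursor = null;            // first request: position comes from the polling-mode params
    do {
      Page page = fetchPage.apply(cursor);
      page.messages().forEach(System.out::println);
      cursor = page.nextCursorId();  // null when there is nothing left to page through
    } while (cursor != null);
  }
}
```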