Skip to content

Commit

Permalink
Merge branch 'trunk' into KAFKA-12895
Browse files Browse the repository at this point in the history
  • Loading branch information
frankvicky committed Oct 2, 2024
2 parents 62d858b + 49d7ea6 commit 70554d6
Show file tree
Hide file tree
Showing 60 changed files with 1,627 additions and 540 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,8 @@
# under the License.
#
---
name: "Update Check Run"
description: "Update the status of a commit check using the GH CLI. See https://docs.github.com/en/rest/checks/runs?apiVersion=2022-11-28#create-a-check-run"
name: "Update Commit Status Check"
description: "Update the status of a commit check using the GH CLI"
inputs:
# Composite actions do not support typed parameters. Everything is treated as a string
# See: https://github.com/actions/runner/issues/2238
Expand All @@ -39,43 +39,23 @@ inputs:
description: "The text to display next to the check"
default: ""
required: false
title:
description: "The title of the status check"
context:
description: "The name of the status check"
required: true
status:
description: "The status of the check. Can be one of: queued, in_progress, completed, waiting, requested, pending"
state:
description: "The state of the check. Can be one of: error, failure, pending, success"
required: true
conclusion:
description: "Required if status is 'completed'. Can be one of: action_required, cancelled, failure, neutral, success, skipped, stale, timed_out"
required: false

runs:
using: "composite"
steps:
- if: inputs.conclusion == ''
shell: bash
env:
GH_TOKEN: ${{ inputs.gh-token }}
run: |
gh api --method POST -H "Accept: application/vnd.github+json" -H "X-GitHub-Api-Version: 2022-11-28" \
/repos/${{ inputs.repository }}/check-runs \
-f "head_sha=${{ inputs.commit_sha }}" \
-f "status=${{ inputs.status }}" \
-f "details_url=${{ inputs.url }}" \
-f "output[title]=${{ inputs.title }}" \
-f "output[summary]=${{ inputs.description }}" \
-f "name=${{ inputs.title }}"
- if: inputs.conclusion != ''
- name: Update Check
shell: bash
env:
GH_TOKEN: ${{ inputs.gh-token }}
run: |
gh api --method POST -H "Accept: application/vnd.github+json" -H "X-GitHub-Api-Version: 2022-11-28" \
/repos/${{ inputs.repository }}/check-runs \
-f "head_sha=${{ inputs.commit_sha }}" \
-f "status=${{ inputs.status }}" \
-f "conclusion=${{ inputs.conclusion }}" \
-f "details_url=${{ inputs.url }}" \
-f "output[title]=${{ inputs.title }}" \
-f "output[summary]=${{ inputs.description }}" \
-f "name=${{ inputs.title }}"
/repos/${{ inputs.repository }}/statuses/${{ inputs.commit_sha }} \
-f "state=${{ inputs.state }}" -f "target_url=${{ inputs.url }}" \
-f "description=${{ inputs.description }}" \
-f "context=${{ inputs.context }}"
93 changes: 93 additions & 0 deletions .github/scripts/rat.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from glob import glob
import logging
import os
import sys
import xml.etree.ElementTree

logger = logging.getLogger()
logger.setLevel(logging.DEBUG)
handler = logging.StreamHandler(sys.stderr)
handler.setLevel(logging.DEBUG)
logger.addHandler(handler)


def get_env(key: str) -> str:
value = os.getenv(key)
logger.debug(f"Read env {key}: {value}")
return value


def parse_rat_report(fp):
file_count = 0
unapproved_licenses = []
approved_count = 0
root = xml.etree.ElementTree.parse(fp).getroot()

for resource in root.findall(".//resource"):
file_name = resource.get("name")
license_approval_elem = resource.find("license-approval")

if license_approval_elem is not None and license_approval_elem.get("name") == "false":
file_count += 1
unapproved_licenses.append(file_name)
else:
approved_count += 1

return approved_count, file_count, unapproved_licenses


if __name__ == "__main__":
"""
Parse Apache Rat reports and generate GitHub annotations.
"""
if not os.getenv("GITHUB_WORKSPACE"):
print("This script is intended to by run by GitHub Actions.")
exit(1)

reports = glob(pathname="**/rat/*.xml", recursive=True)
logger.debug(f"Found {len(reports)} Rat reports")

total_unapproved_licenses = 0
total_approved_licenses = 0
all_unapproved_files = []

workspace_path = get_env("GITHUB_WORKSPACE")

for report in reports:
with open(report, "r") as fp:
logger.debug(f"Parsing Rat report file: {report}")
approved_count, unapproved_count, unapproved_files = parse_rat_report(fp)

total_approved_licenses += approved_count
total_unapproved_licenses += unapproved_count
all_unapproved_files.extend(unapproved_files)

if total_unapproved_licenses == 0:
print(f"All {total_approved_licenses} files have approved licenses. No unapproved licenses found.")
exit(0)
else:
print(f"{total_approved_licenses} approved licenses")
print(f"{total_unapproved_licenses} unapproved licenses")

print("Files with unapproved licenses:")
for file in all_unapproved_files:
rel_path = os.path.relpath(file, workspace_path)
message = f"File with unapproved license: {rel_path}"
print(f"::notice file={rel_path},title=Unapproved License::{message}")

exit(1)
6 changes: 6 additions & 0 deletions .github/workflows/build.yml
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,12 @@ jobs:
run: python .github/scripts/checkstyle.py
env:
GITHUB_WORKSPACE: ${{ github.workspace }}
- name: Annotate Rat errors
# Avoid duplicate annotations, only run on java 21
if: ${{ failure() && matrix.java == '21' }}
run: python .github/scripts/rat.py
env:
GITHUB_WORKSPACE: ${{ github.workspace }}

test:
needs: validate
Expand Down
21 changes: 9 additions & 12 deletions .github/workflows/ci-complete.yml
Original file line number Diff line number Diff line change
Expand Up @@ -70,16 +70,15 @@ jobs:
path: ~/.gradle/build-scan-data # This is where Gradle buffers unpublished build scan data when --no-scan is given
- name: Handle missing scan
if: ${{ steps.download-build-scan.outcome == 'failure' }}
uses: ./.github/actions/gh-api-update-check
uses: ./.github/actions/gh-api-update-status
with:
gh-token: ${{ secrets.GITHUB_TOKEN }}
repository: ${{ github.repository }}
commit_sha: ${{ github.event.workflow_run.head_sha }}
url: '${{ github.event.workflow_run.html_url }}'
description: 'Could not find build scan'
title: 'Gradle Build Scan / Java ${{ matrix.java }}'
status: 'completed'
conclusion: 'skipped'
context: 'Gradle Build Scan / Java ${{ matrix.java }}'
state: 'error'
- name: Publish Scan
id: publish-build-scan
if: ${{ steps.download-build-scan.outcome == 'success' }}
Expand All @@ -99,25 +98,23 @@ jobs:
fi
- name: Handle failed publish
if: ${{ failure() && steps.publish-build-scan.outcome == 'failure' }}
uses: ./.github/actions/gh-api-update-check
uses: ./.github/actions/gh-api-update-status
with:
gh-token: ${{ secrets.GITHUB_TOKEN }}
repository: ${{ github.repository }}
commit_sha: ${{ github.event.workflow_run.head_sha }}
url: '${{ github.event.repository.html_url }}/actions/runs/${{ github.run_id }}'
description: 'The build scan failed to be published'
title: 'Gradle Build Scan / Java ${{ matrix.java }}'
status: 'completed'
conclusion: 'failure'
context: 'Gradle Build Scan / Java ${{ matrix.java }}'
state: 'error'
- name: Update Status Check
if: ${{ steps.publish-build-scan.outcome == 'success' }}
uses: ./.github/actions/gh-api-update-check
uses: ./.github/actions/gh-api-update-status
with:
gh-token: ${{ secrets.GITHUB_TOKEN }}
repository: ${{ github.repository }}
commit_sha: ${{ github.event.workflow_run.head_sha }}
url: ${{ steps.publish-build-scan.outputs.build-scan-url }}
description: 'The build scan was successfully published'
title: 'Gradle Build Scan / Java ${{ matrix.java }}'
status: 'completed'
conclusion: 'success'
context: 'Gradle Build Scan / Java ${{ matrix.java }}'
state: 'success'
Original file line number Diff line number Diff line change
Expand Up @@ -4539,22 +4539,29 @@ void handleResponse(AbstractResponse abstractResponse) {
ApiError topLevelError = response.topLevelError();
switch (topLevelError.error()) {
case NONE:
for (final UpdatableFeatureResult result : response.data().results()) {
final KafkaFutureImpl<Void> future = updateFutures.get(result.feature());
if (future == null) {
log.warn("Server response mentioned unknown feature {}", result.feature());
} else {
final Errors error = Errors.forCode(result.errorCode());
if (error == Errors.NONE) {
future.complete(null);
// For V2 and above, None responses will just have a top level NONE error -- mark all the futures as completed.
if (response.data().results().isEmpty()) {
for (final KafkaFutureImpl<Void> future : updateFutures.values()) {
future.complete(null);
}
} else {
for (final UpdatableFeatureResult result : response.data().results()) {
final KafkaFutureImpl<Void> future = updateFutures.get(result.feature());
if (future == null) {
log.warn("Server response mentioned unknown feature {}", result.feature());
} else {
future.completeExceptionally(error.exception(result.errorMessage()));
final Errors error = Errors.forCode(result.errorCode());
if (error == Errors.NONE) {
future.complete(null);
} else {
future.completeExceptionally(error.exception(result.errorMessage()));
}
}
}
// The server should send back a response for every feature, but we do a sanity check anyway.
completeUnrealizedFutures(updateFutures.entrySet().stream(),
feature -> "The controller response did not contain a result for feature " + feature);
}
// The server should send back a response for every feature, but we do a sanity check anyway.
completeUnrealizedFutures(updateFutures.entrySet().stream(),
feature -> "The controller response did not contain a result for feature " + feature);
break;
case NOT_CONTROLLER:
handleNotControllerError(topLevelError.error());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -492,7 +492,7 @@ ShareFetchCollector<K, V> build(
public Set<String> subscription() {
acquireAndEnsureOpen();
try {
return subscriptions.subscription();
return Collections.unmodifiableSet(subscriptions.subscription());
} finally {
release();
}
Expand Down Expand Up @@ -594,7 +594,6 @@ public synchronized ConsumerRecords<K, V> poll(final Duration timeout) {
return ConsumerRecords.empty();
} finally {
kafkaShareConsumerMetrics.recordPollEnd(timer.currentTimeMs());
wakeupTrigger.clearTask();
release();
}
}
Expand All @@ -612,13 +611,16 @@ private ShareFetch<K, V> pollForFetches(final Timer timer) {

// Wait a bit - this is where we will fetch records
Timer pollTimer = time.timer(pollTimeout);
wakeupTrigger.setShareFetchAction(fetchBuffer);

try {
fetchBuffer.awaitNotEmpty(pollTimer);
} catch (InterruptException e) {
log.trace("Timeout during fetch", e);
throw e;
} finally {
timer.update(pollTimer.currentTimeMs());
wakeupTrigger.clearTask();
}

return collect(Collections.emptyMap());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,18 +53,21 @@ public void wakeup() {
FetchAction fetchAction = (FetchAction) task;
fetchAction.fetchBuffer().wakeup();
return new WakeupFuture();
} else if (task instanceof ShareFetchAction) {
ShareFetchAction shareFetchAction = (ShareFetchAction) task;
shareFetchAction.fetchBuffer().wakeup();
return new WakeupFuture();
} else {
return task;
}
});
}

/**
* If there is no pending task, set the pending task active.
* If wakeup was called before setting an active task, the current task will complete exceptionally with
* WakeupException right
* away.
* if there is an active task, throw exception.
* If there is no pending task, set the pending task active.
* If wakeup was called before setting an active task, the current task will complete exceptionally with
* WakeupException right away.
* If there is an active task, throw exception.
* @param currentTask
* @param <T>
* @return
Expand Down Expand Up @@ -105,6 +108,25 @@ public void setFetchAction(final FetchBuffer fetchBuffer) {
}
}

public void setShareFetchAction(final ShareFetchBuffer fetchBuffer) {
final AtomicBoolean throwWakeupException = new AtomicBoolean(false);
pendingTask.getAndUpdate(task -> {
if (task == null) {
return new ShareFetchAction(fetchBuffer);
} else if (task instanceof WakeupFuture) {
throwWakeupException.set(true);
return null;
} else if (task instanceof DisabledWakeups) {
return task;
}
// last active state is still active
throw new IllegalStateException("Last active task is still active");
});
if (throwWakeupException.get()) {
throw new WakeupException();
}
}

public void disableWakeups() {
pendingTask.set(new DisabledWakeups());
}
Expand All @@ -113,7 +135,7 @@ public void clearTask() {
pendingTask.getAndUpdate(task -> {
if (task == null) {
return null;
} else if (task instanceof ActiveFuture || task instanceof FetchAction) {
} else if (task instanceof ActiveFuture || task instanceof FetchAction || task instanceof ShareFetchAction) {
return null;
}
return task;
Expand Down Expand Up @@ -172,4 +194,17 @@ public FetchBuffer fetchBuffer() {
return fetchBuffer;
}
}

static class ShareFetchAction implements Wakeupable {

private final ShareFetchBuffer fetchBuffer;

public ShareFetchAction(ShareFetchBuffer fetchBuffer) {
this.fetchBuffer = fetchBuffer;
}

public ShareFetchBuffer fetchBuffer() {
return fetchBuffer;
}
}
}
Loading

0 comments on commit 70554d6

Please sign in to comment.