Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
101 changes: 101 additions & 0 deletions .github/workflows/ci-quick.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

#
# This is our most used build and the one used when a new PR is created/updated. It needs to
# be and remain as green as possible. It is also executed when a PR is actually merged to make sure
# apache/main stays stable. It runs a subset of the tests (the ones in the quick profile) to make sure it finishes
# in a reasonable time.
#
name: CI Quick

on:
push:
branches: [ "main", "activemq-6.2.x", "activemq-5.19.x" ]
pull_request:
branches: [ "main", "activemq-6.2.x", "activemq-5.19.x" ]

concurrency:
group: ${{ github.workflow }}-${{ github.ref }}
cancel-in-progress: true

jobs:
build:
name: build

permissions:
contents: read

timeout-minutes: 60

strategy:
matrix:
os: [ ubuntu-24.04, macos-26, windows-2025 ]
java-version: [ 11 ]

runs-on: ${{ matrix.os }}

steps:
- uses: actions/checkout@v6
- name: Set up JDK
uses: actions/setup-java@v5
with:
java-version: ${{ matrix.java-version }}
distribution: temurin
cache: 'maven'
- name: Build
run: mvn -U -B -e clean install -DskipTests
- name: Verify
run: mvn apache-rat:check

test:
name: test
needs: build

permissions:
contents: read
checks: write
pull-requests: write

timeout-minutes: 180

runs-on: ubuntu-24.04

steps:
- uses: actions/checkout@v6
- name: Set up JDK
uses: actions/setup-java@v5
with:
java-version: 11
distribution: temurin
cache: 'maven'
- name: Test
run: mvn -B -e -fae verify -Pactivemq.tests-quick -Dsurefire.rerunFailingTestsCount=3
- name: Upload Test Results
if: always()
uses: actions/upload-artifact@v7
with:
name: test-results
path: '**/target/surefire-reports/*.xml'
- name: Publish Test Results
if: always()
uses: EnricoMi/publish-unit-test-result-action@v2
with:
large_files: true
report_individual_runs: true
report_suite_logs: error
files: '**/target/surefire-reports/*.xml'
2 changes: 1 addition & 1 deletion activemq-all/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
<parent>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-parent</artifactId>
<version>5.19.7-TT.2-SNAPSHOT</version>
<version>5.19.8-TT.1-SNAPSHOT</version>
</parent>

<artifactId>activemq-all</artifactId>
Expand Down
2 changes: 1 addition & 1 deletion activemq-amqp/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
<parent>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-parent</artifactId>
<version>5.19.7-TT.2-SNAPSHOT</version>
<version>5.19.8-TT.1-SNAPSHOT</version>
</parent>

<artifactId>activemq-amqp</artifactId>
Expand Down
2 changes: 1 addition & 1 deletion activemq-blueprint/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
<parent>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-parent</artifactId>
<version>5.19.7-TT.2-SNAPSHOT</version>
<version>5.19.8-TT.1-SNAPSHOT</version>
</parent>

<artifactId>activemq-blueprint</artifactId>
Expand Down
2 changes: 1 addition & 1 deletion activemq-broker/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
<parent>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-parent</artifactId>
<version>5.19.7-TT.2-SNAPSHOT</version>
<version>5.19.8-TT.1-SNAPSHOT</version>
</parent>

<artifactId>activemq-broker</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
Expand Down Expand Up @@ -163,6 +164,7 @@ public class TransportConnection implements Connection, Task, CommandVisitor {
private final ReentrantReadWriteLock serviceLock = new ReentrantReadWriteLock();
private String duplexNetworkConnectorId;
private final long connectedTimestamp;
private final CompletableFuture<ConnectionId> initialConnectionId = new CompletableFuture<>();

/**
* @param taskRunnerFactory - can be null if you want direct dispatch to the transport
Expand Down Expand Up @@ -851,11 +853,16 @@ public Response processAddConnection(ConnectionInfo info) throws Exception {

try {
broker.addConnection(context, info);
// Complete the future with the connectionId if we completed
// the broker.addConnection() chain successfully
initialConnectionId.complete(info.getConnectionId());
} catch (Exception e) {
synchronized (brokerConnectionStates) {
brokerConnectionStates.remove(info.getConnectionId());
}
unregisterConnectionState(info.getConnectionId());
// complete with the exception
initialConnectionId.completeExceptionally(e);
LOG.warn("Failed to add Connection id={}, clientId={}, clientIP={} due to {}",
info.getConnectionId(), clientId, info.getClientIp(), e.getLocalizedMessage());
//AMQ-6561 - stop for all exceptions on addConnection
Expand Down Expand Up @@ -1387,13 +1394,10 @@ public Response processBrokerInfo(BrokerInfo info) {
LOG.error(" Slave Brokers are no longer supported - slave trying to attach is: {}", info.getBrokerName());
} else if (info.isNetworkConnection() && !info.isDuplexConnection()) {
try {
NetworkBridgeConfiguration config = getNetworkConfiguration(info);
if (config.isSyncDurableSubs() && protocolVersion.get() >= CommandTypes.PROTOCOL_VERSION_DURABLE_SYNC) {
LOG.debug("SyncDurableSubs is enabled, Sending BrokerSubscriptionInfo");
dispatchSync(NetworkBridgeUtils.getBrokerSubscriptionInfo(this.broker.getBrokerService(), config));
}
// register durable sync to be sent after ConnectionInfo has been handled
registerDurableSync(getNetworkConfiguration(info), info);
} catch (Exception e) {
LOG.error("Failed to respond to network bridge creation from broker {}", info.getBrokerId(), e);
LOG.error("Failed to register durable sync for network bridge creation from broker {}", info.getBrokerId(), e);
return null;
}
} else if (info.isNetworkConnection() && info.isDuplexConnection()) {
Expand All @@ -1403,10 +1407,8 @@ public Response processBrokerInfo(BrokerInfo info) {
NetworkBridgeConfiguration config = getNetworkConfiguration(info);
config.setBrokerName(broker.getBrokerName());

if (config.isSyncDurableSubs() && protocolVersion.get() >= CommandTypes.PROTOCOL_VERSION_DURABLE_SYNC) {
LOG.debug("SyncDurableSubs is enabled, Sending BrokerSubscriptionInfo");
dispatchSync(NetworkBridgeUtils.getBrokerSubscriptionInfo(this.broker.getBrokerService(), config));
}
// register durable sync to be sent after ConnectionInfo has been handled
registerDurableSync(config, info);

// check for existing duplex connection hanging about

Expand Down Expand Up @@ -1473,6 +1475,30 @@ public Response processBrokerInfo(BrokerInfo info) {
return null;
}

private void registerDurableSync(final NetworkBridgeConfiguration config, final BrokerInfo info) {
if (config.isSyncDurableSubs() && protocolVersion.get() >= CommandTypes.PROTOCOL_VERSION_DURABLE_SYNC) {
// this will complete when the connection id has been set, or immediately if already set
initialConnectionId.whenComplete((connectionId, t) -> {
try {
if (t != null) {
LOG.warn("SyncDurableSubs will be skipped due to error {}",
t.getMessage());
return;
}
// check connection still registered
if (lookupConnectionState(connectionId) != null) {
LOG.debug("SyncDurableSubs is enabled, Sending BrokerSubscriptionInfo");
dispatchSync(NetworkBridgeUtils.getBrokerSubscriptionInfo(
this.broker.getBrokerService(), config));
}
} catch (Exception e) {
LOG.error("Failed to respond to network bridge creation from broker {}",
info.getBrokerId(), e);
}
});
}
}

@SuppressWarnings({"unchecked", "rawtypes"})
private HashMap<String, String> createMap(Properties properties) {
return new HashMap(properties);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ public class BrokerView implements BrokerViewMBean {

public static final Set<String> DENIED_TRANSPORT_SCHEMES = Set.of("vm", "http",
"multicast", "zeroconf", "discovery", "fanout", "mock", "peer", "failover",
"proxy", "reliable", "simple", "udp");
"proxy", "reliable", "simple", "udp", "masterslave");

ManagedRegionBroker broker;

Expand Down Expand Up @@ -574,26 +574,39 @@ private static void validateAllowedUrl(String uriString) throws URISyntaxExcepti
// Validate the URI does not contain a denied transport scheme
private static void validateAllowedUri(URI uri, int depth) throws URISyntaxException {
// Don't allow more than 5 nested URIs to prevent blowing the stack
// If we are greater than 4 then this is the 5th level of composite
if (depth > 4) {
if (depth > 5) {
throw new IllegalArgumentException("URI can't contain more than 5 nested composite URIs");
}

// First check the main URI scheme
validateAllowedScheme(uri.getScheme());

// If composite, iterate and check each of the composite URIs
if (URISupport.isCompositeURI(uri)) {
URISupport.CompositeData data = URISupport.parseComposite(uri);
// We need to check if the URI is composite and/or contains nested URIs
// The utility method URISupport#isCompositeURI is not good enough here
// because it misses if there are no parentheses and also is primarily meant
// for checking comma separated URIs and not nested URIs.
//
// The best way to handle all cases is to use the same logic that the transports
// use to process the URIs and that is to simply attempt to parse it and check each
// of the parsed components. This wll correctly handle the case when there
// are parentheses and also when the parentheses are skipped.
final URISupport.CompositeData data;
try {
data = URISupport.parseComposite(uri);
} catch (URISyntaxException e) {
// If this is not a valid URI then we can stop checking
// This can happen when parsing a nested URI and at the last portion
return;
}

if (data.getComponents() != null) {
depth++;
for (URI component : data.getComponents()) {
// Each URI could be a nested composite URI so call validateAllowedUri()
// to validate it. This check if composite first so we don't add to
// the recursive stack depth if there's a lot of URIs that are not composite
if (URISupport.isCompositeURI(component)) {
// Each URI could be a nested and/or composite URI so call validateAllowedUri()
// to validate it. If the scheme is null then the original URI is not composite
// or nested so we can skip the check, and we are finished.
if (component.getScheme() != null) {
validateAllowedUri(component, depth);
} else {
validateAllowedScheme(component.getScheme());
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,12 +75,21 @@ protected SecurityContext checkSecurityContext(ConnectionContext context) throws
return securityContext;
}

protected boolean checkDestinationAdmin(SecurityContext securityContext, ActiveMQDestination destination) {
Destination existing = this.getDestinationMap(destination).get(destination);
if (existing != null) {
protected boolean checkDestinationAdminAdd(SecurityContext securityContext, ActiveMQDestination destination) {
if (this.getDestinationMap(destination).get(destination) != null) {
return true;
}
return checkDestinationAdmin(securityContext, destination);
}

protected boolean checkDestinationAdminRemove(SecurityContext securityContext, ActiveMQDestination destination) {
if (this.getDestinationMap(destination).get(destination) == null) {
return true;
}
return checkDestinationAdmin(securityContext, destination);
}

protected boolean checkDestinationAdmin(SecurityContext securityContext, ActiveMQDestination destination) {
if (!securityContext.isBrokerContext()) {
Set<?> allowedACLs = null;
if (!destination.isTemporary()) {
Expand All @@ -100,18 +109,18 @@ protected boolean checkDestinationAdmin(SecurityContext securityContext, ActiveM
public void addDestinationInfo(ConnectionContext context, DestinationInfo info) throws Exception {
final SecurityContext securityContext = checkSecurityContext(context);

if (!checkDestinationAdmin(securityContext, info.getDestination())) {
if (!checkDestinationAdminAdd(securityContext, info.getDestination())) {
throw new SecurityException("User " + securityContext.getUserName() + " is not authorized to create: " + info.getDestination());
}

super.addDestinationInfo(context, info);
}

@Override
public Destination addDestination(ConnectionContext context, ActiveMQDestination destination,boolean create) throws Exception {
public Destination addDestination(ConnectionContext context, ActiveMQDestination destination, boolean create) throws Exception {
final SecurityContext securityContext = checkSecurityContext(context);

if (!checkDestinationAdmin(securityContext, destination)) {
if (!checkDestinationAdminAdd(securityContext, destination)) {
throw new SecurityException("User " + securityContext.getUserName() + " is not authorized to create: " + destination);
}

Expand All @@ -122,7 +131,7 @@ public Destination addDestination(ConnectionContext context, ActiveMQDestination
public void removeDestination(ConnectionContext context, ActiveMQDestination destination, long timeout) throws Exception {
final SecurityContext securityContext = checkSecurityContext(context);

if (!checkDestinationAdmin(securityContext, destination)) {
if (!checkDestinationAdminRemove(securityContext, destination)) {
throw new SecurityException("User " + securityContext.getUserName() + " is not authorized to remove: " + destination);
}

Expand All @@ -135,7 +144,7 @@ public void removeDestination(ConnectionContext context, ActiveMQDestination des
public void removeDestinationInfo(ConnectionContext context, DestinationInfo info) throws Exception {
final SecurityContext securityContext = checkSecurityContext(context);

if (!checkDestinationAdmin(securityContext, info.getDestination())) {
if (!checkDestinationAdminRemove(securityContext, info.getDestination())) {
throw new SecurityException("User " + securityContext.getUserName() + " is not authorized to remove: " + info.getDestination());
}

Expand Down
Loading