diff --git a/.github/workflows/ci-quick.yml b/.github/workflows/ci-quick.yml new file mode 100644 index 00000000000..4cd613f988f --- /dev/null +++ b/.github/workflows/ci-quick.yml @@ -0,0 +1,101 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +# +# This is our most used build and the one used when a new PR is created/updated. It needs to +# be and remain as green as possible. It is also executed when a PR is actually merged to make sure +# apache/main stays stable. It runs a subset of the tests (the ones in the quick profile) to make sure it finishes +# in a reasonable time. +# +name: CI Quick + +on: + push: + branches: [ "main", "activemq-6.2.x", "activemq-5.19.x" ] + pull_request: + branches: [ "main", "activemq-6.2.x", "activemq-5.19.x" ] + +concurrency: + group: ${{ github.workflow }}-${{ github.ref }} + cancel-in-progress: true + +jobs: + build: + name: build + + permissions: + contents: read + + timeout-minutes: 60 + + strategy: + matrix: + os: [ ubuntu-24.04, macos-26, windows-2025 ] + java-version: [ 11 ] + + runs-on: ${{ matrix.os }} + + steps: + - uses: actions/checkout@v6 + - name: Set up JDK + uses: actions/setup-java@v5 + with: + java-version: ${{ matrix.java-version }} + distribution: temurin + cache: 'maven' + - name: Build + run: mvn -U -B -e clean install -DskipTests + - name: Verify + run: mvn apache-rat:check + + test: + name: test + needs: build + + permissions: + contents: read + checks: write + pull-requests: write + + timeout-minutes: 180 + + runs-on: ubuntu-24.04 + + steps: + - uses: actions/checkout@v6 + - name: Set up JDK + uses: actions/setup-java@v5 + with: + java-version: 11 + distribution: temurin + cache: 'maven' + - name: Test + run: mvn -B -e -fae verify -Pactivemq.tests-quick -Dsurefire.rerunFailingTestsCount=3 + - name: Upload Test Results + if: always() + uses: actions/upload-artifact@v7 + with: + name: test-results + path: '**/target/surefire-reports/*.xml' + - name: Publish Test Results + if: always() + uses: EnricoMi/publish-unit-test-result-action@v2 + with: + large_files: true + report_individual_runs: true + report_suite_logs: error + files: '**/target/surefire-reports/*.xml' diff --git a/activemq-all/pom.xml b/activemq-all/pom.xml index a42780bb8a9..ab3b6395735 100644 --- a/activemq-all/pom.xml +++ b/activemq-all/pom.xml @@ -14,7 +14,7 @@ org.apache.activemq activemq-parent - 5.19.7-TT.2-SNAPSHOT + 5.19.8-TT.1-SNAPSHOT activemq-all diff --git a/activemq-amqp/pom.xml b/activemq-amqp/pom.xml index 50834e0b024..88f4793d7da 100644 --- a/activemq-amqp/pom.xml +++ b/activemq-amqp/pom.xml @@ -22,7 +22,7 @@ org.apache.activemq activemq-parent - 5.19.7-TT.2-SNAPSHOT + 5.19.8-TT.1-SNAPSHOT activemq-amqp diff --git a/activemq-blueprint/pom.xml b/activemq-blueprint/pom.xml index 61a8412da22..e574cb23fb7 100644 --- a/activemq-blueprint/pom.xml +++ b/activemq-blueprint/pom.xml @@ -22,7 +22,7 @@ org.apache.activemq activemq-parent - 5.19.7-TT.2-SNAPSHOT + 5.19.8-TT.1-SNAPSHOT activemq-blueprint diff --git a/activemq-broker/pom.xml b/activemq-broker/pom.xml index d312a33fc9a..34c95219865 100644 --- a/activemq-broker/pom.xml +++ b/activemq-broker/pom.xml @@ -22,7 +22,7 @@ org.apache.activemq activemq-parent - 5.19.7-TT.2-SNAPSHOT + 5.19.8-TT.1-SNAPSHOT activemq-broker diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/TransportConnection.java b/activemq-broker/src/main/java/org/apache/activemq/broker/TransportConnection.java index ed2fd1f376e..2ede7a547d1 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/TransportConnection.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/TransportConnection.java @@ -26,6 +26,7 @@ import java.util.List; import java.util.Map; import java.util.Properties; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.CountDownLatch; @@ -163,6 +164,7 @@ public class TransportConnection implements Connection, Task, CommandVisitor { private final ReentrantReadWriteLock serviceLock = new ReentrantReadWriteLock(); private String duplexNetworkConnectorId; private final long connectedTimestamp; + private final CompletableFuture initialConnectionId = new CompletableFuture<>(); /** * @param taskRunnerFactory - can be null if you want direct dispatch to the transport @@ -851,11 +853,16 @@ public Response processAddConnection(ConnectionInfo info) throws Exception { try { broker.addConnection(context, info); + // Complete the future with the connectionId if we completed + // the broker.addConnection() chain successfully + initialConnectionId.complete(info.getConnectionId()); } catch (Exception e) { synchronized (brokerConnectionStates) { brokerConnectionStates.remove(info.getConnectionId()); } unregisterConnectionState(info.getConnectionId()); + // complete with the exception + initialConnectionId.completeExceptionally(e); LOG.warn("Failed to add Connection id={}, clientId={}, clientIP={} due to {}", info.getConnectionId(), clientId, info.getClientIp(), e.getLocalizedMessage()); //AMQ-6561 - stop for all exceptions on addConnection @@ -1387,13 +1394,10 @@ public Response processBrokerInfo(BrokerInfo info) { LOG.error(" Slave Brokers are no longer supported - slave trying to attach is: {}", info.getBrokerName()); } else if (info.isNetworkConnection() && !info.isDuplexConnection()) { try { - NetworkBridgeConfiguration config = getNetworkConfiguration(info); - if (config.isSyncDurableSubs() && protocolVersion.get() >= CommandTypes.PROTOCOL_VERSION_DURABLE_SYNC) { - LOG.debug("SyncDurableSubs is enabled, Sending BrokerSubscriptionInfo"); - dispatchSync(NetworkBridgeUtils.getBrokerSubscriptionInfo(this.broker.getBrokerService(), config)); - } + // register durable sync to be sent after ConnectionInfo has been handled + registerDurableSync(getNetworkConfiguration(info), info); } catch (Exception e) { - LOG.error("Failed to respond to network bridge creation from broker {}", info.getBrokerId(), e); + LOG.error("Failed to register durable sync for network bridge creation from broker {}", info.getBrokerId(), e); return null; } } else if (info.isNetworkConnection() && info.isDuplexConnection()) { @@ -1403,10 +1407,8 @@ public Response processBrokerInfo(BrokerInfo info) { NetworkBridgeConfiguration config = getNetworkConfiguration(info); config.setBrokerName(broker.getBrokerName()); - if (config.isSyncDurableSubs() && protocolVersion.get() >= CommandTypes.PROTOCOL_VERSION_DURABLE_SYNC) { - LOG.debug("SyncDurableSubs is enabled, Sending BrokerSubscriptionInfo"); - dispatchSync(NetworkBridgeUtils.getBrokerSubscriptionInfo(this.broker.getBrokerService(), config)); - } + // register durable sync to be sent after ConnectionInfo has been handled + registerDurableSync(config, info); // check for existing duplex connection hanging about @@ -1473,6 +1475,30 @@ public Response processBrokerInfo(BrokerInfo info) { return null; } + private void registerDurableSync(final NetworkBridgeConfiguration config, final BrokerInfo info) { + if (config.isSyncDurableSubs() && protocolVersion.get() >= CommandTypes.PROTOCOL_VERSION_DURABLE_SYNC) { + // this will complete when the connection id has been set, or immediately if already set + initialConnectionId.whenComplete((connectionId, t) -> { + try { + if (t != null) { + LOG.warn("SyncDurableSubs will be skipped due to error {}", + t.getMessage()); + return; + } + // check connection still registered + if (lookupConnectionState(connectionId) != null) { + LOG.debug("SyncDurableSubs is enabled, Sending BrokerSubscriptionInfo"); + dispatchSync(NetworkBridgeUtils.getBrokerSubscriptionInfo( + this.broker.getBrokerService(), config)); + } + } catch (Exception e) { + LOG.error("Failed to respond to network bridge creation from broker {}", + info.getBrokerId(), e); + } + }); + } + } + @SuppressWarnings({"unchecked", "rawtypes"}) private HashMap createMap(Properties properties) { return new HashMap(properties); diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/BrokerView.java b/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/BrokerView.java index ad81f015832..b5530a63a2c 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/BrokerView.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/BrokerView.java @@ -46,7 +46,7 @@ public class BrokerView implements BrokerViewMBean { public static final Set DENIED_TRANSPORT_SCHEMES = Set.of("vm", "http", "multicast", "zeroconf", "discovery", "fanout", "mock", "peer", "failover", - "proxy", "reliable", "simple", "udp"); + "proxy", "reliable", "simple", "udp", "masterslave"); ManagedRegionBroker broker; @@ -574,26 +574,39 @@ private static void validateAllowedUrl(String uriString) throws URISyntaxExcepti // Validate the URI does not contain a denied transport scheme private static void validateAllowedUri(URI uri, int depth) throws URISyntaxException { // Don't allow more than 5 nested URIs to prevent blowing the stack - // If we are greater than 4 then this is the 5th level of composite - if (depth > 4) { + if (depth > 5) { throw new IllegalArgumentException("URI can't contain more than 5 nested composite URIs"); } // First check the main URI scheme validateAllowedScheme(uri.getScheme()); - // If composite, iterate and check each of the composite URIs - if (URISupport.isCompositeURI(uri)) { - URISupport.CompositeData data = URISupport.parseComposite(uri); + // We need to check if the URI is composite and/or contains nested URIs + // The utility method URISupport#isCompositeURI is not good enough here + // because it misses if there are no parentheses and also is primarily meant + // for checking comma separated URIs and not nested URIs. + // + // The best way to handle all cases is to use the same logic that the transports + // use to process the URIs and that is to simply attempt to parse it and check each + // of the parsed components. This wll correctly handle the case when there + // are parentheses and also when the parentheses are skipped. + final URISupport.CompositeData data; + try { + data = URISupport.parseComposite(uri); + } catch (URISyntaxException e) { + // If this is not a valid URI then we can stop checking + // This can happen when parsing a nested URI and at the last portion + return; + } + + if (data.getComponents() != null) { depth++; for (URI component : data.getComponents()) { - // Each URI could be a nested composite URI so call validateAllowedUri() - // to validate it. This check if composite first so we don't add to - // the recursive stack depth if there's a lot of URIs that are not composite - if (URISupport.isCompositeURI(component)) { + // Each URI could be a nested and/or composite URI so call validateAllowedUri() + // to validate it. If the scheme is null then the original URI is not composite + // or nested so we can skip the check, and we are finished. + if (component.getScheme() != null) { validateAllowedUri(component, depth); - } else { - validateAllowedScheme(component.getScheme()); } } } diff --git a/activemq-broker/src/main/java/org/apache/activemq/security/AuthorizationBroker.java b/activemq-broker/src/main/java/org/apache/activemq/security/AuthorizationBroker.java index 06eabd2584e..c65f9a38517 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/security/AuthorizationBroker.java +++ b/activemq-broker/src/main/java/org/apache/activemq/security/AuthorizationBroker.java @@ -75,12 +75,21 @@ protected SecurityContext checkSecurityContext(ConnectionContext context) throws return securityContext; } - protected boolean checkDestinationAdmin(SecurityContext securityContext, ActiveMQDestination destination) { - Destination existing = this.getDestinationMap(destination).get(destination); - if (existing != null) { + protected boolean checkDestinationAdminAdd(SecurityContext securityContext, ActiveMQDestination destination) { + if (this.getDestinationMap(destination).get(destination) != null) { return true; } + return checkDestinationAdmin(securityContext, destination); + } + protected boolean checkDestinationAdminRemove(SecurityContext securityContext, ActiveMQDestination destination) { + if (this.getDestinationMap(destination).get(destination) == null) { + return true; + } + return checkDestinationAdmin(securityContext, destination); + } + + protected boolean checkDestinationAdmin(SecurityContext securityContext, ActiveMQDestination destination) { if (!securityContext.isBrokerContext()) { Set allowedACLs = null; if (!destination.isTemporary()) { @@ -100,7 +109,7 @@ protected boolean checkDestinationAdmin(SecurityContext securityContext, ActiveM public void addDestinationInfo(ConnectionContext context, DestinationInfo info) throws Exception { final SecurityContext securityContext = checkSecurityContext(context); - if (!checkDestinationAdmin(securityContext, info.getDestination())) { + if (!checkDestinationAdminAdd(securityContext, info.getDestination())) { throw new SecurityException("User " + securityContext.getUserName() + " is not authorized to create: " + info.getDestination()); } @@ -108,10 +117,10 @@ public void addDestinationInfo(ConnectionContext context, DestinationInfo info) } @Override - public Destination addDestination(ConnectionContext context, ActiveMQDestination destination,boolean create) throws Exception { + public Destination addDestination(ConnectionContext context, ActiveMQDestination destination, boolean create) throws Exception { final SecurityContext securityContext = checkSecurityContext(context); - if (!checkDestinationAdmin(securityContext, destination)) { + if (!checkDestinationAdminAdd(securityContext, destination)) { throw new SecurityException("User " + securityContext.getUserName() + " is not authorized to create: " + destination); } @@ -122,7 +131,7 @@ public Destination addDestination(ConnectionContext context, ActiveMQDestination public void removeDestination(ConnectionContext context, ActiveMQDestination destination, long timeout) throws Exception { final SecurityContext securityContext = checkSecurityContext(context); - if (!checkDestinationAdmin(securityContext, destination)) { + if (!checkDestinationAdminRemove(securityContext, destination)) { throw new SecurityException("User " + securityContext.getUserName() + " is not authorized to remove: " + destination); } @@ -135,7 +144,7 @@ public void removeDestination(ConnectionContext context, ActiveMQDestination des public void removeDestinationInfo(ConnectionContext context, DestinationInfo info) throws Exception { final SecurityContext securityContext = checkSecurityContext(context); - if (!checkDestinationAdmin(securityContext, info.getDestination())) { + if (!checkDestinationAdminRemove(securityContext, info.getDestination())) { throw new SecurityException("User " + securityContext.getUserName() + " is not authorized to remove: " + info.getDestination()); } diff --git a/activemq-broker/src/main/java/org/apache/activemq/transport/vm/VMTransportFactory.java b/activemq-broker/src/main/java/org/apache/activemq/transport/vm/VMTransportFactory.java index 56baaeec98f..63b94dac5d9 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/transport/vm/VMTransportFactory.java +++ b/activemq-broker/src/main/java/org/apache/activemq/transport/vm/VMTransportFactory.java @@ -19,11 +19,14 @@ import java.io.IOException; import java.net.URI; import java.net.URISyntaxException; +import java.util.Arrays; import java.util.HashMap; import java.util.Map; +import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; +import java.util.stream.Collectors; import org.apache.activemq.broker.BrokerFactory; import org.apache.activemq.broker.BrokerFactoryHandler; import org.apache.activemq.broker.BrokerRegistry; @@ -44,12 +47,29 @@ public class VMTransportFactory extends TransportFactory { + public static final String VM_TRANSPORT_FACTORY_SCHEMES_ENABLED_PROP = + "org.apache.activemq.transport.VM_TRANSPORT_FACTORY_SCHEMES_ENABLED"; + public static final String DEFAULT_ALLOWED_SCHEMES = "broker,properties"; + public static final ConcurrentMap BROKERS = new ConcurrentHashMap(); public static final ConcurrentMap CONNECTORS = new ConcurrentHashMap(); public static final ConcurrentMap SERVERS = new ConcurrentHashMap(); private static final Logger LOG = LoggerFactory.getLogger(VMTransportFactory.class); BrokerFactoryHandler brokerFactoryHandler; + private final Set allowedSchemes; + + public VMTransportFactory() { + final String allowedSchemes = System.getProperty(VM_TRANSPORT_FACTORY_SCHEMES_ENABLED_PROP, + DEFAULT_ALLOWED_SCHEMES); + + // Asterisk will map to null which will allow all and skip checking + // Empty string will map to an empty set and will deny all + this.allowedSchemes = !allowedSchemes.equals("*") ? + Arrays.stream(allowedSchemes.split("\\s*,\\s*")) + .filter(s -> !s.isBlank()) + .collect(Collectors.toUnmodifiableSet()) : null; + } @Override public Transport doConnect(URI location) throws Exception { @@ -119,6 +139,7 @@ public Transport doCompositeConnect(URI location) throws Exception { throw new IOException("Broker named '" + host + "' does not exist."); } try { + validateBrokerCreationSchema(host, brokerURI); if (brokerFactoryHandler != null) { broker = brokerFactoryHandler.createBroker(brokerURI); } else { @@ -162,6 +183,20 @@ public Transport doCompositeConnect(URI location) throws Exception { return transport; } + private void validateBrokerCreationSchema(String host, URI brokerURI) { + if (allowedSchemes != null) { + final String detectedScheme = brokerURI.getScheme(); + if (detectedScheme == null) { + throw new IllegalArgumentException("Could not detect scheme in given URI [" + brokerURI + "]"); + } + if (!allowedSchemes.contains(detectedScheme)){ + throw new IllegalArgumentException("Broker named '" + host + "' does not exist and " + + "broker creation using the scheme '" + detectedScheme + "' is not enabled via the VMTransportFactory. " + + "To allow creation, configure the system property " + VM_TRANSPORT_FACTORY_SCHEMES_ENABLED_PROP); + } + } + } + private static String extractHost(URI location) { String host = location.getHost(); if (host == null || host.length() == 0) { diff --git a/activemq-cf/pom.xml b/activemq-cf/pom.xml index 5dffd2da1d5..87877abc025 100644 --- a/activemq-cf/pom.xml +++ b/activemq-cf/pom.xml @@ -24,7 +24,7 @@ org.apache.activemq activemq-parent - 5.19.7-TT.2-SNAPSHOT + 5.19.8-TT.1-SNAPSHOT activemq-cf diff --git a/activemq-client-jakarta/pom.xml b/activemq-client-jakarta/pom.xml index 026498d41da..5413b61758d 100644 --- a/activemq-client-jakarta/pom.xml +++ b/activemq-client-jakarta/pom.xml @@ -20,7 +20,7 @@ org.apache.activemq activemq-parent - 5.19.7-TT.2-SNAPSHOT + 5.19.8-TT.1-SNAPSHOT activemq-client-jakarta bundle diff --git a/activemq-client/pom.xml b/activemq-client/pom.xml index 5ed25b92515..5e735aef7a4 100644 --- a/activemq-client/pom.xml +++ b/activemq-client/pom.xml @@ -22,7 +22,7 @@ org.apache.activemq activemq-parent - 5.19.7-TT.2-SNAPSHOT + 5.19.8-TT.1-SNAPSHOT activemq-client diff --git a/activemq-client/src/main/java/org/apache/activemq/util/ClassLoadingAwareObjectInputStream.java b/activemq-client/src/main/java/org/apache/activemq/util/ClassLoadingAwareObjectInputStream.java index 396b6502b7a..fe309990638 100644 --- a/activemq-client/src/main/java/org/apache/activemq/util/ClassLoadingAwareObjectInputStream.java +++ b/activemq-client/src/main/java/org/apache/activemq/util/ClassLoadingAwareObjectInputStream.java @@ -32,15 +32,21 @@ public class ClassLoadingAwareObjectInputStream extends ObjectInputStream { private static final ClassLoader FALLBACK_CLASS_LOADER = ClassLoadingAwareObjectInputStream.class.getClassLoader(); + public static final Set> ALLOWED_JDK_TYPES = Set.of( + Boolean.class, Short.class, Integer.class, Long.class, + Float.class, Double.class, String.class, Character.class, Byte.class, + Throwable.class, Exception.class, StackTraceElement.class); + + public static final String DEFAULT_SERIALIZABLE_PACKAGES = "org.apache.activemq,org.fusesource.hawtbuf,com.thoughtworks.xstream.mapper"; public static final String[] serializablePackages; - private List trustedPackages = new ArrayList(); + private List trustedPackages = new ArrayList<>(); private boolean trustAllPackages = false; private final ClassLoader inLoader; static { - serializablePackages = System.getProperty("org.apache.activemq.SERIALIZABLE_PACKAGES","java.lang,org.apache.activemq,org.fusesource.hawtbuf,com.thoughtworks.xstream.mapper").split(","); + serializablePackages = System.getProperty("org.apache.activemq.SERIALIZABLE_PACKAGES", DEFAULT_SERIALIZABLE_PACKAGES).split(","); } public ClassLoadingAwareObjectInputStream(InputStream in) throws IOException { @@ -98,7 +104,7 @@ private boolean trustAllPackages() { } private void checkSecurity(Class clazz) throws ClassNotFoundException { - if (trustAllPackages() || clazz.isPrimitive()) { + if (trustAllPackages() || clazz.isPrimitive() || ALLOWED_JDK_TYPES.contains(clazz)) { return; } diff --git a/activemq-client/src/main/java/org/apache/activemq/util/XStreamSupport.java b/activemq-client/src/main/java/org/apache/activemq/util/XStreamSupport.java index 0fe4cfe90ff..50fdc9af6a0 100644 --- a/activemq-client/src/main/java/org/apache/activemq/util/XStreamSupport.java +++ b/activemq-client/src/main/java/org/apache/activemq/util/XStreamSupport.java @@ -27,13 +27,16 @@ public class XStreamSupport { + private static final Class[] ALLOWED_JDK_TYPES = + ClassLoadingAwareObjectInputStream.ALLOWED_JDK_TYPES.toArray(new Class[0]); + public static XStream createXStream() { XStream stream = new XStream(); stream.addPermission(NoTypePermission.NONE); stream.addPermission(PrimitiveTypePermission.PRIMITIVES); stream.allowTypeHierarchy(Collection.class); stream.allowTypeHierarchy(Map.class); - stream.allowTypes(new Class[]{String.class}); + stream.allowTypes(ALLOWED_JDK_TYPES); if (ClassLoadingAwareObjectInputStream.isAllAllowed()) { stream.addPermission(AnyTypePermission.ANY); } else { diff --git a/activemq-client/src/test/java/org/apache/activemq/util/ClassLoadingAwareObjectInputStreamTest.java b/activemq-client/src/test/java/org/apache/activemq/util/ClassLoadingAwareObjectInputStreamTest.java index 2e3e919ee43..ab256fb9d84 100644 --- a/activemq-client/src/test/java/org/apache/activemq/util/ClassLoadingAwareObjectInputStreamTest.java +++ b/activemq-client/src/test/java/org/apache/activemq/util/ClassLoadingAwareObjectInputStreamTest.java @@ -26,6 +26,7 @@ import java.io.IOException; import java.io.ObjectOutputStream; import java.util.Arrays; +import java.util.Set; import java.util.UUID; import java.util.Vector; @@ -206,8 +207,23 @@ public void testPrimitveCharNotFiltered() throws Exception { } @Test - public void testReadObjectStringNotFiltered() throws Exception { - doTestReadObject(new String(name.getMethodName()), ACCEPTS_NONE_FILTER); + public void testReadObjectJdkTypesNotFiltered() throws Exception { + for (String filter : Set.of(ACCEPTS_ALL_FILTER, ACCEPTS_NONE_FILTER, + ClassLoadingAwareObjectInputStream.DEFAULT_SERIALIZABLE_PACKAGES)) { + doTestReadObject(Boolean.TRUE, filter); + doTestReadObject("test", filter); + doTestReadObject(Byte.valueOf("0"), filter); + doTestReadObject(Character.valueOf('a'), filter); + doTestReadObject(Integer.valueOf(100), filter); + doTestReadObject(Long.valueOf(0), filter); + doTestReadObject(Float.valueOf(0), filter); + doTestReadObject(Double.valueOf(0), filter); + } + + // these also require collections classes in java util as well as StackTraceElement + // they also can't be compared for equality as they don't implement equals + doTestReadObject(new Exception(), "java.util", false); + doTestReadObject(new Throwable(), "java.util", false); } //----- Test that primitive arrays get past filters ----------------------// @@ -429,6 +445,10 @@ public void testReadObjectFailsWithUnstrustedContentInTrustedType() throws Excep //----- Internal methods -------------------------------------------------// private void doTestReadObject(Object value, String filter) throws Exception { + doTestReadObject(value, filter, true); + } + + private void doTestReadObject(Object value, String filter, boolean equalityCheck) throws Exception { byte[] serialized = serializeObject(value); try (ByteArrayInputStream input = new ByteArrayInputStream(serialized); @@ -441,10 +461,12 @@ private void doTestReadObject(Object value, String filter) throws Exception { Object result = reader.readObject(); assertNotNull(result); assertEquals(value.getClass(), result.getClass()); - if (result.getClass().isArray()) { - assertTrue(Arrays.deepEquals((Object[]) value, (Object[]) result)); - } else { - assertEquals(value, result); + if (equalityCheck) { + if (result.getClass().isArray()) { + assertTrue(Arrays.deepEquals((Object[]) value, (Object[]) result)); + } else { + assertEquals(value, result); + } } } } diff --git a/activemq-console/pom.xml b/activemq-console/pom.xml index c2d3b66312a..f3dd6cf79a1 100644 --- a/activemq-console/pom.xml +++ b/activemq-console/pom.xml @@ -22,7 +22,7 @@ org.apache.activemq activemq-parent - 5.19.7-TT.2-SNAPSHOT + 5.19.8-TT.1-SNAPSHOT activemq-console diff --git a/activemq-http/pom.xml b/activemq-http/pom.xml index e9d2d23abec..bc9458b716d 100644 --- a/activemq-http/pom.xml +++ b/activemq-http/pom.xml @@ -22,7 +22,7 @@ org.apache.activemq activemq-parent - 5.19.7-TT.2-SNAPSHOT + 5.19.8-TT.1-SNAPSHOT activemq-http diff --git a/activemq-jaas/pom.xml b/activemq-jaas/pom.xml index 500cc1bba23..255bf8fa0cc 100644 --- a/activemq-jaas/pom.xml +++ b/activemq-jaas/pom.xml @@ -22,7 +22,7 @@ org.apache.activemq activemq-parent - 5.19.7-TT.2-SNAPSHOT + 5.19.8-TT.1-SNAPSHOT activemq-jaas diff --git a/activemq-jdbc-store/pom.xml b/activemq-jdbc-store/pom.xml index b31151f3746..0beff98d4f8 100644 --- a/activemq-jdbc-store/pom.xml +++ b/activemq-jdbc-store/pom.xml @@ -22,7 +22,7 @@ org.apache.activemq activemq-parent - 5.19.7-TT.2-SNAPSHOT + 5.19.8-TT.1-SNAPSHOT activemq-jdbc-store diff --git a/activemq-jms-pool/pom.xml b/activemq-jms-pool/pom.xml index 8d3ea4aa1d0..cda2d0c4d58 100644 --- a/activemq-jms-pool/pom.xml +++ b/activemq-jms-pool/pom.xml @@ -22,7 +22,7 @@ org.apache.activemq activemq-parent - 5.19.7-TT.2-SNAPSHOT + 5.19.8-TT.1-SNAPSHOT activemq-jms-pool diff --git a/activemq-kahadb-store/pom.xml b/activemq-kahadb-store/pom.xml index 2b6190a1b1d..505394b1a27 100644 --- a/activemq-kahadb-store/pom.xml +++ b/activemq-kahadb-store/pom.xml @@ -22,7 +22,7 @@ org.apache.activemq activemq-parent - 5.19.7-TT.2-SNAPSHOT + 5.19.8-TT.1-SNAPSHOT activemq-kahadb-store diff --git a/activemq-karaf-itest/pom.xml b/activemq-karaf-itest/pom.xml index 8225b51dc43..e40eea5ef51 100644 --- a/activemq-karaf-itest/pom.xml +++ b/activemq-karaf-itest/pom.xml @@ -22,7 +22,7 @@ org.apache.activemq activemq-parent - 5.19.7-TT.2-SNAPSHOT + 5.19.8-TT.1-SNAPSHOT activemq-karaf-itest diff --git a/activemq-karaf/pom.xml b/activemq-karaf/pom.xml index 63680fca532..bcf871a80c6 100644 --- a/activemq-karaf/pom.xml +++ b/activemq-karaf/pom.xml @@ -22,7 +22,7 @@ org.apache.activemq activemq-parent - 5.19.7-TT.2-SNAPSHOT + 5.19.8-TT.1-SNAPSHOT activemq-karaf diff --git a/activemq-log4j-appender/pom.xml b/activemq-log4j-appender/pom.xml index cca0b647f18..f895a00c068 100644 --- a/activemq-log4j-appender/pom.xml +++ b/activemq-log4j-appender/pom.xml @@ -22,7 +22,7 @@ org.apache.activemq activemq-parent - 5.19.7-TT.2-SNAPSHOT + 5.19.8-TT.1-SNAPSHOT activemq-log4j-appender diff --git a/activemq-mqtt/pom.xml b/activemq-mqtt/pom.xml index d641e71b1d6..751fe023862 100644 --- a/activemq-mqtt/pom.xml +++ b/activemq-mqtt/pom.xml @@ -22,7 +22,7 @@ org.apache.activemq activemq-parent - 5.19.7-TT.2-SNAPSHOT + 5.19.8-TT.1-SNAPSHOT activemq-mqtt diff --git a/activemq-openwire-generator/pom.xml b/activemq-openwire-generator/pom.xml index 63d048abf6c..4c9fba1157c 100644 --- a/activemq-openwire-generator/pom.xml +++ b/activemq-openwire-generator/pom.xml @@ -22,7 +22,7 @@ org.apache.activemq activemq-parent - 5.19.7-TT.2-SNAPSHOT + 5.19.8-TT.1-SNAPSHOT activemq-openwire-generator diff --git a/activemq-openwire-legacy/pom.xml b/activemq-openwire-legacy/pom.xml index cff64ce2538..1eccf831e4d 100644 --- a/activemq-openwire-legacy/pom.xml +++ b/activemq-openwire-legacy/pom.xml @@ -22,7 +22,7 @@ org.apache.activemq activemq-parent - 5.19.7-TT.2-SNAPSHOT + 5.19.8-TT.1-SNAPSHOT activemq-openwire-legacy diff --git a/activemq-osgi/pom.xml b/activemq-osgi/pom.xml index a04bd753270..8582b21d1f7 100644 --- a/activemq-osgi/pom.xml +++ b/activemq-osgi/pom.xml @@ -22,7 +22,7 @@ org.apache.activemq activemq-parent - 5.19.7-TT.2-SNAPSHOT + 5.19.8-TT.1-SNAPSHOT activemq-osgi diff --git a/activemq-partition/pom.xml b/activemq-partition/pom.xml index f2283dd0b9d..1f3529ccc2a 100644 --- a/activemq-partition/pom.xml +++ b/activemq-partition/pom.xml @@ -22,7 +22,7 @@ org.apache.activemq activemq-parent - 5.19.7-TT.2-SNAPSHOT + 5.19.8-TT.1-SNAPSHOT activemq-partition diff --git a/activemq-pool/pom.xml b/activemq-pool/pom.xml index bf8fe719706..c7069e10334 100644 --- a/activemq-pool/pom.xml +++ b/activemq-pool/pom.xml @@ -22,7 +22,7 @@ org.apache.activemq activemq-parent - 5.19.7-TT.2-SNAPSHOT + 5.19.8-TT.1-SNAPSHOT activemq-pool diff --git a/activemq-ra/pom.xml b/activemq-ra/pom.xml index b245c60bdc6..e626cdf7944 100644 --- a/activemq-ra/pom.xml +++ b/activemq-ra/pom.xml @@ -22,7 +22,7 @@ org.apache.activemq activemq-parent - 5.19.7-TT.2-SNAPSHOT + 5.19.8-TT.1-SNAPSHOT activemq-ra diff --git a/activemq-rar/pom.xml b/activemq-rar/pom.xml index 8a417be8792..c9c5a9c1fbd 100644 --- a/activemq-rar/pom.xml +++ b/activemq-rar/pom.xml @@ -22,7 +22,7 @@ org.apache.activemq activemq-parent - 5.19.7-TT.2-SNAPSHOT + 5.19.8-TT.1-SNAPSHOT activemq-rar diff --git a/activemq-run/pom.xml b/activemq-run/pom.xml index 96b6180a9ac..776aaff52a6 100644 --- a/activemq-run/pom.xml +++ b/activemq-run/pom.xml @@ -22,7 +22,7 @@ org.apache.activemq activemq-parent - 5.19.7-TT.2-SNAPSHOT + 5.19.8-TT.1-SNAPSHOT activemq-run diff --git a/activemq-runtime-config/pom.xml b/activemq-runtime-config/pom.xml index 1db2de38cd4..dce9674a1d7 100644 --- a/activemq-runtime-config/pom.xml +++ b/activemq-runtime-config/pom.xml @@ -22,7 +22,7 @@ org.apache.activemq activemq-parent - 5.19.7-TT.2-SNAPSHOT + 5.19.8-TT.1-SNAPSHOT activemq-runtime-config diff --git a/activemq-shiro/pom.xml b/activemq-shiro/pom.xml index 1adb8338603..058d287f445 100644 --- a/activemq-shiro/pom.xml +++ b/activemq-shiro/pom.xml @@ -22,7 +22,7 @@ org.apache.activemq activemq-parent - 5.19.7-TT.2-SNAPSHOT + 5.19.8-TT.1-SNAPSHOT activemq-shiro diff --git a/activemq-spring/pom.xml b/activemq-spring/pom.xml index cb57e930f4c..3f431b214c9 100644 --- a/activemq-spring/pom.xml +++ b/activemq-spring/pom.xml @@ -22,7 +22,7 @@ org.apache.activemq activemq-parent - 5.19.7-TT.2-SNAPSHOT + 5.19.8-TT.1-SNAPSHOT activemq-spring diff --git a/activemq-stomp/pom.xml b/activemq-stomp/pom.xml index d1a0105b207..de60a193c1e 100644 --- a/activemq-stomp/pom.xml +++ b/activemq-stomp/pom.xml @@ -22,7 +22,7 @@ org.apache.activemq activemq-parent - 5.19.7-TT.2-SNAPSHOT + 5.19.8-TT.1-SNAPSHOT activemq-stomp diff --git a/activemq-stomp/src/test/java/org/apache/activemq/transport/stomp/StompTest.java b/activemq-stomp/src/test/java/org/apache/activemq/transport/stomp/StompTest.java index 89f591f1f65..4142e1fd660 100644 --- a/activemq-stomp/src/test/java/org/apache/activemq/transport/stomp/StompTest.java +++ b/activemq-stomp/src/test/java/org/apache/activemq/transport/stomp/StompTest.java @@ -1167,6 +1167,62 @@ public void testTransformationSendJSONObject() throws Exception { assertEquals("Dejan", object.getName()); } + + @Test(timeout = 60000) + public void testTransformationReceiveXMLObjectDouble() throws Exception { + MessageConsumer consumer = session.createConsumer(queue); + + String frame = "CONNECT\n" + "login:system\n" + "passcode:manager\n\n" + Stomp.NULL; + stompConnection.sendFrame(frame); + + frame = stompConnection.receiveFrame(); + assertTrue(frame.startsWith("CONNECTED")); + + // Double should be allowed by default + frame = "SEND\n" + "destination:/queue/" + getQueueName() + "\n" + + "transformation:" + Stomp.Transformations.JMS_OBJECT_XML + "\n\n" + + "1.1" + Stomp.NULL; + + stompConnection.sendFrame(frame); + + Message message = consumer.receive(2500); + assertNotNull(message); + + LOG.info("Broker sent: {}", message); + + assertTrue(message instanceof ObjectMessage); + ObjectMessage objectMessage = (ObjectMessage)message; + Double object = (Double)objectMessage.getObject(); + assertEquals(Double.valueOf(1.1), object); + } + + @Test(timeout = 60000) + public void testTransformationSendXMLObjectNotAllowed() throws Exception { + MessageConsumer consumer = session.createConsumer(queue); + + String frame = "CONNECT\n" + "login:system\n" + "passcode:manager\n\n" + Stomp.NULL; + stompConnection.sendFrame(frame); + + frame = stompConnection.receiveFrame(); + assertTrue(frame.startsWith("CONNECTED")); + + // ProcessBuilder is not allowed by default so the conversion should fail and + // then fall back to using a TextMessage, as well as setting an error header + frame = "SEND\n" + "destination:/queue/" + getQueueName() + "\n" + + "transformation:" + Stomp.Transformations.JMS_OBJECT_XML + "\n\n" + + "id" + Stomp.NULL; + + stompConnection.sendFrame(frame); + + Message message = consumer.receive(2500); + assertNotNull(message); + LOG.info("Broker sent: {}", message); + + // The message should be Text and marked with a transformation error header + assertTrue(message instanceof TextMessage); + assertEquals("java.lang.ProcessBuilder", message.getStringProperty("transformation-error")); + } + @Test(timeout = 60000) public void testTransformationSubscribeXML() throws Exception { diff --git a/activemq-tooling/activemq-junit/pom.xml b/activemq-tooling/activemq-junit/pom.xml index 5fac700c003..ed6200aa63c 100644 --- a/activemq-tooling/activemq-junit/pom.xml +++ b/activemq-tooling/activemq-junit/pom.xml @@ -21,7 +21,7 @@ org.apache.activemq.tooling activemq-tooling - 5.19.7-TT.2-SNAPSHOT + 5.19.8-TT.1-SNAPSHOT activemq-junit diff --git a/activemq-tooling/activemq-maven-plugin/pom.xml b/activemq-tooling/activemq-maven-plugin/pom.xml index bcab8e244a5..37a7939f4a6 100644 --- a/activemq-tooling/activemq-maven-plugin/pom.xml +++ b/activemq-tooling/activemq-maven-plugin/pom.xml @@ -21,7 +21,7 @@ org.apache.activemq.tooling activemq-tooling - 5.19.7-TT.2-SNAPSHOT + 5.19.8-TT.1-SNAPSHOT activemq-maven-plugin diff --git a/activemq-tooling/activemq-memtest-maven-plugin/pom.xml b/activemq-tooling/activemq-memtest-maven-plugin/pom.xml index be109b28b7d..a2780d9b1f0 100644 --- a/activemq-tooling/activemq-memtest-maven-plugin/pom.xml +++ b/activemq-tooling/activemq-memtest-maven-plugin/pom.xml @@ -22,7 +22,7 @@ org.apache.activemq.tooling activemq-tooling - 5.19.7-TT.2-SNAPSHOT + 5.19.8-TT.1-SNAPSHOT activemq-memtest-maven-plugin diff --git a/activemq-tooling/activemq-perf-maven-plugin/pom.xml b/activemq-tooling/activemq-perf-maven-plugin/pom.xml index 8dc5166faf4..a5fbab64ce3 100644 --- a/activemq-tooling/activemq-perf-maven-plugin/pom.xml +++ b/activemq-tooling/activemq-perf-maven-plugin/pom.xml @@ -21,7 +21,7 @@ org.apache.activemq.tooling activemq-tooling - 5.19.7-TT.2-SNAPSHOT + 5.19.8-TT.1-SNAPSHOT activemq-perf-maven-plugin diff --git a/activemq-tooling/pom.xml b/activemq-tooling/pom.xml index b0f699920f1..0aea2f8a93e 100644 --- a/activemq-tooling/pom.xml +++ b/activemq-tooling/pom.xml @@ -22,7 +22,7 @@ org.apache.activemq activemq-parent - 5.19.7-TT.2-SNAPSHOT + 5.19.8-TT.1-SNAPSHOT org.apache.activemq.tooling diff --git a/activemq-unit-tests/pom.xml b/activemq-unit-tests/pom.xml index 1265166627e..aa447ec816a 100644 --- a/activemq-unit-tests/pom.xml +++ b/activemq-unit-tests/pom.xml @@ -22,7 +22,7 @@ org.apache.activemq activemq-parent - 5.19.7-TT.2-SNAPSHOT + 5.19.8-TT.1-SNAPSHOT activemq-unit-tests diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/jmx/MBeanTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/jmx/MBeanTest.java index 3dab66916f5..b5f31eede6b 100644 --- a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/jmx/MBeanTest.java +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/jmx/MBeanTest.java @@ -2086,6 +2086,14 @@ protected void testAddTransportConnectorBlockedBrokerView(String scheme) throws assertEquals("Transport scheme '" + scheme + "' is not allowed", e.getMessage()); } + try { + // verify any composite URI is blocked as well without parens + brokerView.addConnector("static:tcp://0.0.0.0:0," + scheme + "://" + brokerName); + fail("Should have failed trying to add connector with scheme: " + scheme); + } catch (IllegalArgumentException e) { + assertEquals("Transport scheme '" + scheme + "' is not allowed", e.getMessage()); + } + try { // verify nested composite URI is blocked brokerView.addConnector("static:(static:(static:(" + scheme + "://localhost)))"); @@ -2108,6 +2116,15 @@ public void testNestedAddTransportConnector() throws Exception { } catch (IllegalArgumentException e) { assertEquals("URI can't contain more than 5 nested composite URIs", e.getMessage()); } + + try { + // verify nested composite URI with more than 5 levels is blocked without parens + brokerView.addConnector( + "static:static:static:static:static:static:tcp://localhost:0"); + fail("Should have failed trying to add vm connector bridge"); + } catch (IllegalArgumentException e) { + assertEquals("URI can't contain more than 5 nested composite URIs", e.getMessage()); + } } } diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/config/BrokerXmlConfigFromJNDITest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/config/BrokerXmlConfigFromJNDITest.java index 936603c3b82..68139efe5cc 100644 --- a/activemq-unit-tests/src/test/java/org/apache/activemq/config/BrokerXmlConfigFromJNDITest.java +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/config/BrokerXmlConfigFromJNDITest.java @@ -16,19 +16,34 @@ */ package org.apache.activemq.config; -import java.io.File; -import java.util.Hashtable; - -import javax.naming.Context; -import javax.naming.InitialContext; +import static org.apache.activemq.util.VmTransportTestUtils.resetVmTransportFactory; import org.apache.activemq.ActiveMQConnectionFactory; import org.apache.activemq.test.JmsTopicSendReceiveWithTwoConnectionsTest; +import javax.naming.Context; +import javax.naming.InitialContext; +import java.io.File; +import java.util.Hashtable; + /** * */ public class BrokerXmlConfigFromJNDITest extends JmsTopicSendReceiveWithTwoConnectionsTest { + + @Override + protected void setUp() throws Exception { + // reset before each test + resetVmTransportFactory("xbean"); + super.setUp(); + } + + @Override + protected void tearDown() throws Exception { + super.tearDown(); + resetVmTransportFactory(); + } + @Override protected ActiveMQConnectionFactory createConnectionFactory() throws Exception { assertBaseDirectoryContainsSpaces(); diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/jmx/JmxCreateNCTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/jmx/JmxCreateNCTest.java index 3d3b8c1cdc5..b20518e884c 100644 --- a/activemq-unit-tests/src/test/java/org/apache/activemq/jmx/JmxCreateNCTest.java +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/jmx/JmxCreateNCTest.java @@ -82,6 +82,31 @@ public void testBridgeRegistration() throws Exception { assertEquals("NC", nc.getName()); } + @Test + public void testTransportSchemeBridgeAllowed() throws Exception { + // Test composite network connector uri + String name = proxy.addNetworkConnector("static:(tcp://localhost,amqp://localhost)"); + proxy.removeNetworkConnector(name); + + // Test composite with missing parens + name = proxy.addNetworkConnector("static:amqp://localhost,tcp://127.0.0.1:0"); + proxy.removeNetworkConnector(name); + + // verify direct connector as well + name = proxy.addNetworkConnector("static:stomp://localhost"); + proxy.removeNetworkConnector(name); + + // verify nested composite URI + name = proxy.addNetworkConnector( + "static:(static:(static:(tcp+ssl://localhost:0,auto+nio+ssl://localhost)))"); + proxy.removeNetworkConnector(name); + + // verify nested composite URI is not blocked when not using parens + name = proxy.addNetworkConnector( + "static:static:static:123://localhost:0,auto+nio+ssl://localhost"); + proxy.removeNetworkConnector(name); + } + @Test public void testTransportSchemeBridgeBlocked() throws Exception { for (String deniedScheme : DENIED_TRANSPORT_SCHEMES) { @@ -99,6 +124,14 @@ protected void testTransportSchemeBridgeBlocked(String scheme) throws Exception assertEquals("Transport scheme '" + scheme + "' is not allowed", e.getMessage()); } + // Test composite with missing parens + try { + proxy.addNetworkConnector("static:" + scheme + "://localhost,tcp://127.0.0.1:0"); + fail("Should have failed trying to add connector bridge with scheme: " + scheme); + } catch (IllegalArgumentException e) { + assertEquals("Transport scheme '" + scheme + "' is not allowed", e.getMessage()); + } + // verify direct connector as well try { proxy.addNetworkConnector(scheme + "://localhost"); @@ -114,6 +147,14 @@ protected void testTransportSchemeBridgeBlocked(String scheme) throws Exception } catch (IllegalArgumentException e) { assertEquals("Transport scheme '" + scheme + "' is not allowed", e.getMessage()); } + + try { + // verify nested composite URI is blocked when not using parens + proxy.addNetworkConnector("static:static:static:tcp://localhost:0," + scheme + "://localhost"); + fail("Should have failed trying to add connector bridge with scheme: " + scheme); + } catch (IllegalArgumentException e) { + assertEquals("Transport scheme '" + scheme + "' is not allowed", e.getMessage()); + } } @Test @@ -124,12 +165,21 @@ public void testAddNetworkConnectorMaxComposite() throws Exception { try { // verify nested composite URI with more than 5 levels is blocked. This has 6 nested - // (not including first wrapper url + // (not including first wrapper url) proxy.addNetworkConnector( "static:(static:(static:(static:(static:(static:(tcp://localhost:0))))))"); fail("Should have failed trying to add more than 5 connector bridges"); } catch (IllegalArgumentException e) { assertEquals("URI can't contain more than 5 nested composite URIs", e.getMessage()); } + + try { + // verify nested composite URI with more than 5 levels is blocked without parens + proxy.addNetworkConnector( + "static:static:static:static:static:static:tcp://localhost:0"); + fail("Should have failed trying to add more than 5 connector bridges"); + } catch (IllegalArgumentException e) { + assertEquals("URI can't contain more than 5 nested composite URIs", e.getMessage()); + } } } diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/network/AbstractDurableSyncNetworkBridgeTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/network/AbstractDurableSyncNetworkBridgeTest.java new file mode 100644 index 00000000000..47df7ef8baa --- /dev/null +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/network/AbstractDurableSyncNetworkBridgeTest.java @@ -0,0 +1,123 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.network; + +import static org.junit.Assert.assertTrue; + +import java.io.File; +import java.io.IOException; +import java.util.List; +import java.util.concurrent.TimeUnit; +import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.util.Wait; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public abstract class AbstractDurableSyncNetworkBridgeTest extends DynamicNetworkTestSupport { + + protected static final Logger LOG = LoggerFactory.getLogger( + AbstractDurableSyncNetworkBridgeTest.class); + + protected abstract void doSetUpLocalBroker(boolean deleteAllMessages, boolean startNetworkConnector, File dataDir) throws Exception; + + protected abstract void doSetUpRemoteBroker(boolean deleteAllMessages, File dataDir, int port) throws Exception; + + protected void restartLocalBroker(boolean startNetworkConnector) throws Exception { + stopLocalBroker(); + doSetUpLocalBroker(false, startNetworkConnector, localBroker.getDataDirectoryFile()); + } + + protected void restartRemoteBroker() throws Exception { + final int previousPort = remoteBroker.getTransportConnectors().get(0).getConnectUri().getPort(); + final File dataDir = remoteBroker.getDataDirectoryFile(); + stopRemoteBroker(); + try { + doSetUpRemoteBroker(false, dataDir, previousPort); + } catch (final IOException e) { + if (e.getCause() instanceof java.net.BindException) { + // Previous port still in TIME_WAIT — use a new ephemeral port + doSetUpRemoteBroker(false, dataDir, 0); + // Update the local broker's network connector to point to the new port + updateLocalNetworkConnectorUri(); + } else { + throw e; + } + } + } + + protected void restartBroker(BrokerService broker, boolean startNetworkConnector) throws Exception { + if (broker.getBrokerName().equals("localBroker")) { + restartLocalBroker(startNetworkConnector); + } else { + restartRemoteBroker(); + } + } + + protected void waitForBridgeFullyStarted() throws Exception { + waitForBridgeFullyStarted(TimeUnit.SECONDS.toMillis(15), true); + } + + protected void waitForBridgeFullyStarted(long millis, boolean duplex) throws Exception { + // Wait for the local bridge to be fully started (advisory consumers registered) + assertTrue("Local bridge should be fully started", Wait.waitFor(() -> { + if (localBroker.getNetworkConnectors().get(0).activeBridges().isEmpty()) { + return false; + } + final NetworkBridge bridge = localBroker.getNetworkConnectors().get(0).activeBridges().iterator().next(); + if (bridge instanceof DemandForwardingBridgeSupport) { + return ((DemandForwardingBridgeSupport) bridge).startedLatch.getCount() == 0; + } + return true; + }, millis, 100)); + + // Also wait for the duplex bridge on the remote broker to be fully started. + // The duplex connector creates a separate DemandForwardingBridge on the remote side + // that also needs its advisory consumers registered before it can process events. + if (duplex) { + assertTrue("Duplex bridge should be fully started", Wait.waitFor(() -> { + final DemandForwardingBridge duplexBridge = findDuplexBridge( + remoteBroker.getTransportConnectors().get(0)); + return duplexBridge != null && duplexBridge.startedLatch.getCount() == 0; + }, millis, 100)); + } + } + + + /** + * When the remote broker restarts on a new ephemeral port (BindException fallback), + * any existing network connector on the local broker still points to the old port. + * This method stops the old connector and replaces it with one targeting the new URI. + */ + protected void updateLocalNetworkConnectorUri() throws Exception { + if (localBroker == null) { + return; + } + final List connectors = localBroker.getNetworkConnectors(); + if (connectors.isEmpty()) { + return; + } + final NetworkConnector oldConnector = connectors.get(0); + oldConnector.stop(); + localBroker.removeNetworkConnector(oldConnector); + final NetworkConnector newConnector = configureLocalNetworkConnector(); + localBroker.addNetworkConnector(newConnector); + newConnector.start(); + } + + protected abstract NetworkConnector configureLocalNetworkConnector() throws Exception; + +} diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/network/DurableSyncNetworkBridgeAuthTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/network/DurableSyncNetworkBridgeAuthTest.java new file mode 100644 index 00000000000..646dd4f184d --- /dev/null +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/network/DurableSyncNetworkBridgeAuthTest.java @@ -0,0 +1,270 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.network; + +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; + +import java.io.File; +import java.net.URI; +import java.util.Arrays; +import java.util.Collection; +import java.util.List; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; +import org.apache.activemq.broker.BrokerPlugin; +import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.broker.TransportConnector; +import org.apache.activemq.command.ActiveMQTopic; +import org.apache.activemq.command.BrokerSubscriptionInfo; +import org.apache.activemq.command.DiscoveryEvent; +import org.apache.activemq.security.AuthenticationUser; +import org.apache.activemq.security.SimpleAuthenticationPlugin; +import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter; +import org.apache.activemq.store.kahadb.disk.journal.Journal.JournalDiskSyncStrategy; +import org.apache.activemq.transport.Transport; +import org.apache.activemq.transport.TransportFilter; +import org.apache.activemq.util.Wait; +import org.junit.After; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.Timeout; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; +import org.junit.runners.Parameterized.Parameters; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +@RunWith(Parameterized.class) +public class DurableSyncNetworkBridgeAuthTest extends AbstractDurableSyncNetworkBridgeTest { + + protected static final Logger LOG = LoggerFactory.getLogger(DurableSyncNetworkBridgeAuthTest.class); + + @Parameters(name="duplex={0}") + public static Collection data() { + return Arrays.asList(new Object[][] { + {true}, + {false} + }); + } + + @Rule + public Timeout globalTimeout = new Timeout(60, TimeUnit.SECONDS); + + public static final String KEYSTORE_TYPE = "jks"; + public static final String PASSWORD = "password"; + public static final String SERVER_KEYSTORE = "src/test/resources/server.keystore"; + public static final String TRUST_KEYSTORE = "src/test/resources/client.keystore"; + + static { + System.setProperty("javax.net.ssl.trustStore", TRUST_KEYSTORE); + System.setProperty("javax.net.ssl.trustStorePassword", PASSWORD); + System.setProperty("javax.net.ssl.trustStoreType", KEYSTORE_TYPE); + System.setProperty("javax.net.ssl.keyStore", SERVER_KEYSTORE); + System.setProperty("javax.net.ssl.keyStoreType", KEYSTORE_TYPE); + System.setProperty("javax.net.ssl.keyStorePassword", PASSWORD); + } + + private static final String USER_PASSWORD = "password"; + private final boolean duplex; + private final AtomicReference brokerSubInfo = new AtomicReference<>(); + private String ncPassword = USER_PASSWORD; + + public DurableSyncNetworkBridgeAuthTest(boolean duplex) { + this.duplex = duplex; + } + + @Before + public void setUp() throws Exception { + this.ncPassword = USER_PASSWORD; + this.brokerSubInfo.set(null); + } + + @After + public void tearDown() throws Exception { + doTearDown(); + } + + @Test + public void testAuthSuccess() throws Exception { + doSetUp(true, true, tempFolder.newFolder(), tempFolder.newFolder(), + TimeUnit.SECONDS.toMillis(15)); + + // When the local broker starts the bridge it will send its BrokerSubscriptionInfo list + // automatically on connect so the remote broker will always receive it. However, the + // remote broker should only send back its list after the connection is properly authenticated. + assertTrue(Wait.waitFor(() -> brokerSubInfo.get() != null,5000,10)); + + // Simulate a connection exception and reconnect, we should receive again + brokerSubInfo.set(null); + localBroker.getNetworkConnectors().get(0).activeBridges().stream() + .findFirst().orElseThrow().serviceRemoteException(new Exception()); + assertTrue(Wait.waitFor(() -> brokerSubInfo.get() != null,5000,10)); + } + + @Test + public void testAuthFailure() throws Exception { + this.ncPassword = "badpassword"; + try { + // set a shorter wait time, it won't connect with bad password + doSetUp(true, true, tempFolder.newFolder(), tempFolder.newFolder(), + TimeUnit.SECONDS.toMillis(5)); + throw new IllegalStateException("Should have received assertion error with bad password"); + } catch (AssertionError e) { + // expected + } + + // Because the local broker was not authenticated by the remote broker, the local broker + // should not have received back the BrokerSubscriptionInfo + assertNull(brokerSubInfo.get()); + } + + @Test + public void testRestartSync() throws Exception { + doSetUp(true, true, tempFolder.newFolder(), tempFolder.newFolder(), + TimeUnit.SECONDS.toMillis(15)); + + // When the local broker starts the bridge it will send its BrokerSubscriptionInfo list + // automatically on connect so the remote broker will always receive it. However, the + // remote broker should only send back its list after the connection is properly authenticated. + assertTrue(Wait.waitFor(() -> brokerSubInfo.get() != null,5000,10)); + + // Restart, should receive again with new connection + brokerSubInfo.set(null); + restartRemoteBroker(); + + // Wait for the reconnect and receive of BrokerSubInfo + assertTrue(Wait.waitFor(() -> brokerSubInfo.get() != null,5000,10)); + } + + protected void doSetUp(boolean deleteAllMessages, boolean startNetworkConnector, File localDataDir, + File remoteDataDir, long waitForStart) throws Exception { + doSetUpRemoteBroker(deleteAllMessages, remoteDataDir, 0); + doSetUpLocalBroker(deleteAllMessages, startNetworkConnector, localDataDir); + //Wait for the bridge to be fully started + if (startNetworkConnector) { + waitForBridgeFullyStarted(waitForStart, duplex); + } + } + + protected void doSetUpLocalBroker(boolean deleteAllMessages, boolean startNetworkConnector, + File dataDir) throws Exception { + localBroker = createLocalBroker(dataDir, startNetworkConnector); + localBroker.setDeleteAllMessagesOnStartup(deleteAllMessages); + localBroker.start(); + localBroker.waitUntilStarted(); + + if (startNetworkConnector) { + // Best-effort wait for the bridge to appear. Do NOT use assertTrue here + // because some tests restart localBroker before remoteBroker is running, + // relying on the bridge connecting later when remoteBroker restarts. + // Tests that need the bridge to be fully started call assertBridgeStarted() explicitly. + // Keep timeout short (5s) to avoid growing the NC reconnect backoff too much, + // which would delay bridge formation when the remote broker starts later. + Wait.waitFor(() -> localBroker.getNetworkConnectors().get(0).activeBridges().size() == 1, + TimeUnit.SECONDS.toMillis(5), 500); + } + + } + + protected void doSetUpRemoteBroker(boolean deleteAllMessages, File dataDir, int port) throws Exception { + remoteBroker = createRemoteBroker(dataDir, port); + remoteBroker.setDeleteAllMessagesOnStartup(deleteAllMessages); + remoteBroker.start(); + remoteBroker.waitUntilStarted(); + } + + protected BrokerService createLocalBroker(File dataDir, boolean startNetworkConnector) throws Exception { + BrokerService brokerService = new BrokerService(); + brokerService.setBrokerName("localBroker"); + brokerService.setDataDirectoryFile(dataDir); + KahaDBPersistenceAdapter adapter = new KahaDBPersistenceAdapter(); + adapter.setDirectory(dataDir); + adapter.setJournalDiskSyncStrategy(JournalDiskSyncStrategy.PERIODIC.name()); + brokerService.setPersistenceAdapter(adapter); + + if (startNetworkConnector) { + brokerService.addNetworkConnector(configureLocalNetworkConnector()); + } + + //Use auto+nio+ssl to test out the transport works with bridging + brokerService.addConnector("auto+nio+ssl://localhost:0"); + + return brokerService; + } + + @Override + protected NetworkConnector configureLocalNetworkConnector() throws Exception { + List transportConnectors = remoteBroker.getTransportConnectors(); + URI remoteURI = transportConnectors.get(0).getConnectUri(); + String uri = "static:(" + remoteURI + ")"; + NetworkConnector connector = new DiscoveryNetworkConnector(new URI(uri)) { + @Override + protected NetworkBridge createBridge(Transport localTransport, + Transport remoteTransport, DiscoveryEvent event) { + // Add a listener so we can capture if the remote broker sends + // back a BrokerSubscriptionInfo object + final Transport remoteFilter = new TransportFilter(remoteTransport) { + @Override + public void onCommand(Object command) { + if (command instanceof BrokerSubscriptionInfo) { + if (brokerSubInfo.get() != null) { + throw new IllegalStateException("Received BrokerSubscriptionInfo more than once."); + } + brokerSubInfo.set((BrokerSubscriptionInfo) command); + } + super.onCommand(command); + } + }; + return super.createBridge(localTransport, remoteFilter, event); + } + }; + connector.setName("networkConnector"); + connector.setUserName("user1"); + connector.setPassword(ncPassword); + connector.setDecreaseNetworkConsumerPriority(false); + connector.setConduitSubscriptions(true); + connector.setDuplex(duplex); + connector.setStaticBridge(false); + connector.setSyncDurableSubs(true); + connector.setDynamicallyIncludedDestinations(List.of(new ActiveMQTopic("include.test.>"))); + return connector; + } + + protected BrokerService createRemoteBroker(File dataDir, int port) throws Exception { + BrokerService brokerService = new BrokerService(); + brokerService.setBrokerName("remoteBroker"); + brokerService.setUseJmx(false); + brokerService.setDataDirectoryFile(dataDir); + KahaDBPersistenceAdapter adapter = new KahaDBPersistenceAdapter(); + adapter.setDirectory(dataDir); + adapter.setJournalDiskSyncStrategy(JournalDiskSyncStrategy.PERIODIC.name()); + brokerService.setPersistenceAdapter(adapter); + + // Add authentication to the remote broker + AuthenticationUser user = new AuthenticationUser("user1", USER_PASSWORD, "group1"); + SimpleAuthenticationPlugin authenticationPlugin = new SimpleAuthenticationPlugin(); + authenticationPlugin.setUsers(List.of(user)); + brokerService.setPlugins(new BrokerPlugin[] {authenticationPlugin}); + + brokerService.addConnector("auto+nio+ssl://localhost:" + port); + + return brokerService; + } + +} diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/network/DurableSyncNetworkBridgeTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/network/DurableSyncNetworkBridgeTest.java index aa36e26fba9..ede1593fd17 100644 --- a/activemq-unit-tests/src/test/java/org/apache/activemq/network/DurableSyncNetworkBridgeTest.java +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/network/DurableSyncNetworkBridgeTest.java @@ -16,22 +16,10 @@ */ package org.apache.activemq.network; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertTrue; - -import java.io.File; -import java.net.URI; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collection; -import java.util.List; -import java.util.concurrent.TimeUnit; - import javax.jms.Message; import javax.jms.MessageConsumer; import javax.jms.MessageProducer; import javax.jms.Session; - import org.apache.activemq.ActiveMQConnectionFactory; import org.apache.activemq.advisory.AdvisoryBroker; import org.apache.activemq.broker.BrokerPlugin; @@ -62,8 +50,20 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.io.File; +import java.io.IOException; +import java.net.URI; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.List; +import java.util.concurrent.TimeUnit; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + @RunWith(Parameterized.class) -public class DurableSyncNetworkBridgeTest extends DynamicNetworkTestSupport { +public class DurableSyncNetworkBridgeTest extends AbstractDurableSyncNetworkBridgeTest { protected static final Logger LOG = LoggerFactory.getLogger(DurableSyncNetworkBridgeTest.class); @@ -83,7 +83,7 @@ public class DurableSyncNetworkBridgeTest extends DynamicNetworkTestSupport { private final FLOW flow; @Rule - public Timeout globalTimeout = new Timeout(30, TimeUnit.SECONDS); + public Timeout globalTimeout = new Timeout(60, TimeUnit.SECONDS); @Parameters public static Collection data() { @@ -138,6 +138,10 @@ public void testRemoveSubscriptionPropagate() throws Exception { assertSubscriptionsCount(broker1, topic, 1); assertNCDurableSubsCount(broker2, topic, 1); + // Wait for subscription to become inactive before attempting removal + // It's very important to wait here, otherwise the removal may not propagate + waitForSubscriptionInactive(broker1, topic, subName); + removeSubscription(broker1, subName); assertSubscriptionsCount(broker1, topic, 0); @@ -222,7 +226,12 @@ public void testRemoveSubscriptionWithBridgeOfflineIncludedChanged() throws Exce //Test that on successful reconnection of the bridge that //the NC sub will be removed restartBroker(broker2, true); - assertNCDurableSubsCount(broker2, topic, 1); + // In REVERSE flow, broker2=localBroker has the bridge and broker1 (remoteBroker) + // is already running, so the sync may have already cleaned up the NC durable sub. + // This "before sync" assertion is only valid in FORWARD flow. + if (flow == FLOW.FORWARD) { + assertNCDurableSubsCount(broker2, topic, 1); + } restartBroker(broker1, true); assertBridgeStarted(); assertNCDurableSubsCount(broker2, topic, 0); @@ -249,7 +258,9 @@ public void testSubscriptionRemovedAfterIncludedChanged() throws Exception { //the NC sub will be removed because even though the local subscription exists, //it no longer matches the included filter restartBroker(broker2, true); - assertNCDurableSubsCount(broker2, topic, 1); + if (flow == FLOW.FORWARD) { + assertNCDurableSubsCount(broker2, topic, 1); + } restartBroker(broker1, true); assertBridgeStarted(); assertNCDurableSubsCount(broker2, topic, 0); @@ -287,7 +298,9 @@ public void testSubscriptionRemovedAfterStaticChanged() throws Exception { //the NC sub will be removed because even though the local subscription exists, //it no longer matches the included static filter restartBroker(broker2, true); - assertNCDurableSubsCount(broker2, topic, 1); + if (flow == FLOW.FORWARD) { + assertNCDurableSubsCount(broker2, topic, 1); + } restartBroker(broker1, true); assertBridgeStarted(); assertNCDurableSubsCount(broker2, topic, 0); @@ -316,10 +329,13 @@ public void testAddAndRemoveSubscriptionWithBridgeOfflineMultiTopics() throws Ex //Test that on successful reconnection of the bridge that //the NC sub will be removed for topic1 but will stay for topic2 - //before sync, the old NC should exist + //before sync, the old NC should exist (only verifiable in FORWARD flow; + //in REVERSE, broker2=localBroker has the bridge and sync may already have run) restartBroker(broker2, true); - assertNCDurableSubsCount(broker2, topic, 1); - assertNCDurableSubsCount(broker2, topic2, 0); + if (flow == FLOW.FORWARD) { + assertNCDurableSubsCount(broker2, topic, 1); + assertNCDurableSubsCount(broker2, topic2, 0); + } //After sync, remove old NC and create one for topic 2 restartBroker(broker1, true); @@ -527,7 +543,6 @@ public void testAddOnlineSubscriptionsWithBridgeOffline() throws Exception { session1.createDurableSubscriber(topic, "sub3"); session1.createDurableSubscriber(excludeTopic, "sub-exclude"); - Thread.sleep(1000); assertNCDurableSubsCount(broker2, topic, 1); assertNCDurableSubsCount(broker2, excludeTopic, 0); @@ -566,13 +581,10 @@ public void testAddOnlineSubscriptionsTwoBridges() throws Exception { secondConnector.start(); //Make sure both bridges are connected - assertTrue(Wait.waitFor(new Condition() { - @Override - public boolean isSatisified() throws Exception { - return localBroker.getNetworkConnectors().get(0).activeBridges().size() == 1 && - localBroker.getNetworkConnectors().get(1).activeBridges().size() == 1; - } - }, 10000, 500)); + assertTrue(Wait.waitFor(() -> + localBroker.getNetworkConnectors().get(0).activeBridges().size() == 1 && + localBroker.getNetworkConnectors().get(1).activeBridges().size() == 1, + TimeUnit.SECONDS.toMillis(15), 500)); //Make sure NC durables exist for both bridges assertNCDurableSubsCount(broker2, topic2, 1); @@ -633,13 +645,7 @@ public void testVirtualDestSubForceDurableSync() throws Exception { final DestinationStatistics remoteDestStatistics2 = remoteBroker.getDestination( new ActiveMQQueue("include.test.bar.bridge")).getDestinationStatistics(); - assertTrue(Wait.waitFor(new Condition() { - - @Override - public boolean isSatisified() throws Exception { - return remoteDestStatistics2.getMessages().getCount() == 501; - } - })); + assertTrue(Wait.waitFor(() -> remoteDestStatistics2.getMessages().getCount() == 501)); } @@ -700,14 +706,6 @@ protected CompositeTopic createCompositeTopic(String name, ActiveMQDestination.. return compositeTopic; } - protected void restartBroker(BrokerService broker, boolean startNetworkConnector) throws Exception { - if (broker.getBrokerName().equals("localBroker")) { - restartLocalBroker(startNetworkConnector); - } else { - restartRemoteBroker(); - } - } - protected void restartBrokers(boolean startNetworkConnector) throws Exception { doTearDown(); doSetUp(false, startNetworkConnector, localBroker.getDataDirectoryFile(), @@ -719,23 +717,13 @@ protected void doSetUp(boolean deleteAllMessages, boolean startNetworkConnector, included = new ActiveMQTopic(testTopicName); doSetUpRemoteBroker(deleteAllMessages, remoteDataDir, 0); doSetUpLocalBroker(deleteAllMessages, startNetworkConnector, localDataDir); - //Give time for advisories to propagate - Thread.sleep(1000); - } - - protected void restartLocalBroker(boolean startNetworkConnector) throws Exception { - stopLocalBroker(); - doSetUpLocalBroker(false, startNetworkConnector, localBroker.getDataDirectoryFile()); - } - - protected void restartRemoteBroker() throws Exception { - int port = 0; - if (remoteBroker != null) { - List transportConnectors = remoteBroker.getTransportConnectors(); - port = transportConnectors.get(0).getConnectUri().getPort(); + //Wait for the bridge to be fully started (advisory consumers registered). + //Note: activeBridges().size() == 1 is NOT sufficient because bridges are added + //to the map before start() completes asynchronously. We must wait for the + //startedLatch which counts down after advisory consumers are registered. + if (startNetworkConnector) { + waitForBridgeFullyStarted(); } - stopRemoteBroker(); - doSetUpRemoteBroker(false, remoteBroker.getDataDirectoryFile(), port); } protected void doSetUpLocalBroker(boolean deleteAllMessages, boolean startNetworkConnector, @@ -753,12 +741,14 @@ protected void doSetUpLocalBroker(boolean deleteAllMessages, boolean startNetwor localConnection.start(); if (startNetworkConnector) { - Wait.waitFor(new Condition() { - @Override - public boolean isSatisified() throws Exception { - return localBroker.getNetworkConnectors().get(0).activeBridges().size() == 1; - } - }, 5000, 500); + // Best-effort wait for the bridge to appear. Do NOT use assertTrue here + // because some tests restart localBroker before remoteBroker is running, + // relying on the bridge connecting later when remoteBroker restarts. + // Tests that need the bridge to be fully started call assertBridgeStarted() explicitly. + // Keep timeout short (5s) to avoid growing the NC reconnect backoff too much, + // which would delay bridge formation when the remote broker starts later. + Wait.waitFor(() -> localBroker.getNetworkConnectors().get(0).activeBridges().size() == 1, + TimeUnit.SECONDS.toMillis(5), 500); } localSession = localConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); @@ -817,6 +807,7 @@ protected BrokerService createLocalBroker(File dataDir, boolean startNetworkConn return brokerService; } + @Override protected NetworkConnector configureLocalNetworkConnector() throws Exception { List transportConnectors = remoteBroker.getTransportConnectors(); URI remoteURI = transportConnectors.get(0).getConnectUri(); @@ -869,4 +860,24 @@ protected BrokerService createRemoteBroker(File dataDir, int port) throws Except return brokerService; } + /** + * Wait for a durable subscription to become inactive before attempting removal. + * This prevents "Durable consumer is in use" errors when consumer close operations + * complete asynchronously (especially visible with Java 25's different thread scheduling). + */ + protected void waitForSubscriptionInactive(final BrokerService brokerService, + final ActiveMQTopic topic, + final String subName) throws Exception { + assertTrue("Subscription should become inactive", Wait.waitFor(() -> { + final List subs = getSubscriptions(brokerService, topic); + for (final org.apache.activemq.broker.region.DurableTopicSubscription sub : subs) { + if (sub.getSubscriptionKey().getSubscriptionName().equals(subName)) { + return !sub.isActive(); + } + } + // If subscription doesn't exist, it's considered inactive + return true; + }, TimeUnit.SECONDS.toMillis(15), 100)); + } + } diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/network/DynamicNetworkTestSupport.java b/activemq-unit-tests/src/test/java/org/apache/activemq/network/DynamicNetworkTestSupport.java index 2d83fb71b70..4e0ecf7480e 100644 --- a/activemq-unit-tests/src/test/java/org/apache/activemq/network/DynamicNetworkTestSupport.java +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/network/DynamicNetworkTestSupport.java @@ -24,6 +24,7 @@ import java.lang.reflect.Field; import java.util.ArrayList; import java.util.List; +import java.util.concurrent.TimeUnit; import javax.jms.Connection; import javax.jms.JMSException; @@ -45,7 +46,6 @@ import org.apache.activemq.command.RemoveSubscriptionInfo; import org.apache.activemq.util.SubscriptionKey; import org.apache.activemq.util.Wait; -import org.apache.activemq.util.Wait.Condition; import org.junit.Rule; import org.junit.rules.TemporaryFolder; @@ -95,9 +95,16 @@ protected void stopRemoteBroker() throws Exception { } protected void assertBridgeStarted() throws Exception { - assertTrue(Wait.waitFor( - () -> localBroker.getNetworkConnectors().get(0).activeBridges().size() == 1, - 10000, 500)); + assertTrue("Bridge should be fully started", Wait.waitFor(() -> { + if (localBroker.getNetworkConnectors().get(0).activeBridges().size() != 1) { + return false; + } + final NetworkBridge bridge = localBroker.getNetworkConnectors().get(0).activeBridges().iterator().next(); + if (bridge instanceof DemandForwardingBridgeSupport) { + return ((DemandForwardingBridgeSupport) bridge).startedLatch.getCount() == 0; + } + return true; + }, TimeUnit.SECONDS.toMillis(10), 500)); } protected RemoveSubscriptionInfo getRemoveSubscriptionInfo(final ConnectionContext context, @@ -119,8 +126,8 @@ protected void waitForConsumerCount(final DestinationStatistics destinationStati protected void waitForDispatchFromLocalBroker(final DestinationStatistics destinationStatistics, final int count) throws Exception { assertTrue(Wait.waitFor(() -> count == destinationStatistics.getDequeues().getCount() && - count == destinationStatistics.getDispatched().getCount() && - count == destinationStatistics.getForwards().getCount())); + count == destinationStatistics.getDispatched().getCount() && + count == destinationStatistics.getForwards().getCount())); } protected void assertLocalBrokerStatistics(final DestinationStatistics localStatistics, final int count) { @@ -135,17 +142,19 @@ protected interface ConsumerCreator { protected void assertNCDurableSubsCount(final BrokerService brokerService, final ActiveMQTopic dest, final int count) throws Exception { - assertTrue(Wait.waitFor(() -> count == getNCDurableSubs(brokerService, dest).size(), - 10000, 500)); + assertTrue("Expected " + count + " NC durable subs on " + dest, + Wait.waitFor(() -> count == getNCDurableSubs(brokerService, dest).size(), + TimeUnit.SECONDS.toMillis(30), 500)); } protected void assertConsumersCount(final BrokerService brokerService, final ActiveMQDestination dest, final int count) throws Exception { assertTrue(Wait.waitFor(() -> count == getConsumers(brokerService, dest).size(), - 10000, 500)); - Thread.sleep(1000); - // Check one more time after a short pause to make sure the count didn't increase past what we wanted - assertEquals(count, getConsumers(brokerService, dest).size()); + 10000, 500)); + // Wait a bit longer and verify the count is stable (didn't increase past what we wanted) + assertTrue("Consumer count should remain stable at " + count, + Wait.waitFor(() -> count == getConsumers(brokerService, dest).size(), + TimeUnit.SECONDS.toMillis(5), 500)); } protected List getConsumers(final BrokerService brokerService, @@ -194,7 +203,7 @@ protected List getNCDurableSubs(final BrokerService br } protected void removeSubscription(final BrokerService brokerService, - final String subName) throws Exception { + final String subName) throws Exception { final RemoveSubscriptionInfo info = new RemoveSubscriptionInfo(); info.setClientId(clientId); info.setSubscriptionName(subName); @@ -208,12 +217,9 @@ protected void removeSubscription(final BrokerService brokerService, protected void assertSubscriptionsCount(final BrokerService brokerService, final ActiveMQTopic dest, final int count) throws Exception { - assertTrue(Wait.waitFor(new Condition() { - @Override - public boolean isSatisified() throws Exception { - return count == getSubscriptions(brokerService, dest).size(); - } - }, 10000, 500)); + assertTrue("Expected " + count + " subscriptions on " + dest, + Wait.waitFor(() -> count == getSubscriptions(brokerService, dest).size(), + TimeUnit.SECONDS.toMillis(30), 500)); } protected void assertSubscriptionMapCounts(NetworkBridge networkBridge, final int count) { @@ -226,8 +232,9 @@ protected void assertSubscriptionMapCounts(NetworkBridge networkBridge, final in protected DemandForwardingBridge findDuplexBridge(final TransportConnector connector) throws Exception { assertNotNull(connector); - for (TransportConnection tc : connector.getConnections()) { - if (tc.getConnectionId().startsWith("networkConnector_")) { + for (final TransportConnection tc : connector.getConnections()) { + final String connectionId = tc.getConnectionId(); + if (connectionId != null && connectionId.startsWith("networkConnector_")) { final Field bridgeField = TransportConnection.class.getDeclaredField("duplexBridge"); bridgeField.setAccessible(true); return (DemandForwardingBridge) bridgeField.get(tc); diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/security/DestinationAdminAuthzTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/security/DestinationAdminAuthzTest.java new file mode 100644 index 00000000000..f0904af1918 --- /dev/null +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/security/DestinationAdminAuthzTest.java @@ -0,0 +1,341 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.security; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + +import javax.jms.Destination; +import javax.jms.JMSException; +import javax.jms.Session; +import org.apache.activemq.ActiveMQConnection; +import org.apache.activemq.ActiveMQConnectionFactory; +import org.apache.activemq.broker.Broker; +import org.apache.activemq.broker.BrokerPlugin; +import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.command.ActiveMQDestination; +import org.apache.activemq.command.ActiveMQQueue; +import org.apache.activemq.command.ActiveMQTempQueue; +import org.apache.activemq.command.ActiveMQTempTopic; +import org.apache.activemq.command.ActiveMQTopic; +import org.apache.activemq.filter.DestinationMap; +import org.apache.activemq.jaas.GroupPrincipal; +import org.apache.activemq.util.Wait; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.security.Principal; +import java.util.Arrays; +import java.util.Collection; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; + +@RunWith(value = Parameterized.class) +public class DestinationAdminAuthzTest { + + private static final Logger LOG = LoggerFactory.getLogger(DestinationAdminAuthzTest.class); + + public static final String APP1_USER_NAME = "app1"; + public static final String APP2_USER_NAME = "app2"; + public static final String APP3_USER_NAME = "app3"; + + public static final String APP1_GROUP_NAME = "app1Group"; + public static final String APP2_GROUP_NAME = "app2Group"; + public static final String APP3_GROUP_NAME = "app3Group"; + + public static final GroupPrincipal ADMINS = new GroupPrincipal("admins"); + public static final GroupPrincipal APP1GROUP = new GroupPrincipal(APP1_GROUP_NAME); + public static final GroupPrincipal APP2GROUP = new GroupPrincipal(APP2_GROUP_NAME); + public static final GroupPrincipal APP3GROUP = new GroupPrincipal(APP3_GROUP_NAME); + + public static Principal WILDCARD; + static { + try { + WILDCARD = (Principal) DefaultAuthorizationMap.createGroupPrincipal( + "*", GroupPrincipal.class.getName()); + } catch (Exception e) { + LOG.error("Failed to make wildcard principal", e); + } + } + + private BrokerService brokerService = null; + private String destinationType = null; + private String userName = null; + private boolean errorExpected = true; + + @Parameterized.Parameters(name="type={0}, user={1}, error={2}") + public static Collection data() { + return Arrays.asList(new Object[][] { + {"queue", APP1_USER_NAME, false}, + {"topic", APP1_USER_NAME, false}, + {"temp-queue", APP1_USER_NAME, false}, + {"temp-topic", APP1_USER_NAME, false}, + {"queue", APP2_USER_NAME, false}, + {"topic", APP2_USER_NAME, false}, + {"temp-queue", APP2_USER_NAME, false}, + {"temp-topic", APP2_USER_NAME, false}, + {"queue", APP3_USER_NAME, true}, + {"topic", APP3_USER_NAME, true}, + {"temp-queue", APP3_USER_NAME, true}, + {"temp-topic", APP3_USER_NAME, true} + }); + } + + public DestinationAdminAuthzTest(String destinationType, String userName, boolean errorExpected) { + this.destinationType = destinationType; + this.userName = userName; + this.errorExpected = errorExpected; + } + + @Before + public void setUp() throws Exception { + brokerService = createBroker(); + brokerService.start(); + brokerService.waitUntilStarted(5_000L); + } + + @After + public void tearDown() throws Exception { + if (brokerService != null && brokerService.isStarted()) { + LOG.info("Shutting down broker:{}", brokerService.getBrokerName()); + brokerService.stop(); + brokerService.waitUntilStopped(); + } + } + + @Test + public void testDestinationPermissions() throws Exception { + boolean exceptionOccured = false; + + var connectionFactory = new ActiveMQConnectionFactory(brokerService.getVmConnectorURI()); + connectionFactory.setWatchTopicAdvisories(false); + + final String destName = userName + ".in";; + ActiveMQDestination tmpActiveMQDestination = null; + switch (destinationType) { + case "temp-queue": tmpActiveMQDestination = new ActiveMQTempQueue(); break; + case "temp-topic": tmpActiveMQDestination = new ActiveMQTempTopic(); break; + case "queue": tmpActiveMQDestination = new ActiveMQQueue(destName); break; + case "topic": tmpActiveMQDestination = new ActiveMQTopic(destName); break; + default: throw new IllegalArgumentException("Unsupported destinationType: " + destinationType); + }; + var activemqDestination = tmpActiveMQDestination; + + try (final var connection = connectionFactory.createConnection(userName, "password"); + final var session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE)) { + connection.start(); + + Destination tmpDestination = null; + switch (destinationType) { + case "temp-queue": tmpDestination = session.createTemporaryQueue(); break; + case "temp-topic": tmpDestination = session.createTemporaryTopic(); break; + case "queue": tmpDestination = session.createQueue(destName); break; + case "topic": tmpDestination = session.createTopic(destName); break; + default: throw new IllegalArgumentException("Unsupported destinationType: " + destinationType); + }; + final var destination = tmpDestination; + + if(activemqDestination.isTemporary()) { + switch (destinationType) { + case "temp-queue": activemqDestination.setPhysicalName(destination.toString().substring("temp-queue://".length())); break; + case "temp-topic": activemqDestination.setPhysicalName(destination.toString().substring("temp-topic://".length())); break; + } + } + + var brokerDestination = brokerService.getDestination(activemqDestination); + + try (var messageProducer = session.createProducer(destination); + var messageConsumer = session.createConsumer(destination)) { + + messageProducer.send(session.createTextMessage("Test message")); + + Wait.waitFor(new Wait.Condition() { + @Override + public boolean isSatisified() throws Exception { + try { + return brokerService.getDestination(activemqDestination) != null; + } catch (Exception e) { + return false; + } + } + }, 5_000L, 50L); + + var message = messageConsumer.receive(2_000L); + + Wait.waitFor(new Wait.Condition() { + @Override + public boolean isSatisified() throws Exception { + return 1L == brokerDestination.getDestinationStatistics().getDequeues().getCount(); + } + }, 5_000L, 50L); + + assertNotNull(message); + assertTrue(message.propertyExists("JMSXUserID")); + assertEquals(userName, message.getStringProperty("JMSXUserID")); + assertEquals("Test message", message.getBody(String.class)); + } + + Wait.waitFor(new Wait.Condition() { + @Override + public boolean isSatisified() throws Exception { + return 0L == brokerDestination.getDestinationStatistics().getConsumers().getCount(); + } + }, 5_000L, 50L); + + if (!activemqDestination.isTemporary()) { + ((ActiveMQConnection) connection).destroyDestination(activemqDestination); + } + + // Check removing a destination that already exists, but does not have authz + boolean removeExceptionOccured = false; + try { + for (var otherActiveMQDestination : Set.of(new ActiveMQQueue("other.in"), new ActiveMQTopic("other.in"))) { + ((ActiveMQConnection) connection).destroyDestination(otherActiveMQDestination); + } + } catch (JMSException e) { + removeExceptionOccured = true; + } + assertTrue(removeExceptionOccured); + + // Check removing a destination that does not exist does not throw an error + boolean notExistExceptionOccured = false; + try { + for (var noexistActiveMQDestination : Set.of(new ActiveMQQueue("noexist.in"), new ActiveMQTopic("noexist.in"))) { + ((ActiveMQConnection) connection).destroyDestination(noexistActiveMQDestination); + } + } catch (JMSException e) { + notExistExceptionOccured = true; + } + assertFalse(notExistExceptionOccured); + + } catch (JMSException e) { + exceptionOccured = true; + } + + assertEquals(errorExpected, exceptionOccured); + } + + public static AuthorizationMap createAuthorizationMap() { + DestinationMap readAccess = new DefaultAuthorizationMap(); + readAccess.put(new ActiveMQQueue(">"), ADMINS); + readAccess.put(new ActiveMQQueue("app1.>"), APP1GROUP); + readAccess.put(new ActiveMQQueue("app2.>"), APP2GROUP); + readAccess.put(new ActiveMQTopic(">"), ADMINS); + readAccess.put(new ActiveMQTopic("app1.>"), APP1GROUP); + readAccess.put(new ActiveMQTopic("app2.>"), APP2GROUP); + + // Note: APP3 intentionally has no acccess + // readAccess.put(new ActiveMQQueue("app3.>"), APP3); + // readAccess.put(new ActiveMQTopic("app3.>"), APP3); + + DestinationMap writeAccess = new DefaultAuthorizationMap(); + writeAccess.put(new ActiveMQQueue(">"), ADMINS); + writeAccess.put(new ActiveMQQueue("app1.>"), APP1GROUP); + writeAccess.put(new ActiveMQQueue("app2.>"), APP2GROUP); + writeAccess.put(new ActiveMQTopic(">"), ADMINS); + writeAccess.put(new ActiveMQTopic("app1.>"), APP1GROUP); + writeAccess.put(new ActiveMQTopic("app2.>"), APP2GROUP); + + DestinationMap adminAccess = new DefaultAuthorizationMap(); + adminAccess.put(new ActiveMQTopic(">"), ADMINS); + adminAccess.put(new ActiveMQTopic("app1.>"), APP1GROUP); + adminAccess.put(new ActiveMQTopic("app2.>"), APP2GROUP); + adminAccess.put(new ActiveMQQueue(">"), ADMINS); + adminAccess.put(new ActiveMQQueue("app1.>"), APP1GROUP); + adminAccess.put(new ActiveMQQueue("app2.>"), APP2GROUP); + + readAccess.put(new ActiveMQTopic("ActiveMQ.Advisory.>"), WILDCARD); + writeAccess.put(new ActiveMQTopic("ActiveMQ.Advisory.>"), WILDCARD); + adminAccess.put(new ActiveMQTopic("ActiveMQ.Advisory.>"), WILDCARD); + + var authorizationMap = new SimpleAuthorizationMap(writeAccess, readAccess, adminAccess); + var tempDestinationAuthorizationEntry = new TempDestinationAuthorizationEntry(); + + try { + tempDestinationAuthorizationEntry.setAdmin("admins, app1Group, app2Group"); + tempDestinationAuthorizationEntry.setRead("admins, app1Group, app2Group"); + tempDestinationAuthorizationEntry.setWrite("admins, app1Group, app2Group"); + } catch (Exception e) { + fail(e.getMessage()); + } + + authorizationMap.setTempDestinationAuthorizationEntry(tempDestinationAuthorizationEntry); + return authorizationMap; + } + + public static class SimpleAuthenticationBrokerPlugin implements BrokerPlugin { + + private final Map userPasswords; + private final Map> userGroups; + + public SimpleAuthenticationBrokerPlugin(final Map userPasswords, final Map> userGroups) { + this.userPasswords = userPasswords; + this.userGroups = userGroups; + } + + public Broker installPlugin(Broker broker) { + return new SimpleAuthenticationBroker(broker, userPasswords, userGroups); + } + + public String toString() { + return "SimpleAuthenticationBroker"; + } + } + + protected BrokerService createBroker() throws Exception { + var brokerService = new BrokerService(); + brokerService.setDeleteAllMessagesOnStartup(true); + brokerService.setDestinations(new ActiveMQDestination[] { new ActiveMQQueue("other.in"), new ActiveMQTopic("other.in") }); + brokerService.setPersistent(false); + brokerService.setPlugins(new BrokerPlugin[] {new AuthorizationPlugin(createAuthorizationMap()), new SimpleAuthenticationBrokerPlugin(createUserPasswords(), createUserGroups())}); + brokerService.setPopulateJMSXUserID(true); + brokerService.setSchedulerSupport(false); + brokerService.setUseAuthenticatedPrincipalForJMSXUserID(true); + brokerService.setUseJmx(true); + return brokerService; + } + + private static Map createUserPasswords() { + var users = new HashMap(); + users.put("admin", "password"); + users.put(APP1_USER_NAME, "password"); + users.put(APP2_USER_NAME, "password"); + users.put(APP3_USER_NAME, "password"); + return users; + } + + private static Map> createUserGroups() { + var groups = new HashMap>(); + groups.put("admin", new HashSet(Arrays.asList(new Principal[] {new GroupPrincipal("admins")}))); + groups.put(APP1_USER_NAME, new HashSet(Arrays.asList(new Principal[] { new GroupPrincipal(APP1_GROUP_NAME) }))); + groups.put(APP2_USER_NAME, new HashSet(Arrays.asList(new Principal[] { new GroupPrincipal(APP2_GROUP_NAME) }))); + groups.put(APP3_USER_NAME, new HashSet(Arrays.asList(new Principal[] { new GroupPrincipal(APP3_GROUP_NAME) }))); + return groups; + } +} + diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/spring/ActiveMQConnectionFactoryXBeanTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/spring/ActiveMQConnectionFactoryXBeanTest.java index 4e71e914f6b..88045c2c218 100644 --- a/activemq-unit-tests/src/test/java/org/apache/activemq/spring/ActiveMQConnectionFactoryXBeanTest.java +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/spring/ActiveMQConnectionFactoryXBeanTest.java @@ -16,6 +16,7 @@ */ package org.apache.activemq.spring; +import static org.apache.activemq.util.VmTransportTestUtils.resetVmTransportFactory; import static org.apache.activemq.xbean.XBeanBrokerFactory.DEFAULT_ALLOWED_PROTOCOLS; import static org.apache.activemq.xbean.XBeanBrokerFactory.XBEAN_BROKER_FACTORY_PROTOCOLS_PROP; import static org.junit.Assert.assertNotNull; @@ -28,14 +29,22 @@ import java.net.UnknownHostException; import java.nio.file.NoSuchFileException; import org.apache.activemq.ActiveMQConnectionFactory; +import org.apache.activemq.transport.vm.VMTransportFactory; import org.apache.activemq.xbean.XBeanBrokerFactory; import org.junit.AfterClass; import org.junit.Before; +import org.junit.BeforeClass; import org.junit.Test; import org.springframework.beans.FatalBeanException; public class ActiveMQConnectionFactoryXBeanTest { + @BeforeClass + public static void beforeClass() throws Exception { + // enable xbean for the vm transport factory + resetVmTransportFactory("xbean"); + } + @Before public void setUp() throws Exception { // reset before each test @@ -45,6 +54,8 @@ public void setUp() throws Exception { @AfterClass public static void tearDown() throws Exception { System.setProperty(XBEAN_BROKER_FACTORY_PROTOCOLS_PROP, DEFAULT_ALLOWED_PROTOCOLS); + // reset defaults + resetVmTransportFactory(); } // File and classpath are allowed by default diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/transport/vm/VMTransportFactoryTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/transport/vm/VMTransportFactoryTest.java new file mode 100644 index 00000000000..fa24ea432fd --- /dev/null +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/transport/vm/VMTransportFactoryTest.java @@ -0,0 +1,134 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.transport.vm; + +import static org.apache.activemq.util.VmTransportTestUtils.resetVmTransportFactory; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + +import javax.jms.Connection; +import javax.jms.JMSException; +import org.apache.activemq.ActiveMQConnectionFactory; +import org.junit.AfterClass; +import org.junit.Before; +import org.junit.Test; + +public class VMTransportFactoryTest { + + @Before + public void setUp() throws Exception { + // reset before each test + resetVmTransportFactory(); + } + + @AfterClass + public static void tearDown() throws Exception { + resetVmTransportFactory(); + } + + @Test + public void testDefaults() throws Exception { + // broker and properties allowed by default + assertBrokerCreated("vm:localhost?persistent=false"); + assertBrokerCreated("vm:(broker:(tcp://localhost:0)?persistent=false)"); + assertBrokerCreated("vm://localhost?brokerConfig=properties:org/apache/activemq/config/broker.properties"); + + // xbean not allowed by default + assertBrokerStartError("vm://localhost?brokerConfig=xbean:activemq.xml"); + } + + @Test + public void testAllowAll() throws Exception { + resetVmTransportFactory("broker,properties,xbean"); + + // broker and properties allowed by default + assertBrokerCreated("vm:localhost?persistent=false"); + assertBrokerCreated("vm:(broker:(tcp://localhost:0)?persistent=false)"); + assertBrokerCreated("vm://localhost?brokerConfig=properties:org/apache/activemq/config/broker.properties"); + + // xbean now allowed + assertBrokerCreated("vm://localhost?brokerConfig=xbean:activemq.xml"); + } + + @Test + public void testAllowAllWildcard() throws Exception { + resetVmTransportFactory("*"); + + // all allowed + assertBrokerCreated("vm:localhost?persistent=false"); + assertBrokerCreated("vm:(broker:(tcp://localhost:0)?persistent=false)"); + assertBrokerCreated("vm://localhost?brokerConfig=properties:org/apache/activemq/config/broker.properties"); + assertBrokerCreated("vm://localhost?brokerConfig=xbean:activemq.xml"); + } + + @Test + public void testNullSchemes() throws Exception { + // should set to defaults + resetVmTransportFactory(null); + + // broker and properties allowed by default + assertBrokerCreated("vm:localhost?persistent=false"); + assertBrokerCreated("vm:(broker:(tcp://localhost:0)?persistent=false)"); + assertBrokerCreated("vm://localhost?brokerConfig=properties:org/apache/activemq/config/broker.properties"); + + // xbean not allowed by default + assertBrokerStartError("vm://localhost?brokerConfig=xbean:activemq.xml"); + } + + @Test + public void testNoneAllowed() throws Exception { + // deny all + resetVmTransportFactory(""); + + // nothing allowed + assertBrokerStartError("vm:localhost?persistent=false"); + assertBrokerStartError("vm:(broker:(tcp://localhost:0)?persistent=false)"); + assertBrokerStartError("vm://localhost?brokerConfig=properties:org/apache/activemq/config/broker.properties"); + assertBrokerStartError("vm://localhost?brokerConfig=xbean:activemq.xml"); + } + + @Test + public void testOneAllowed() throws Exception { + // deny all + resetVmTransportFactory("properties"); + + // only properties allowed + assertBrokerStartError("vm:localhost?persistent=false"); + assertBrokerStartError("vm:(broker:(tcp://localhost:0)?persistent=false)"); + assertBrokerCreated("vm://localhost?brokerConfig=properties:org/apache/activemq/config/broker.properties"); + assertBrokerStartError("vm://localhost?brokerConfig=xbean:activemq.xml"); + } + + private void assertBrokerCreated(String url) throws JMSException { + final ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(url); + try (Connection connection = factory.createConnection()) { + assertNotNull(connection); + } + } + + private void assertBrokerStartError(String url) { + final ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(url); + try (Connection ignored = factory.createConnection()) { + fail("Should have failed with an exception"); + } catch (JMSException e) { + Throwable cause = e.getCause() != null ? e.getCause() : e; + assertTrue(cause instanceof IllegalArgumentException); + } + } + +} diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/util/VmTransportTestUtils.java b/activemq-unit-tests/src/test/java/org/apache/activemq/util/VmTransportTestUtils.java new file mode 100644 index 00000000000..7a4d38aee8f --- /dev/null +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/util/VmTransportTestUtils.java @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.util; + +import java.lang.reflect.Field; +import java.util.concurrent.ConcurrentMap; +import org.apache.activemq.transport.TransportFactory; +import org.apache.activemq.transport.vm.VMTransportFactory; + +public class VmTransportTestUtils { + + public static void resetVmTransportFactory() throws Exception { + resetVmTransportFactory(VMTransportFactory.DEFAULT_ALLOWED_SCHEMES); + } + + @SuppressWarnings("unchecked") + public static void resetVmTransportFactory(String allowedSchemes) throws Exception { + if (allowedSchemes == null) { + System.clearProperty(VMTransportFactory.VM_TRANSPORT_FACTORY_SCHEMES_ENABLED_PROP); + } else { + // set property to allowed schemes + System.setProperty(VMTransportFactory.VM_TRANSPORT_FACTORY_SCHEMES_ENABLED_PROP, + allowedSchemes); + } + + // clear any cached factory so the next call will create a new transport and use + // the correct property setting + Field factoriesField = TransportFactory.class.getDeclaredField("TRANSPORT_FACTORYS"); + factoriesField.setAccessible(true); + ConcurrentMap factories = + (ConcurrentMap) factoriesField.get(TransportFactory.class); + factories.remove("vm"); + } +} diff --git a/activemq-web-console/pom.xml b/activemq-web-console/pom.xml index 5f61b1307d0..e5d65c03724 100644 --- a/activemq-web-console/pom.xml +++ b/activemq-web-console/pom.xml @@ -22,7 +22,7 @@ org.apache.activemq activemq-parent - 5.19.7-TT.2-SNAPSHOT + 5.19.8-TT.1-SNAPSHOT activemq-web-console diff --git a/activemq-web-demo/pom.xml b/activemq-web-demo/pom.xml index 47ab97b8b74..2a4858da8d4 100644 --- a/activemq-web-demo/pom.xml +++ b/activemq-web-demo/pom.xml @@ -22,7 +22,7 @@ org.apache.activemq activemq-parent - 5.19.7-TT.2-SNAPSHOT + 5.19.8-TT.1-SNAPSHOT activemq-web-demo diff --git a/activemq-web/pom.xml b/activemq-web/pom.xml index abb4a9167fc..4c8cfe61573 100644 --- a/activemq-web/pom.xml +++ b/activemq-web/pom.xml @@ -22,7 +22,7 @@ org.apache.activemq activemq-parent - 5.19.7-TT.2-SNAPSHOT + 5.19.8-TT.1-SNAPSHOT activemq-web diff --git a/activemq-web/src/main/java/org/apache/activemq/web/MessageServlet.java b/activemq-web/src/main/java/org/apache/activemq/web/MessageServlet.java index b40b50f22f2..cbaf90b826b 100644 --- a/activemq-web/src/main/java/org/apache/activemq/web/MessageServlet.java +++ b/activemq-web/src/main/java/org/apache/activemq/web/MessageServlet.java @@ -53,7 +53,13 @@ * there will always be a chance of losing messages. Consider what happens when * a message is retrieved from the broker but the web call is interrupted before * the client receives the message in the response - the message is lost. + * + * @deprecated - WARNING: The MessageServlet should be used with caution as it is unmaintained + * and there are multiple security related issues. This servlet is primarily meant for demo + * purposes only and will be removed entirely in a future release. It is recommended to + * keep it disabled. */ +@Deprecated public class MessageServlet extends MessageServletSupport { // its a bit pita that this servlet got intermixed with asyncRequest/rest diff --git a/assembly/pom.xml b/assembly/pom.xml index 938e3debc2d..05fdc86ff55 100644 --- a/assembly/pom.xml +++ b/assembly/pom.xml @@ -22,7 +22,7 @@ org.apache.activemq activemq-parent - 5.19.7-TT.2-SNAPSHOT + 5.19.8-TT.1-SNAPSHOT apache-activemq diff --git a/assembly/src/release/conf/activemq.xml b/assembly/src/release/conf/activemq.xml index 39ba30d7ae6..d5658cd7f49 100644 --- a/assembly/src/release/conf/activemq.xml +++ b/assembly/src/release/conf/activemq.xml @@ -54,6 +54,50 @@ + + + - + - - - - + + + + + diff --git a/assembly/src/release/conf/jetty.xml b/assembly/src/release/conf/jetty.xml index ee6436a6507..fd3fb72f527 100644 --- a/assembly/src/release/conf/jetty.xml +++ b/assembly/src/release/conf/jetty.xml @@ -22,6 +22,33 @@ + + + + @@ -49,6 +76,10 @@ + + + + @@ -78,12 +109,40 @@ + + + + + + + + + + + + + + + + + + + + + + @@ -125,6 +184,7 @@ + @@ -140,19 +200,94 @@ + + + + + + + + + + + + + + + + + + + + 127.0.0.1 + + + + + + + ::1 + + + + + + - - - - - - - - - - + @@ -203,7 +338,7 @@ + depends-on="broker, configureJetty, invokeConnectors, inetAccessIncludeLoopbackV6"> diff --git a/assembly/src/release/conf/jolokia-access.xml b/assembly/src/release/conf/jolokia-access.xml index 97b099a5b7f..6844c314608 100644 --- a/assembly/src/release/conf/jolokia-access.xml +++ b/assembly/src/release/conf/jolokia-access.xml @@ -17,7 +17,14 @@ --> - + + + post + + + @@ -46,11 +53,143 @@ + + + org.apache.activemq:type=Broker,brokerName=* + + terminateJVM + stop + stopGracefully + restart + gc + + addConnector + removeConnector + addNetworkConnector + removeNetworkConnector + addQueue + removeQueue + addTopic + removeTopic + createDurableSubscriber + destroyDurableSubscriber + + reloadLog4jProperties + setMemoryLimit + setStoreLimit + setTempLimit + setJobSchedulerStoreLimit + setMaxUncommittedCount + + + + + org.apache.activemq:type=Broker,brokerName=*,destinationType=*,destinationName=* + purge + removeMessage + removeMatchingMessages + copyMessageTo + copyMatchingMessagesTo + moveMessageTo + moveMatchingMessagesTo + retryMessage + retryMessages + removeMessageGroup + removeAllMessageGroups + sendTextMessage + sendTextMessageWithProperties + pause + resume + + + + + org.apache.activemq:type=Broker,brokerName=*,destinationType=*,destinationName=*,endpoint=Consumer,clientId=*,consumerId=* + destroy + removeMessage + setSelector + + + + + org.apache.activemq:type=Broker,brokerName=*,service=JobScheduler,name=* + removeJob + removeAllJobs + removeAllJobsAtScheduledTime + + + + + org.apache.activemq:type=Broker,brokerName=*,service=Log4JConfiguration + setRootLogLevel + setLogLevel + reloadLog4jProperties + + + + + org.apache.activemq:type=Broker,brokerName=*,connector=networkConnectors,networkConnectorName=* + Password + RemotePassword + setUserName + setPassword + setRemoteUserName + setRemotePassword + setBridgeTempDestinations + setConduitSubscriptions + setDispatchAsync + setDynamicOnly + setMessageTTL + setConsumerTTL + setPrefetchSize + setAdvisoryPrefetchSize + setDecreaseNetworkConsumerPriority + setSuppressDuplicateQueueSubscriptions + setSuppressDuplicateTopicSubscriptions + + + + + javax.management.loading:type=MLet + * + * + + + JMImplementation:* + * + * + org.apache.logging.log4j2:* * * + + java.util.logging:* + * + * + + + + java.lang:type=Memory + * + * + + + java.lang:type=ClassLoading + * + * + + + + java.lang:type=Runtime + SystemProperties + InputArguments + * + com.sun.management:type=DiagnosticCommand * diff --git a/assembly/src/release/webapps/api/WEB-INF/web.xml b/assembly/src/release/webapps/api/WEB-INF/web.xml index 3b134ac345f..d4689332498 100644 --- a/assembly/src/release/webapps/api/WEB-INF/web.xml +++ b/assembly/src/release/webapps/api/WEB-INF/web.xml @@ -22,11 +22,19 @@ Apache ActiveMQ REST API + - + jolokia-agent @@ -74,11 +88,6 @@ 1 - - MessageServlet - /message/* - - jolokia-agent /jolokia/* diff --git a/pom.xml b/pom.xml index 2f0980ec033..b2cb51c1d60 100644 --- a/pom.xml +++ b/pom.xml @@ -25,14 +25,14 @@ org.apache.activemq activemq-parent - 5.19.7-TT.2-SNAPSHOT + 5.19.8-TT.1-SNAPSHOT pom ActiveMQ 2005 - 2026-04-24T15:33:11Z + 2026-05-27T11:53:38Z 3.1.4 activemq-${project.version} @@ -87,7 +87,7 @@ 1.1.1 4.13.2 1.3 - 4.3.7 + 4.3.10 2.25.4 4.8.1 12.1.0 @@ -98,12 +98,12 @@ 3.4.14 0.34.1 1.9.0 - 4.1.94.Final + 4.1.133.Final 1.4 2.1.0 1.13.0-TT.2 2.0.17 - 1.1.2 + 1.1.10.8 5.3.39.RELEASE-TT.6 1.2.5 2.4.1