diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/TransportConnection.java b/activemq-broker/src/main/java/org/apache/activemq/broker/TransportConnection.java index 74b1bc7541b..e95351a5be4 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/TransportConnection.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/TransportConnection.java @@ -26,6 +26,7 @@ import java.util.List; import java.util.Map; import java.util.Properties; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.CountDownLatch; @@ -162,6 +163,7 @@ public class TransportConnection implements Connection, Task, CommandVisitor { private final ReentrantReadWriteLock serviceLock = new ReentrantReadWriteLock(); private String duplexNetworkConnectorId; private final long connectedTimestamp; + private final CompletableFuture initialConnectionId = new CompletableFuture<>(); /** * @param taskRunnerFactory - can be null if you want direct dispatch to the transport @@ -850,11 +852,16 @@ public Response processAddConnection(ConnectionInfo info) throws Exception { try { broker.addConnection(context, info); + // Complete the future with the connectionId if we completed + // the broker.addConnection() chain successfully + initialConnectionId.complete(info.getConnectionId()); } catch (Exception e) { synchronized (brokerConnectionStates) { brokerConnectionStates.remove(info.getConnectionId()); } unregisterConnectionState(info.getConnectionId()); + // complete with the exception + initialConnectionId.completeExceptionally(e); LOG.warn("Failed to add Connection id={}, clientId={}, clientIP={} due to {}", info.getConnectionId(), clientId, info.getClientIp(), e.getLocalizedMessage()); //AMQ-6561 - stop for all exceptions on addConnection @@ -1389,13 +1396,10 @@ public Response processBrokerInfo(BrokerInfo info) { LOG.error(" Slave Brokers are no longer supported - slave trying to attach is: {}", info.getBrokerName()); } else if (info.isNetworkConnection() && !info.isDuplexConnection()) { try { - NetworkBridgeConfiguration config = getNetworkConfiguration(info); - if (config.isSyncDurableSubs() && protocolVersion.get() >= CommandTypes.PROTOCOL_VERSION_DURABLE_SYNC) { - LOG.debug("SyncDurableSubs is enabled, Sending BrokerSubscriptionInfo"); - dispatchSync(NetworkBridgeUtils.getBrokerSubscriptionInfo(this.broker.getBrokerService(), config)); - } + // register durable sync to be sent after ConnectionInfo has been handled + registerDurableSync(getNetworkConfiguration(info), info); } catch (Exception e) { - LOG.error("Failed to respond to network bridge creation from broker {}", info.getBrokerId(), e); + LOG.error("Failed to register durable sync for network bridge creation from broker {}", info.getBrokerId(), e); return null; } } else if (info.isNetworkConnection() && info.isDuplexConnection()) { @@ -1405,10 +1409,8 @@ public Response processBrokerInfo(BrokerInfo info) { NetworkBridgeConfiguration config = getNetworkConfiguration(info); config.setBrokerName(broker.getBrokerName()); - if (config.isSyncDurableSubs() && protocolVersion.get() >= CommandTypes.PROTOCOL_VERSION_DURABLE_SYNC) { - LOG.debug("SyncDurableSubs is enabled, Sending BrokerSubscriptionInfo"); - dispatchSync(NetworkBridgeUtils.getBrokerSubscriptionInfo(this.broker.getBrokerService(), config)); - } + // register durable sync to be sent after ConnectionInfo has been handled + registerDurableSync(config, info); // check for existing duplex connection hanging about @@ -1475,6 +1477,30 @@ public Response processBrokerInfo(BrokerInfo info) { return null; } + private void registerDurableSync(final NetworkBridgeConfiguration config, final BrokerInfo info) { + if (config.isSyncDurableSubs() && protocolVersion.get() >= CommandTypes.PROTOCOL_VERSION_DURABLE_SYNC) { + // this will complete when the connection id has been set, or immediately if already set + initialConnectionId.whenComplete((connectionId, t) -> { + try { + if (t != null) { + LOG.warn("SyncDurableSubs will be skipped due to error {}", + t.getMessage()); + return; + } + // check connection still registered + if (lookupConnectionState(connectionId) != null) { + LOG.debug("SyncDurableSubs is enabled, Sending BrokerSubscriptionInfo"); + dispatchSync(NetworkBridgeUtils.getBrokerSubscriptionInfo( + this.broker.getBrokerService(), config)); + } + } catch (Exception e) { + LOG.error("Failed to respond to network bridge creation from broker {}", + info.getBrokerId(), e); + } + }); + } + } + @SuppressWarnings({"unchecked", "rawtypes"}) private HashMap createMap(Properties properties) { return new HashMap(properties); diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/BrokerView.java b/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/BrokerView.java index 299d0751e3c..fa974572c75 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/BrokerView.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/BrokerView.java @@ -559,26 +559,39 @@ private static void validateAllowedUrl(String uriString) throws URISyntaxExcepti // Validate the URI does not contain a denied transport scheme private static void validateAllowedUri(URI uri, int depth) throws URISyntaxException { // Don't allow more than 5 nested URIs to prevent blowing the stack - // If we are greater than 4 then this is the 5th level of composite - if (depth > 4) { + if (depth > 5) { throw new IllegalArgumentException("URI can't contain more than 5 nested composite URIs"); } // First check the main URI scheme validateAllowedScheme(uri.getScheme()); - // If composite, iterate and check each of the composite URIs - if (URISupport.isCompositeURI(uri)) { - URISupport.CompositeData data = URISupport.parseComposite(uri); + // We need to check if the URI is composite and/or contains nested URIs + // The utility method URISupport#isCompositeURI is not good enough here + // because it misses if there are no parentheses and also is primarily meant + // for checking comma separated URIs and not nested URIs. + // + // The best way to handle all cases is to use the same logic that the transports + // use to process the URIs and that is to simply attempt to parse it and check each + // of the parsed components. This wll correctly handle the case when there + // are parentheses and also when the parentheses are skipped. + final URISupport.CompositeData data; + try { + data = URISupport.parseComposite(uri); + } catch (URISyntaxException e) { + // If this is not a valid URI then we can stop checking + // This can happen when parsing a nested URI and at the last portion + return; + } + + if (data.getComponents() != null) { depth++; for (URI component : data.getComponents()) { - // Each URI could be a nested composite URI so call validateAllowedUri() - // to validate it. This check if composite first so we don't add to - // the recursive stack depth if there's a lot of URIs that are not composite - if (URISupport.isCompositeURI(component)) { + // Each URI could be a nested and/or composite URI so call validateAllowedUri() + // to validate it. If the scheme is null then the original URI is not composite + // or nested so we can skip the check, and we are finished. + if (component.getScheme() != null) { validateAllowedUri(component, depth); - } else { - validateAllowedScheme(component.getScheme()); } } } diff --git a/activemq-broker/src/main/java/org/apache/activemq/security/AuthorizationBroker.java b/activemq-broker/src/main/java/org/apache/activemq/security/AuthorizationBroker.java index e4f861a597a..958e4923735 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/security/AuthorizationBroker.java +++ b/activemq-broker/src/main/java/org/apache/activemq/security/AuthorizationBroker.java @@ -75,12 +75,21 @@ protected SecurityContext checkSecurityContext(ConnectionContext context) throws return securityContext; } - protected boolean checkDestinationAdmin(SecurityContext securityContext, ActiveMQDestination destination) { - Destination existing = this.getDestinationMap(destination).get(destination); - if (existing != null) { + protected boolean checkDestinationAdminAdd(SecurityContext securityContext, ActiveMQDestination destination) { + if (this.getDestinationMap(destination).get(destination) != null) { return true; } + return checkDestinationAdmin(securityContext, destination); + } + protected boolean checkDestinationAdminRemove(SecurityContext securityContext, ActiveMQDestination destination) { + if (this.getDestinationMap(destination).get(destination) == null) { + return true; + } + return checkDestinationAdmin(securityContext, destination); + } + + protected boolean checkDestinationAdmin(SecurityContext securityContext, ActiveMQDestination destination) { if (!securityContext.isBrokerContext()) { Set allowedACLs = null; if (!destination.isTemporary()) { @@ -100,7 +109,7 @@ protected boolean checkDestinationAdmin(SecurityContext securityContext, ActiveM public void addDestinationInfo(ConnectionContext context, DestinationInfo info) throws Exception { final SecurityContext securityContext = checkSecurityContext(context); - if (!checkDestinationAdmin(securityContext, info.getDestination())) { + if (!checkDestinationAdminAdd(securityContext, info.getDestination())) { throw new SecurityException("User " + securityContext.getUserName() + " is not authorized to create: " + info.getDestination()); } @@ -108,10 +117,10 @@ public void addDestinationInfo(ConnectionContext context, DestinationInfo info) } @Override - public Destination addDestination(ConnectionContext context, ActiveMQDestination destination,boolean create) throws Exception { + public Destination addDestination(ConnectionContext context, ActiveMQDestination destination, boolean create) throws Exception { final SecurityContext securityContext = checkSecurityContext(context); - if (!checkDestinationAdmin(securityContext, destination)) { + if (!checkDestinationAdminAdd(securityContext, destination)) { throw new SecurityException("User " + securityContext.getUserName() + " is not authorized to create: " + destination); } @@ -122,7 +131,7 @@ public Destination addDestination(ConnectionContext context, ActiveMQDestination public void removeDestination(ConnectionContext context, ActiveMQDestination destination, long timeout) throws Exception { final SecurityContext securityContext = checkSecurityContext(context); - if (!checkDestinationAdmin(securityContext, destination)) { + if (!checkDestinationAdminRemove(securityContext, destination)) { throw new SecurityException("User " + securityContext.getUserName() + " is not authorized to remove: " + destination); } @@ -135,7 +144,7 @@ public void removeDestination(ConnectionContext context, ActiveMQDestination des public void removeDestinationInfo(ConnectionContext context, DestinationInfo info) throws Exception { final SecurityContext securityContext = checkSecurityContext(context); - if (!checkDestinationAdmin(securityContext, info.getDestination())) { + if (!checkDestinationAdminRemove(securityContext, info.getDestination())) { throw new SecurityException("User " + securityContext.getUserName() + " is not authorized to remove: " + info.getDestination()); } diff --git a/activemq-broker/src/main/java/org/apache/activemq/transport/vm/VMTransportFactory.java b/activemq-broker/src/main/java/org/apache/activemq/transport/vm/VMTransportFactory.java index 56baaeec98f..63b94dac5d9 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/transport/vm/VMTransportFactory.java +++ b/activemq-broker/src/main/java/org/apache/activemq/transport/vm/VMTransportFactory.java @@ -19,11 +19,14 @@ import java.io.IOException; import java.net.URI; import java.net.URISyntaxException; +import java.util.Arrays; import java.util.HashMap; import java.util.Map; +import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; +import java.util.stream.Collectors; import org.apache.activemq.broker.BrokerFactory; import org.apache.activemq.broker.BrokerFactoryHandler; import org.apache.activemq.broker.BrokerRegistry; @@ -44,12 +47,29 @@ public class VMTransportFactory extends TransportFactory { + public static final String VM_TRANSPORT_FACTORY_SCHEMES_ENABLED_PROP = + "org.apache.activemq.transport.VM_TRANSPORT_FACTORY_SCHEMES_ENABLED"; + public static final String DEFAULT_ALLOWED_SCHEMES = "broker,properties"; + public static final ConcurrentMap BROKERS = new ConcurrentHashMap(); public static final ConcurrentMap CONNECTORS = new ConcurrentHashMap(); public static final ConcurrentMap SERVERS = new ConcurrentHashMap(); private static final Logger LOG = LoggerFactory.getLogger(VMTransportFactory.class); BrokerFactoryHandler brokerFactoryHandler; + private final Set allowedSchemes; + + public VMTransportFactory() { + final String allowedSchemes = System.getProperty(VM_TRANSPORT_FACTORY_SCHEMES_ENABLED_PROP, + DEFAULT_ALLOWED_SCHEMES); + + // Asterisk will map to null which will allow all and skip checking + // Empty string will map to an empty set and will deny all + this.allowedSchemes = !allowedSchemes.equals("*") ? + Arrays.stream(allowedSchemes.split("\\s*,\\s*")) + .filter(s -> !s.isBlank()) + .collect(Collectors.toUnmodifiableSet()) : null; + } @Override public Transport doConnect(URI location) throws Exception { @@ -119,6 +139,7 @@ public Transport doCompositeConnect(URI location) throws Exception { throw new IOException("Broker named '" + host + "' does not exist."); } try { + validateBrokerCreationSchema(host, brokerURI); if (brokerFactoryHandler != null) { broker = brokerFactoryHandler.createBroker(brokerURI); } else { @@ -162,6 +183,20 @@ public Transport doCompositeConnect(URI location) throws Exception { return transport; } + private void validateBrokerCreationSchema(String host, URI brokerURI) { + if (allowedSchemes != null) { + final String detectedScheme = brokerURI.getScheme(); + if (detectedScheme == null) { + throw new IllegalArgumentException("Could not detect scheme in given URI [" + brokerURI + "]"); + } + if (!allowedSchemes.contains(detectedScheme)){ + throw new IllegalArgumentException("Broker named '" + host + "' does not exist and " + + "broker creation using the scheme '" + detectedScheme + "' is not enabled via the VMTransportFactory. " + + "To allow creation, configure the system property " + VM_TRANSPORT_FACTORY_SCHEMES_ENABLED_PROP); + } + } + } + private static String extractHost(URI location) { String host = location.getHost(); if (host == null || host.length() == 0) { diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/jmx/MBeanTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/jmx/MBeanTest.java index e07638f31f3..3760d449c31 100644 --- a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/jmx/MBeanTest.java +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/jmx/MBeanTest.java @@ -2086,6 +2086,14 @@ protected void testAddTransportConnectorBlockedBrokerView(String scheme) throws assertEquals("Transport scheme '" + scheme + "' is not allowed", e.getMessage()); } + try { + // verify any composite URI is blocked as well without parens + brokerView.addConnector("static:tcp://0.0.0.0:0," + scheme + "://" + brokerName); + fail("Should have failed trying to add connector with scheme: " + scheme); + } catch (IllegalArgumentException e) { + assertEquals("Transport scheme '" + scheme + "' is not allowed", e.getMessage()); + } + try { // verify nested composite URI is blocked brokerView.addConnector("static:(static:(static:(" + scheme + "://localhost)))"); @@ -2108,6 +2116,15 @@ public void testNestedAddTransportConnector() throws Exception { } catch (IllegalArgumentException e) { assertEquals("URI can't contain more than 5 nested composite URIs", e.getMessage()); } + + try { + // verify nested composite URI with more than 5 levels is blocked without parens + brokerView.addConnector( + "static:static:static:static:static:static:tcp://localhost:0"); + fail("Should have failed trying to add vm connector bridge"); + } catch (IllegalArgumentException e) { + assertEquals("URI can't contain more than 5 nested composite URIs", e.getMessage()); + } } } diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/config/BrokerXmlConfigFromJNDITest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/config/BrokerXmlConfigFromJNDITest.java index 936603c3b82..68139efe5cc 100644 --- a/activemq-unit-tests/src/test/java/org/apache/activemq/config/BrokerXmlConfigFromJNDITest.java +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/config/BrokerXmlConfigFromJNDITest.java @@ -16,19 +16,34 @@ */ package org.apache.activemq.config; -import java.io.File; -import java.util.Hashtable; - -import javax.naming.Context; -import javax.naming.InitialContext; +import static org.apache.activemq.util.VmTransportTestUtils.resetVmTransportFactory; import org.apache.activemq.ActiveMQConnectionFactory; import org.apache.activemq.test.JmsTopicSendReceiveWithTwoConnectionsTest; +import javax.naming.Context; +import javax.naming.InitialContext; +import java.io.File; +import java.util.Hashtable; + /** * */ public class BrokerXmlConfigFromJNDITest extends JmsTopicSendReceiveWithTwoConnectionsTest { + + @Override + protected void setUp() throws Exception { + // reset before each test + resetVmTransportFactory("xbean"); + super.setUp(); + } + + @Override + protected void tearDown() throws Exception { + super.tearDown(); + resetVmTransportFactory(); + } + @Override protected ActiveMQConnectionFactory createConnectionFactory() throws Exception { assertBaseDirectoryContainsSpaces(); diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/jmx/JmxCreateNCTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/jmx/JmxCreateNCTest.java index 3d3b8c1cdc5..b20518e884c 100644 --- a/activemq-unit-tests/src/test/java/org/apache/activemq/jmx/JmxCreateNCTest.java +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/jmx/JmxCreateNCTest.java @@ -82,6 +82,31 @@ public void testBridgeRegistration() throws Exception { assertEquals("NC", nc.getName()); } + @Test + public void testTransportSchemeBridgeAllowed() throws Exception { + // Test composite network connector uri + String name = proxy.addNetworkConnector("static:(tcp://localhost,amqp://localhost)"); + proxy.removeNetworkConnector(name); + + // Test composite with missing parens + name = proxy.addNetworkConnector("static:amqp://localhost,tcp://127.0.0.1:0"); + proxy.removeNetworkConnector(name); + + // verify direct connector as well + name = proxy.addNetworkConnector("static:stomp://localhost"); + proxy.removeNetworkConnector(name); + + // verify nested composite URI + name = proxy.addNetworkConnector( + "static:(static:(static:(tcp+ssl://localhost:0,auto+nio+ssl://localhost)))"); + proxy.removeNetworkConnector(name); + + // verify nested composite URI is not blocked when not using parens + name = proxy.addNetworkConnector( + "static:static:static:123://localhost:0,auto+nio+ssl://localhost"); + proxy.removeNetworkConnector(name); + } + @Test public void testTransportSchemeBridgeBlocked() throws Exception { for (String deniedScheme : DENIED_TRANSPORT_SCHEMES) { @@ -99,6 +124,14 @@ protected void testTransportSchemeBridgeBlocked(String scheme) throws Exception assertEquals("Transport scheme '" + scheme + "' is not allowed", e.getMessage()); } + // Test composite with missing parens + try { + proxy.addNetworkConnector("static:" + scheme + "://localhost,tcp://127.0.0.1:0"); + fail("Should have failed trying to add connector bridge with scheme: " + scheme); + } catch (IllegalArgumentException e) { + assertEquals("Transport scheme '" + scheme + "' is not allowed", e.getMessage()); + } + // verify direct connector as well try { proxy.addNetworkConnector(scheme + "://localhost"); @@ -114,6 +147,14 @@ protected void testTransportSchemeBridgeBlocked(String scheme) throws Exception } catch (IllegalArgumentException e) { assertEquals("Transport scheme '" + scheme + "' is not allowed", e.getMessage()); } + + try { + // verify nested composite URI is blocked when not using parens + proxy.addNetworkConnector("static:static:static:tcp://localhost:0," + scheme + "://localhost"); + fail("Should have failed trying to add connector bridge with scheme: " + scheme); + } catch (IllegalArgumentException e) { + assertEquals("Transport scheme '" + scheme + "' is not allowed", e.getMessage()); + } } @Test @@ -124,12 +165,21 @@ public void testAddNetworkConnectorMaxComposite() throws Exception { try { // verify nested composite URI with more than 5 levels is blocked. This has 6 nested - // (not including first wrapper url + // (not including first wrapper url) proxy.addNetworkConnector( "static:(static:(static:(static:(static:(static:(tcp://localhost:0))))))"); fail("Should have failed trying to add more than 5 connector bridges"); } catch (IllegalArgumentException e) { assertEquals("URI can't contain more than 5 nested composite URIs", e.getMessage()); } + + try { + // verify nested composite URI with more than 5 levels is blocked without parens + proxy.addNetworkConnector( + "static:static:static:static:static:static:tcp://localhost:0"); + fail("Should have failed trying to add more than 5 connector bridges"); + } catch (IllegalArgumentException e) { + assertEquals("URI can't contain more than 5 nested composite URIs", e.getMessage()); + } } } diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/network/AbstractDurableSyncNetworkBridgeTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/network/AbstractDurableSyncNetworkBridgeTest.java new file mode 100644 index 00000000000..47df7ef8baa --- /dev/null +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/network/AbstractDurableSyncNetworkBridgeTest.java @@ -0,0 +1,123 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.network; + +import static org.junit.Assert.assertTrue; + +import java.io.File; +import java.io.IOException; +import java.util.List; +import java.util.concurrent.TimeUnit; +import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.util.Wait; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public abstract class AbstractDurableSyncNetworkBridgeTest extends DynamicNetworkTestSupport { + + protected static final Logger LOG = LoggerFactory.getLogger( + AbstractDurableSyncNetworkBridgeTest.class); + + protected abstract void doSetUpLocalBroker(boolean deleteAllMessages, boolean startNetworkConnector, File dataDir) throws Exception; + + protected abstract void doSetUpRemoteBroker(boolean deleteAllMessages, File dataDir, int port) throws Exception; + + protected void restartLocalBroker(boolean startNetworkConnector) throws Exception { + stopLocalBroker(); + doSetUpLocalBroker(false, startNetworkConnector, localBroker.getDataDirectoryFile()); + } + + protected void restartRemoteBroker() throws Exception { + final int previousPort = remoteBroker.getTransportConnectors().get(0).getConnectUri().getPort(); + final File dataDir = remoteBroker.getDataDirectoryFile(); + stopRemoteBroker(); + try { + doSetUpRemoteBroker(false, dataDir, previousPort); + } catch (final IOException e) { + if (e.getCause() instanceof java.net.BindException) { + // Previous port still in TIME_WAIT — use a new ephemeral port + doSetUpRemoteBroker(false, dataDir, 0); + // Update the local broker's network connector to point to the new port + updateLocalNetworkConnectorUri(); + } else { + throw e; + } + } + } + + protected void restartBroker(BrokerService broker, boolean startNetworkConnector) throws Exception { + if (broker.getBrokerName().equals("localBroker")) { + restartLocalBroker(startNetworkConnector); + } else { + restartRemoteBroker(); + } + } + + protected void waitForBridgeFullyStarted() throws Exception { + waitForBridgeFullyStarted(TimeUnit.SECONDS.toMillis(15), true); + } + + protected void waitForBridgeFullyStarted(long millis, boolean duplex) throws Exception { + // Wait for the local bridge to be fully started (advisory consumers registered) + assertTrue("Local bridge should be fully started", Wait.waitFor(() -> { + if (localBroker.getNetworkConnectors().get(0).activeBridges().isEmpty()) { + return false; + } + final NetworkBridge bridge = localBroker.getNetworkConnectors().get(0).activeBridges().iterator().next(); + if (bridge instanceof DemandForwardingBridgeSupport) { + return ((DemandForwardingBridgeSupport) bridge).startedLatch.getCount() == 0; + } + return true; + }, millis, 100)); + + // Also wait for the duplex bridge on the remote broker to be fully started. + // The duplex connector creates a separate DemandForwardingBridge on the remote side + // that also needs its advisory consumers registered before it can process events. + if (duplex) { + assertTrue("Duplex bridge should be fully started", Wait.waitFor(() -> { + final DemandForwardingBridge duplexBridge = findDuplexBridge( + remoteBroker.getTransportConnectors().get(0)); + return duplexBridge != null && duplexBridge.startedLatch.getCount() == 0; + }, millis, 100)); + } + } + + + /** + * When the remote broker restarts on a new ephemeral port (BindException fallback), + * any existing network connector on the local broker still points to the old port. + * This method stops the old connector and replaces it with one targeting the new URI. + */ + protected void updateLocalNetworkConnectorUri() throws Exception { + if (localBroker == null) { + return; + } + final List connectors = localBroker.getNetworkConnectors(); + if (connectors.isEmpty()) { + return; + } + final NetworkConnector oldConnector = connectors.get(0); + oldConnector.stop(); + localBroker.removeNetworkConnector(oldConnector); + final NetworkConnector newConnector = configureLocalNetworkConnector(); + localBroker.addNetworkConnector(newConnector); + newConnector.start(); + } + + protected abstract NetworkConnector configureLocalNetworkConnector() throws Exception; + +} diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/network/DurableSyncNetworkBridgeAuthTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/network/DurableSyncNetworkBridgeAuthTest.java new file mode 100644 index 00000000000..646dd4f184d --- /dev/null +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/network/DurableSyncNetworkBridgeAuthTest.java @@ -0,0 +1,270 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.network; + +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; + +import java.io.File; +import java.net.URI; +import java.util.Arrays; +import java.util.Collection; +import java.util.List; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; +import org.apache.activemq.broker.BrokerPlugin; +import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.broker.TransportConnector; +import org.apache.activemq.command.ActiveMQTopic; +import org.apache.activemq.command.BrokerSubscriptionInfo; +import org.apache.activemq.command.DiscoveryEvent; +import org.apache.activemq.security.AuthenticationUser; +import org.apache.activemq.security.SimpleAuthenticationPlugin; +import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter; +import org.apache.activemq.store.kahadb.disk.journal.Journal.JournalDiskSyncStrategy; +import org.apache.activemq.transport.Transport; +import org.apache.activemq.transport.TransportFilter; +import org.apache.activemq.util.Wait; +import org.junit.After; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.Timeout; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; +import org.junit.runners.Parameterized.Parameters; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +@RunWith(Parameterized.class) +public class DurableSyncNetworkBridgeAuthTest extends AbstractDurableSyncNetworkBridgeTest { + + protected static final Logger LOG = LoggerFactory.getLogger(DurableSyncNetworkBridgeAuthTest.class); + + @Parameters(name="duplex={0}") + public static Collection data() { + return Arrays.asList(new Object[][] { + {true}, + {false} + }); + } + + @Rule + public Timeout globalTimeout = new Timeout(60, TimeUnit.SECONDS); + + public static final String KEYSTORE_TYPE = "jks"; + public static final String PASSWORD = "password"; + public static final String SERVER_KEYSTORE = "src/test/resources/server.keystore"; + public static final String TRUST_KEYSTORE = "src/test/resources/client.keystore"; + + static { + System.setProperty("javax.net.ssl.trustStore", TRUST_KEYSTORE); + System.setProperty("javax.net.ssl.trustStorePassword", PASSWORD); + System.setProperty("javax.net.ssl.trustStoreType", KEYSTORE_TYPE); + System.setProperty("javax.net.ssl.keyStore", SERVER_KEYSTORE); + System.setProperty("javax.net.ssl.keyStoreType", KEYSTORE_TYPE); + System.setProperty("javax.net.ssl.keyStorePassword", PASSWORD); + } + + private static final String USER_PASSWORD = "password"; + private final boolean duplex; + private final AtomicReference brokerSubInfo = new AtomicReference<>(); + private String ncPassword = USER_PASSWORD; + + public DurableSyncNetworkBridgeAuthTest(boolean duplex) { + this.duplex = duplex; + } + + @Before + public void setUp() throws Exception { + this.ncPassword = USER_PASSWORD; + this.brokerSubInfo.set(null); + } + + @After + public void tearDown() throws Exception { + doTearDown(); + } + + @Test + public void testAuthSuccess() throws Exception { + doSetUp(true, true, tempFolder.newFolder(), tempFolder.newFolder(), + TimeUnit.SECONDS.toMillis(15)); + + // When the local broker starts the bridge it will send its BrokerSubscriptionInfo list + // automatically on connect so the remote broker will always receive it. However, the + // remote broker should only send back its list after the connection is properly authenticated. + assertTrue(Wait.waitFor(() -> brokerSubInfo.get() != null,5000,10)); + + // Simulate a connection exception and reconnect, we should receive again + brokerSubInfo.set(null); + localBroker.getNetworkConnectors().get(0).activeBridges().stream() + .findFirst().orElseThrow().serviceRemoteException(new Exception()); + assertTrue(Wait.waitFor(() -> brokerSubInfo.get() != null,5000,10)); + } + + @Test + public void testAuthFailure() throws Exception { + this.ncPassword = "badpassword"; + try { + // set a shorter wait time, it won't connect with bad password + doSetUp(true, true, tempFolder.newFolder(), tempFolder.newFolder(), + TimeUnit.SECONDS.toMillis(5)); + throw new IllegalStateException("Should have received assertion error with bad password"); + } catch (AssertionError e) { + // expected + } + + // Because the local broker was not authenticated by the remote broker, the local broker + // should not have received back the BrokerSubscriptionInfo + assertNull(brokerSubInfo.get()); + } + + @Test + public void testRestartSync() throws Exception { + doSetUp(true, true, tempFolder.newFolder(), tempFolder.newFolder(), + TimeUnit.SECONDS.toMillis(15)); + + // When the local broker starts the bridge it will send its BrokerSubscriptionInfo list + // automatically on connect so the remote broker will always receive it. However, the + // remote broker should only send back its list after the connection is properly authenticated. + assertTrue(Wait.waitFor(() -> brokerSubInfo.get() != null,5000,10)); + + // Restart, should receive again with new connection + brokerSubInfo.set(null); + restartRemoteBroker(); + + // Wait for the reconnect and receive of BrokerSubInfo + assertTrue(Wait.waitFor(() -> brokerSubInfo.get() != null,5000,10)); + } + + protected void doSetUp(boolean deleteAllMessages, boolean startNetworkConnector, File localDataDir, + File remoteDataDir, long waitForStart) throws Exception { + doSetUpRemoteBroker(deleteAllMessages, remoteDataDir, 0); + doSetUpLocalBroker(deleteAllMessages, startNetworkConnector, localDataDir); + //Wait for the bridge to be fully started + if (startNetworkConnector) { + waitForBridgeFullyStarted(waitForStart, duplex); + } + } + + protected void doSetUpLocalBroker(boolean deleteAllMessages, boolean startNetworkConnector, + File dataDir) throws Exception { + localBroker = createLocalBroker(dataDir, startNetworkConnector); + localBroker.setDeleteAllMessagesOnStartup(deleteAllMessages); + localBroker.start(); + localBroker.waitUntilStarted(); + + if (startNetworkConnector) { + // Best-effort wait for the bridge to appear. Do NOT use assertTrue here + // because some tests restart localBroker before remoteBroker is running, + // relying on the bridge connecting later when remoteBroker restarts. + // Tests that need the bridge to be fully started call assertBridgeStarted() explicitly. + // Keep timeout short (5s) to avoid growing the NC reconnect backoff too much, + // which would delay bridge formation when the remote broker starts later. + Wait.waitFor(() -> localBroker.getNetworkConnectors().get(0).activeBridges().size() == 1, + TimeUnit.SECONDS.toMillis(5), 500); + } + + } + + protected void doSetUpRemoteBroker(boolean deleteAllMessages, File dataDir, int port) throws Exception { + remoteBroker = createRemoteBroker(dataDir, port); + remoteBroker.setDeleteAllMessagesOnStartup(deleteAllMessages); + remoteBroker.start(); + remoteBroker.waitUntilStarted(); + } + + protected BrokerService createLocalBroker(File dataDir, boolean startNetworkConnector) throws Exception { + BrokerService brokerService = new BrokerService(); + brokerService.setBrokerName("localBroker"); + brokerService.setDataDirectoryFile(dataDir); + KahaDBPersistenceAdapter adapter = new KahaDBPersistenceAdapter(); + adapter.setDirectory(dataDir); + adapter.setJournalDiskSyncStrategy(JournalDiskSyncStrategy.PERIODIC.name()); + brokerService.setPersistenceAdapter(adapter); + + if (startNetworkConnector) { + brokerService.addNetworkConnector(configureLocalNetworkConnector()); + } + + //Use auto+nio+ssl to test out the transport works with bridging + brokerService.addConnector("auto+nio+ssl://localhost:0"); + + return brokerService; + } + + @Override + protected NetworkConnector configureLocalNetworkConnector() throws Exception { + List transportConnectors = remoteBroker.getTransportConnectors(); + URI remoteURI = transportConnectors.get(0).getConnectUri(); + String uri = "static:(" + remoteURI + ")"; + NetworkConnector connector = new DiscoveryNetworkConnector(new URI(uri)) { + @Override + protected NetworkBridge createBridge(Transport localTransport, + Transport remoteTransport, DiscoveryEvent event) { + // Add a listener so we can capture if the remote broker sends + // back a BrokerSubscriptionInfo object + final Transport remoteFilter = new TransportFilter(remoteTransport) { + @Override + public void onCommand(Object command) { + if (command instanceof BrokerSubscriptionInfo) { + if (brokerSubInfo.get() != null) { + throw new IllegalStateException("Received BrokerSubscriptionInfo more than once."); + } + brokerSubInfo.set((BrokerSubscriptionInfo) command); + } + super.onCommand(command); + } + }; + return super.createBridge(localTransport, remoteFilter, event); + } + }; + connector.setName("networkConnector"); + connector.setUserName("user1"); + connector.setPassword(ncPassword); + connector.setDecreaseNetworkConsumerPriority(false); + connector.setConduitSubscriptions(true); + connector.setDuplex(duplex); + connector.setStaticBridge(false); + connector.setSyncDurableSubs(true); + connector.setDynamicallyIncludedDestinations(List.of(new ActiveMQTopic("include.test.>"))); + return connector; + } + + protected BrokerService createRemoteBroker(File dataDir, int port) throws Exception { + BrokerService brokerService = new BrokerService(); + brokerService.setBrokerName("remoteBroker"); + brokerService.setUseJmx(false); + brokerService.setDataDirectoryFile(dataDir); + KahaDBPersistenceAdapter adapter = new KahaDBPersistenceAdapter(); + adapter.setDirectory(dataDir); + adapter.setJournalDiskSyncStrategy(JournalDiskSyncStrategy.PERIODIC.name()); + brokerService.setPersistenceAdapter(adapter); + + // Add authentication to the remote broker + AuthenticationUser user = new AuthenticationUser("user1", USER_PASSWORD, "group1"); + SimpleAuthenticationPlugin authenticationPlugin = new SimpleAuthenticationPlugin(); + authenticationPlugin.setUsers(List.of(user)); + brokerService.setPlugins(new BrokerPlugin[] {authenticationPlugin}); + + brokerService.addConnector("auto+nio+ssl://localhost:" + port); + + return brokerService; + } + +} diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/network/DurableSyncNetworkBridgeTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/network/DurableSyncNetworkBridgeTest.java index a1329ae9c79..ef4d03c39c5 100644 --- a/activemq-unit-tests/src/test/java/org/apache/activemq/network/DurableSyncNetworkBridgeTest.java +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/network/DurableSyncNetworkBridgeTest.java @@ -63,7 +63,7 @@ import org.slf4j.LoggerFactory; @RunWith(Parameterized.class) -public class DurableSyncNetworkBridgeTest extends DynamicNetworkTestSupport { +public class DurableSyncNetworkBridgeTest extends AbstractDurableSyncNetworkBridgeTest { protected static final Logger LOG = LoggerFactory.getLogger(DurableSyncNetworkBridgeTest.class); @@ -700,14 +700,6 @@ protected CompositeTopic createCompositeTopic(String name, ActiveMQDestination.. return compositeTopic; } - protected void restartBroker(BrokerService broker, boolean startNetworkConnector) throws Exception { - if (broker.getBrokerName().equals("localBroker")) { - restartLocalBroker(startNetworkConnector); - } else { - restartRemoteBroker(); - } - } - protected void restartBrokers(boolean startNetworkConnector) throws Exception { doTearDown(); doSetUp(false, startNetworkConnector, localBroker.getDataDirectoryFile(), @@ -719,23 +711,13 @@ protected void doSetUp(boolean deleteAllMessages, boolean startNetworkConnector, included = new ActiveMQTopic(testTopicName); doSetUpRemoteBroker(deleteAllMessages, remoteDataDir, 0); doSetUpLocalBroker(deleteAllMessages, startNetworkConnector, localDataDir); - //Give time for advisories to propagate - Thread.sleep(1000); - } - - protected void restartLocalBroker(boolean startNetworkConnector) throws Exception { - stopLocalBroker(); - doSetUpLocalBroker(false, startNetworkConnector, localBroker.getDataDirectoryFile()); - } - - protected void restartRemoteBroker() throws Exception { - int port = 0; - if (remoteBroker != null) { - List transportConnectors = remoteBroker.getTransportConnectors(); - port = transportConnectors.get(0).getConnectUri().getPort(); + //Wait for the bridge to be fully started (advisory consumers registered). + //Note: activeBridges().size() == 1 is NOT sufficient because bridges are added + //to the map before start() completes asynchronously. We must wait for the + //startedLatch which counts down after advisory consumers are registered. + if (startNetworkConnector) { + waitForBridgeFullyStarted(); } - stopRemoteBroker(); - doSetUpRemoteBroker(false, remoteBroker.getDataDirectoryFile(), port); } protected void doSetUpLocalBroker(boolean deleteAllMessages, boolean startNetworkConnector, @@ -817,6 +799,7 @@ protected BrokerService createLocalBroker(File dataDir, boolean startNetworkConn return brokerService; } + @Override protected NetworkConnector configureLocalNetworkConnector() throws Exception { List transportConnectors = remoteBroker.getTransportConnectors(); URI remoteURI = transportConnectors.get(0).getConnectUri(); diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/security/DestinationAdminAuthzTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/security/DestinationAdminAuthzTest.java new file mode 100644 index 00000000000..a06861fa0c0 --- /dev/null +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/security/DestinationAdminAuthzTest.java @@ -0,0 +1,341 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.security; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + +import jakarta.jms.Destination; +import jakarta.jms.JMSException; +import jakarta.jms.Session; +import org.apache.activemq.ActiveMQConnection; +import org.apache.activemq.ActiveMQConnectionFactory; +import org.apache.activemq.broker.Broker; +import org.apache.activemq.broker.BrokerPlugin; +import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.command.ActiveMQDestination; +import org.apache.activemq.command.ActiveMQQueue; +import org.apache.activemq.command.ActiveMQTempQueue; +import org.apache.activemq.command.ActiveMQTempTopic; +import org.apache.activemq.command.ActiveMQTopic; +import org.apache.activemq.filter.DestinationMap; +import org.apache.activemq.jaas.GroupPrincipal; +import org.apache.activemq.util.Wait; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.security.Principal; +import java.util.Arrays; +import java.util.Collection; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; + +@RunWith(value = Parameterized.class) +public class DestinationAdminAuthzTest { + + private static final Logger LOG = LoggerFactory.getLogger(DestinationAdminAuthzTest.class); + + public static final String APP1_USER_NAME = "app1"; + public static final String APP2_USER_NAME = "app2"; + public static final String APP3_USER_NAME = "app3"; + + public static final String APP1_GROUP_NAME = "app1Group"; + public static final String APP2_GROUP_NAME = "app2Group"; + public static final String APP3_GROUP_NAME = "app3Group"; + + public static final GroupPrincipal ADMINS = new GroupPrincipal("admins"); + public static final GroupPrincipal APP1GROUP = new GroupPrincipal(APP1_GROUP_NAME); + public static final GroupPrincipal APP2GROUP = new GroupPrincipal(APP2_GROUP_NAME); + public static final GroupPrincipal APP3GROUP = new GroupPrincipal(APP3_GROUP_NAME); + + public static Principal WILDCARD; + static { + try { + WILDCARD = (Principal) DefaultAuthorizationMap.createGroupPrincipal( + "*", GroupPrincipal.class.getName()); + } catch (Exception e) { + LOG.error("Failed to make wildcard principal", e); + } + } + + private BrokerService brokerService = null; + private String destinationType = null; + private String userName = null; + private boolean errorExpected = true; + + @Parameterized.Parameters(name="type={0}, user={1}, error={2}") + public static Collection data() { + return Arrays.asList(new Object[][] { + {"queue", APP1_USER_NAME, false}, + {"topic", APP1_USER_NAME, false}, + {"temp-queue", APP1_USER_NAME, false}, + {"temp-topic", APP1_USER_NAME, false}, + {"queue", APP2_USER_NAME, false}, + {"topic", APP2_USER_NAME, false}, + {"temp-queue", APP2_USER_NAME, false}, + {"temp-topic", APP2_USER_NAME, false}, + {"queue", APP3_USER_NAME, true}, + {"topic", APP3_USER_NAME, true}, + {"temp-queue", APP3_USER_NAME, true}, + {"temp-topic", APP3_USER_NAME, true} + }); + } + + public DestinationAdminAuthzTest(String destinationType, String userName, boolean errorExpected) { + this.destinationType = destinationType; + this.userName = userName; + this.errorExpected = errorExpected; + } + + @Before + public void setUp() throws Exception { + brokerService = createBroker(); + brokerService.start(); + brokerService.waitUntilStarted(5_000L); + } + + @After + public void tearDown() throws Exception { + if (brokerService != null && brokerService.isStarted()) { + LOG.info("Shutting down broker:{}", brokerService.getBrokerName()); + brokerService.stop(); + brokerService.waitUntilStopped(); + } + } + + @Test + public void testDestinationPermissions() throws Exception { + boolean exceptionOccured = false; + + var connectionFactory = new ActiveMQConnectionFactory(brokerService.getVmConnectorURI()); + connectionFactory.setWatchTopicAdvisories(false); + + final String destName = userName + ".in";; + ActiveMQDestination tmpActiveMQDestination = null; + switch (destinationType) { + case "temp-queue": tmpActiveMQDestination = new ActiveMQTempQueue(); break; + case "temp-topic": tmpActiveMQDestination = new ActiveMQTempTopic(); break; + case "queue": tmpActiveMQDestination = new ActiveMQQueue(destName); break; + case "topic": tmpActiveMQDestination = new ActiveMQTopic(destName); break; + default: throw new IllegalArgumentException("Unsupported destinationType: " + destinationType); + }; + var activemqDestination = tmpActiveMQDestination; + + try (final var connection = connectionFactory.createConnection(userName, "password"); + final var session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE)) { + connection.start(); + + Destination tmpDestination = null; + switch (destinationType) { + case "temp-queue": tmpDestination = session.createTemporaryQueue(); break; + case "temp-topic": tmpDestination = session.createTemporaryTopic(); break; + case "queue": tmpDestination = session.createQueue(destName); break; + case "topic": tmpDestination = session.createTopic(destName); break; + default: throw new IllegalArgumentException("Unsupported destinationType: " + destinationType); + }; + final var destination = tmpDestination; + + if(activemqDestination.isTemporary()) { + switch (destinationType) { + case "temp-queue": activemqDestination.setPhysicalName(destination.toString().substring("temp-queue://".length())); break; + case "temp-topic": activemqDestination.setPhysicalName(destination.toString().substring("temp-topic://".length())); break; + } + } + + var brokerDestination = brokerService.getDestination(activemqDestination); + + try (var messageProducer = session.createProducer(destination); + var messageConsumer = session.createConsumer(destination)) { + + messageProducer.send(session.createTextMessage("Test message")); + + Wait.waitFor(new Wait.Condition() { + @Override + public boolean isSatisified() throws Exception { + try { + return brokerService.getDestination(activemqDestination) != null; + } catch (Exception e) { + return false; + } + } + }, 5_000L, 50L); + + var message = messageConsumer.receive(2_000L); + + Wait.waitFor(new Wait.Condition() { + @Override + public boolean isSatisified() throws Exception { + return 1L == brokerDestination.getDestinationStatistics().getDequeues().getCount(); + } + }, 5_000L, 50L); + + assertNotNull(message); + assertTrue(message.propertyExists("JMSXUserID")); + assertEquals(userName, message.getStringProperty("JMSXUserID")); + assertEquals("Test message", message.getBody(String.class)); + } + + Wait.waitFor(new Wait.Condition() { + @Override + public boolean isSatisified() throws Exception { + return 0L == brokerDestination.getDestinationStatistics().getConsumers().getCount(); + } + }, 5_000L, 50L); + + if (!activemqDestination.isTemporary()) { + ((ActiveMQConnection) connection).destroyDestination(activemqDestination); + } + + // Check removing a destination that already exists, but does not have authz + boolean removeExceptionOccured = false; + try { + for (var otherActiveMQDestination : Set.of(new ActiveMQQueue("other.in"), new ActiveMQTopic("other.in"))) { + ((ActiveMQConnection) connection).destroyDestination(otherActiveMQDestination); + } + } catch (JMSException e) { + removeExceptionOccured = true; + } + assertTrue(removeExceptionOccured); + + // Check removing a destination that does not exist does not throw an error + boolean notExistExceptionOccured = false; + try { + for (var noexistActiveMQDestination : Set.of(new ActiveMQQueue("noexist.in"), new ActiveMQTopic("noexist.in"))) { + ((ActiveMQConnection) connection).destroyDestination(noexistActiveMQDestination); + } + } catch (JMSException e) { + notExistExceptionOccured = true; + } + assertFalse(notExistExceptionOccured); + + } catch (JMSException e) { + exceptionOccured = true; + } + + assertEquals(errorExpected, exceptionOccured); + } + + public static AuthorizationMap createAuthorizationMap() { + DestinationMap readAccess = new DefaultAuthorizationMap(); + readAccess.put(new ActiveMQQueue(">"), ADMINS); + readAccess.put(new ActiveMQQueue("app1.>"), APP1GROUP); + readAccess.put(new ActiveMQQueue("app2.>"), APP2GROUP); + readAccess.put(new ActiveMQTopic(">"), ADMINS); + readAccess.put(new ActiveMQTopic("app1.>"), APP1GROUP); + readAccess.put(new ActiveMQTopic("app2.>"), APP2GROUP); + + // Note: APP3 intentionally has no acccess + // readAccess.put(new ActiveMQQueue("app3.>"), APP3); + // readAccess.put(new ActiveMQTopic("app3.>"), APP3); + + DestinationMap writeAccess = new DefaultAuthorizationMap(); + writeAccess.put(new ActiveMQQueue(">"), ADMINS); + writeAccess.put(new ActiveMQQueue("app1.>"), APP1GROUP); + writeAccess.put(new ActiveMQQueue("app2.>"), APP2GROUP); + writeAccess.put(new ActiveMQTopic(">"), ADMINS); + writeAccess.put(new ActiveMQTopic("app1.>"), APP1GROUP); + writeAccess.put(new ActiveMQTopic("app2.>"), APP2GROUP); + + DestinationMap adminAccess = new DefaultAuthorizationMap(); + adminAccess.put(new ActiveMQTopic(">"), ADMINS); + adminAccess.put(new ActiveMQTopic("app1.>"), APP1GROUP); + adminAccess.put(new ActiveMQTopic("app2.>"), APP2GROUP); + adminAccess.put(new ActiveMQQueue(">"), ADMINS); + adminAccess.put(new ActiveMQQueue("app1.>"), APP1GROUP); + adminAccess.put(new ActiveMQQueue("app2.>"), APP2GROUP); + + readAccess.put(new ActiveMQTopic("ActiveMQ.Advisory.>"), WILDCARD); + writeAccess.put(new ActiveMQTopic("ActiveMQ.Advisory.>"), WILDCARD); + adminAccess.put(new ActiveMQTopic("ActiveMQ.Advisory.>"), WILDCARD); + + var authorizationMap = new SimpleAuthorizationMap(writeAccess, readAccess, adminAccess); + var tempDestinationAuthorizationEntry = new TempDestinationAuthorizationEntry(); + + try { + tempDestinationAuthorizationEntry.setAdmin("admins, app1Group, app2Group"); + tempDestinationAuthorizationEntry.setRead("admins, app1Group, app2Group"); + tempDestinationAuthorizationEntry.setWrite("admins, app1Group, app2Group"); + } catch (Exception e) { + fail(e.getMessage()); + } + + authorizationMap.setTempDestinationAuthorizationEntry(tempDestinationAuthorizationEntry); + return authorizationMap; + } + + public static class SimpleAuthenticationBrokerPlugin implements BrokerPlugin { + + private final Map userPasswords; + private final Map> userGroups; + + public SimpleAuthenticationBrokerPlugin(final Map userPasswords, final Map> userGroups) { + this.userPasswords = userPasswords; + this.userGroups = userGroups; + } + + public Broker installPlugin(Broker broker) { + return new SimpleAuthenticationBroker(broker, userPasswords, userGroups); + } + + public String toString() { + return "SimpleAuthenticationBroker"; + } + } + + protected BrokerService createBroker() throws Exception { + var brokerService = new BrokerService(); + brokerService.setDeleteAllMessagesOnStartup(true); + brokerService.setDestinations(new ActiveMQDestination[] { new ActiveMQQueue("other.in"), new ActiveMQTopic("other.in") }); + brokerService.setPersistent(false); + brokerService.setPlugins(new BrokerPlugin[] {new AuthorizationPlugin(createAuthorizationMap()), new SimpleAuthenticationBrokerPlugin(createUserPasswords(), createUserGroups())}); + brokerService.setPopulateJMSXUserID(true); + brokerService.setSchedulerSupport(false); + brokerService.setUseAuthenticatedPrincipalForJMSXUserID(true); + brokerService.setUseJmx(true); + return brokerService; + } + + private static Map createUserPasswords() { + var users = new HashMap(); + users.put("admin", "password"); + users.put(APP1_USER_NAME, "password"); + users.put(APP2_USER_NAME, "password"); + users.put(APP3_USER_NAME, "password"); + return users; + } + + private static Map> createUserGroups() { + var groups = new HashMap>(); + groups.put("admin", new HashSet(Arrays.asList(new Principal[] {new GroupPrincipal("admins")}))); + groups.put(APP1_USER_NAME, new HashSet(Arrays.asList(new Principal[] { new GroupPrincipal(APP1_GROUP_NAME) }))); + groups.put(APP2_USER_NAME, new HashSet(Arrays.asList(new Principal[] { new GroupPrincipal(APP2_GROUP_NAME) }))); + groups.put(APP3_USER_NAME, new HashSet(Arrays.asList(new Principal[] { new GroupPrincipal(APP3_GROUP_NAME) }))); + return groups; + } +} + diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/spring/ActiveMQConnectionFactoryXBeanTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/spring/ActiveMQConnectionFactoryXBeanTest.java index dfd774ea452..38d0dea9f5f 100644 --- a/activemq-unit-tests/src/test/java/org/apache/activemq/spring/ActiveMQConnectionFactoryXBeanTest.java +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/spring/ActiveMQConnectionFactoryXBeanTest.java @@ -16,6 +16,7 @@ */ package org.apache.activemq.spring; +import static org.apache.activemq.util.VmTransportTestUtils.resetVmTransportFactory; import static org.apache.activemq.xbean.XBeanBrokerFactory.DEFAULT_ALLOWED_PROTOCOLS; import static org.apache.activemq.xbean.XBeanBrokerFactory.XBEAN_BROKER_FACTORY_PROTOCOLS_PROP; import static org.junit.Assert.assertNotNull; @@ -28,14 +29,22 @@ import java.net.UnknownHostException; import java.nio.file.NoSuchFileException; import org.apache.activemq.ActiveMQConnectionFactory; +import org.apache.activemq.transport.vm.VMTransportFactory; import org.apache.activemq.xbean.XBeanBrokerFactory; import org.junit.AfterClass; import org.junit.Before; +import org.junit.BeforeClass; import org.junit.Test; import org.springframework.beans.FatalBeanException; public class ActiveMQConnectionFactoryXBeanTest { + @BeforeClass + public static void beforeClass() throws Exception { + // enable xbean for the vm transport factory + resetVmTransportFactory("xbean"); + } + @Before public void setUp() throws Exception { // reset before each test @@ -45,6 +54,8 @@ public void setUp() throws Exception { @AfterClass public static void tearDown() throws Exception { System.setProperty(XBEAN_BROKER_FACTORY_PROTOCOLS_PROP, DEFAULT_ALLOWED_PROTOCOLS); + // reset defaults + resetVmTransportFactory(); } // File and classpath are allowed by default diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/transport/vm/VMTransportFactoryTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/transport/vm/VMTransportFactoryTest.java new file mode 100644 index 00000000000..1f05168ed4a --- /dev/null +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/transport/vm/VMTransportFactoryTest.java @@ -0,0 +1,134 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.transport.vm; + +import static org.apache.activemq.util.VmTransportTestUtils.resetVmTransportFactory; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + +import jakarta.jms.Connection; +import jakarta.jms.JMSException; +import org.apache.activemq.ActiveMQConnectionFactory; +import org.junit.AfterClass; +import org.junit.Before; +import org.junit.Test; + +public class VMTransportFactoryTest { + + @Before + public void setUp() throws Exception { + // reset before each test + resetVmTransportFactory(); + } + + @AfterClass + public static void tearDown() throws Exception { + resetVmTransportFactory(); + } + + @Test + public void testDefaults() throws Exception { + // broker and properties allowed by default + assertBrokerCreated("vm:localhost?persistent=false"); + assertBrokerCreated("vm:(broker:(tcp://localhost:0)?persistent=false)"); + assertBrokerCreated("vm://localhost?brokerConfig=properties:org/apache/activemq/config/broker.properties"); + + // xbean not allowed by default + assertBrokerStartError("vm://localhost?brokerConfig=xbean:activemq.xml"); + } + + @Test + public void testAllowAll() throws Exception { + resetVmTransportFactory("broker,properties,xbean"); + + // broker and properties allowed by default + assertBrokerCreated("vm:localhost?persistent=false"); + assertBrokerCreated("vm:(broker:(tcp://localhost:0)?persistent=false)"); + assertBrokerCreated("vm://localhost?brokerConfig=properties:org/apache/activemq/config/broker.properties"); + + // xbean now allowed + assertBrokerCreated("vm://localhost?brokerConfig=xbean:activemq.xml"); + } + + @Test + public void testAllowAllWildcard() throws Exception { + resetVmTransportFactory("*"); + + // all allowed + assertBrokerCreated("vm:localhost?persistent=false"); + assertBrokerCreated("vm:(broker:(tcp://localhost:0)?persistent=false)"); + assertBrokerCreated("vm://localhost?brokerConfig=properties:org/apache/activemq/config/broker.properties"); + assertBrokerCreated("vm://localhost?brokerConfig=xbean:activemq.xml"); + } + + @Test + public void testNullSchemes() throws Exception { + // should set to defaults + resetVmTransportFactory(null); + + // broker and properties allowed by default + assertBrokerCreated("vm:localhost?persistent=false"); + assertBrokerCreated("vm:(broker:(tcp://localhost:0)?persistent=false)"); + assertBrokerCreated("vm://localhost?brokerConfig=properties:org/apache/activemq/config/broker.properties"); + + // xbean not allowed by default + assertBrokerStartError("vm://localhost?brokerConfig=xbean:activemq.xml"); + } + + @Test + public void testNoneAllowed() throws Exception { + // deny all + resetVmTransportFactory(""); + + // nothing allowed + assertBrokerStartError("vm:localhost?persistent=false"); + assertBrokerStartError("vm:(broker:(tcp://localhost:0)?persistent=false)"); + assertBrokerStartError("vm://localhost?brokerConfig=properties:org/apache/activemq/config/broker.properties"); + assertBrokerStartError("vm://localhost?brokerConfig=xbean:activemq.xml"); + } + + @Test + public void testOneAllowed() throws Exception { + // deny all + resetVmTransportFactory("properties"); + + // only properties allowed + assertBrokerStartError("vm:localhost?persistent=false"); + assertBrokerStartError("vm:(broker:(tcp://localhost:0)?persistent=false)"); + assertBrokerCreated("vm://localhost?brokerConfig=properties:org/apache/activemq/config/broker.properties"); + assertBrokerStartError("vm://localhost?brokerConfig=xbean:activemq.xml"); + } + + private void assertBrokerCreated(String url) throws JMSException { + final ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(url); + try (Connection connection = factory.createConnection()) { + assertNotNull(connection); + } + } + + private void assertBrokerStartError(String url) { + final ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(url); + try (Connection ignored = factory.createConnection()) { + fail("Should have failed with an exception"); + } catch (JMSException e) { + Throwable cause = e.getCause() != null ? e.getCause() : e; + assertTrue(cause instanceof IllegalArgumentException); + } + } + +} diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/util/VmTransportTestUtils.java b/activemq-unit-tests/src/test/java/org/apache/activemq/util/VmTransportTestUtils.java new file mode 100644 index 00000000000..7a4d38aee8f --- /dev/null +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/util/VmTransportTestUtils.java @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.util; + +import java.lang.reflect.Field; +import java.util.concurrent.ConcurrentMap; +import org.apache.activemq.transport.TransportFactory; +import org.apache.activemq.transport.vm.VMTransportFactory; + +public class VmTransportTestUtils { + + public static void resetVmTransportFactory() throws Exception { + resetVmTransportFactory(VMTransportFactory.DEFAULT_ALLOWED_SCHEMES); + } + + @SuppressWarnings("unchecked") + public static void resetVmTransportFactory(String allowedSchemes) throws Exception { + if (allowedSchemes == null) { + System.clearProperty(VMTransportFactory.VM_TRANSPORT_FACTORY_SCHEMES_ENABLED_PROP); + } else { + // set property to allowed schemes + System.setProperty(VMTransportFactory.VM_TRANSPORT_FACTORY_SCHEMES_ENABLED_PROP, + allowedSchemes); + } + + // clear any cached factory so the next call will create a new transport and use + // the correct property setting + Field factoriesField = TransportFactory.class.getDeclaredField("TRANSPORT_FACTORYS"); + factoriesField.setAccessible(true); + ConcurrentMap factories = + (ConcurrentMap) factoriesField.get(TransportFactory.class); + factories.remove("vm"); + } +} diff --git a/activemq-web/src/main/java/org/apache/activemq/web/MessageServlet.java b/activemq-web/src/main/java/org/apache/activemq/web/MessageServlet.java index fa3dd50e095..10675a7cf2c 100644 --- a/activemq-web/src/main/java/org/apache/activemq/web/MessageServlet.java +++ b/activemq-web/src/main/java/org/apache/activemq/web/MessageServlet.java @@ -48,7 +48,13 @@ * there will always be a chance of losing messages. Consider what happens when * a message is retrieved from the broker but the web call is interrupted before * the client receives the message in the response - the message is lost. + * + * @deprecated - WARNING: The MessageServlet should be used with caution as it is unmaintained + * and there are multiple security related issues. This servlet is primarily meant for demo + * purposes only and will be removed entirely in a future release. It is recommended to + * keep it disabled. */ +@Deprecated public class MessageServlet extends MessageServletSupport { // its a bit pita that this servlet got intermixed with asyncRequest/rest diff --git a/assembly/src/release/conf/jetty.xml b/assembly/src/release/conf/jetty.xml index 5eff735f035..5f18d7c1010 100644 --- a/assembly/src/release/conf/jetty.xml +++ b/assembly/src/release/conf/jetty.xml @@ -21,6 +21,31 @@ + + @@ -55,6 +80,10 @@ + + + + @@ -74,6 +103,14 @@ + + @@ -116,6 +153,7 @@ + @@ -131,19 +169,94 @@ + + + + + + + + + + + + + + + + + + + + 127.0.0.1 + + + + + + + ::1 + + + + + + - - - - - - - - - - + @@ -192,10 +305,10 @@ - + - + diff --git a/assembly/src/release/conf/jolokia-access.xml b/assembly/src/release/conf/jolokia-access.xml index 97b099a5b7f..6844c314608 100644 --- a/assembly/src/release/conf/jolokia-access.xml +++ b/assembly/src/release/conf/jolokia-access.xml @@ -17,7 +17,14 @@ --> - + + + post + + + @@ -46,11 +53,143 @@ + + + org.apache.activemq:type=Broker,brokerName=* + + terminateJVM + stop + stopGracefully + restart + gc + + addConnector + removeConnector + addNetworkConnector + removeNetworkConnector + addQueue + removeQueue + addTopic + removeTopic + createDurableSubscriber + destroyDurableSubscriber + + reloadLog4jProperties + setMemoryLimit + setStoreLimit + setTempLimit + setJobSchedulerStoreLimit + setMaxUncommittedCount + + + + + org.apache.activemq:type=Broker,brokerName=*,destinationType=*,destinationName=* + purge + removeMessage + removeMatchingMessages + copyMessageTo + copyMatchingMessagesTo + moveMessageTo + moveMatchingMessagesTo + retryMessage + retryMessages + removeMessageGroup + removeAllMessageGroups + sendTextMessage + sendTextMessageWithProperties + pause + resume + + + + + org.apache.activemq:type=Broker,brokerName=*,destinationType=*,destinationName=*,endpoint=Consumer,clientId=*,consumerId=* + destroy + removeMessage + setSelector + + + + + org.apache.activemq:type=Broker,brokerName=*,service=JobScheduler,name=* + removeJob + removeAllJobs + removeAllJobsAtScheduledTime + + + + + org.apache.activemq:type=Broker,brokerName=*,service=Log4JConfiguration + setRootLogLevel + setLogLevel + reloadLog4jProperties + + + + + org.apache.activemq:type=Broker,brokerName=*,connector=networkConnectors,networkConnectorName=* + Password + RemotePassword + setUserName + setPassword + setRemoteUserName + setRemotePassword + setBridgeTempDestinations + setConduitSubscriptions + setDispatchAsync + setDynamicOnly + setMessageTTL + setConsumerTTL + setPrefetchSize + setAdvisoryPrefetchSize + setDecreaseNetworkConsumerPriority + setSuppressDuplicateQueueSubscriptions + setSuppressDuplicateTopicSubscriptions + + + + + javax.management.loading:type=MLet + * + * + + + JMImplementation:* + * + * + org.apache.logging.log4j2:* * * + + java.util.logging:* + * + * + + + + java.lang:type=Memory + * + * + + + java.lang:type=ClassLoading + * + * + + + + java.lang:type=Runtime + SystemProperties + InputArguments + * + com.sun.management:type=DiagnosticCommand * diff --git a/assembly/src/release/webapps/api/WEB-INF/web.xml b/assembly/src/release/webapps/api/WEB-INF/web.xml index 2a1a0103649..d260009cd78 100644 --- a/assembly/src/release/webapps/api/WEB-INF/web.xml +++ b/assembly/src/release/webapps/api/WEB-INF/web.xml @@ -22,11 +22,19 @@ Apache ActiveMQ REST API + - + jolokia-agent @@ -74,11 +88,6 @@ 1 - - MessageServlet - /message/* - - jolokia-agent /jolokia/*