Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
Expand Down Expand Up @@ -162,6 +163,7 @@ public class TransportConnection implements Connection, Task, CommandVisitor {
private final ReentrantReadWriteLock serviceLock = new ReentrantReadWriteLock();
private String duplexNetworkConnectorId;
private final long connectedTimestamp;
private final CompletableFuture<ConnectionId> initialConnectionId = new CompletableFuture<>();

/**
* @param taskRunnerFactory - can be null if you want direct dispatch to the transport
Expand Down Expand Up @@ -850,11 +852,16 @@ public Response processAddConnection(ConnectionInfo info) throws Exception {

try {
broker.addConnection(context, info);
// Complete the future with the connectionId if we completed
// the broker.addConnection() chain successfully
initialConnectionId.complete(info.getConnectionId());
} catch (Exception e) {
synchronized (brokerConnectionStates) {
brokerConnectionStates.remove(info.getConnectionId());
}
unregisterConnectionState(info.getConnectionId());
// complete with the exception
initialConnectionId.completeExceptionally(e);
LOG.warn("Failed to add Connection id={}, clientId={}, clientIP={} due to {}",
info.getConnectionId(), clientId, info.getClientIp(), e.getLocalizedMessage());
//AMQ-6561 - stop for all exceptions on addConnection
Expand Down Expand Up @@ -1389,13 +1396,10 @@ public Response processBrokerInfo(BrokerInfo info) {
LOG.error(" Slave Brokers are no longer supported - slave trying to attach is: {}", info.getBrokerName());
} else if (info.isNetworkConnection() && !info.isDuplexConnection()) {
try {
NetworkBridgeConfiguration config = getNetworkConfiguration(info);
if (config.isSyncDurableSubs() && protocolVersion.get() >= CommandTypes.PROTOCOL_VERSION_DURABLE_SYNC) {
LOG.debug("SyncDurableSubs is enabled, Sending BrokerSubscriptionInfo");
dispatchSync(NetworkBridgeUtils.getBrokerSubscriptionInfo(this.broker.getBrokerService(), config));
}
// register durable sync to be sent after ConnectionInfo has been handled
registerDurableSync(getNetworkConfiguration(info), info);
} catch (Exception e) {
LOG.error("Failed to respond to network bridge creation from broker {}", info.getBrokerId(), e);
LOG.error("Failed to register durable sync for network bridge creation from broker {}", info.getBrokerId(), e);
return null;
}
} else if (info.isNetworkConnection() && info.isDuplexConnection()) {
Expand All @@ -1405,10 +1409,8 @@ public Response processBrokerInfo(BrokerInfo info) {
NetworkBridgeConfiguration config = getNetworkConfiguration(info);
config.setBrokerName(broker.getBrokerName());

if (config.isSyncDurableSubs() && protocolVersion.get() >= CommandTypes.PROTOCOL_VERSION_DURABLE_SYNC) {
LOG.debug("SyncDurableSubs is enabled, Sending BrokerSubscriptionInfo");
dispatchSync(NetworkBridgeUtils.getBrokerSubscriptionInfo(this.broker.getBrokerService(), config));
}
// register durable sync to be sent after ConnectionInfo has been handled
registerDurableSync(config, info);

// check for existing duplex connection hanging about

Expand Down Expand Up @@ -1475,6 +1477,30 @@ public Response processBrokerInfo(BrokerInfo info) {
return null;
}

private void registerDurableSync(final NetworkBridgeConfiguration config, final BrokerInfo info) {
if (config.isSyncDurableSubs() && protocolVersion.get() >= CommandTypes.PROTOCOL_VERSION_DURABLE_SYNC) {
// this will complete when the connection id has been set, or immediately if already set
initialConnectionId.whenComplete((connectionId, t) -> {
try {
if (t != null) {
LOG.warn("SyncDurableSubs will be skipped due to error {}",
t.getMessage());
return;
}
// check connection still registered
if (lookupConnectionState(connectionId) != null) {
LOG.debug("SyncDurableSubs is enabled, Sending BrokerSubscriptionInfo");
dispatchSync(NetworkBridgeUtils.getBrokerSubscriptionInfo(
this.broker.getBrokerService(), config));
}
} catch (Exception e) {
LOG.error("Failed to respond to network bridge creation from broker {}",
info.getBrokerId(), e);
}
});
}
}

@SuppressWarnings({"unchecked", "rawtypes"})
private HashMap<String, String> createMap(Properties properties) {
return new HashMap(properties);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -559,26 +559,39 @@ private static void validateAllowedUrl(String uriString) throws URISyntaxExcepti
// Validate the URI does not contain a denied transport scheme
private static void validateAllowedUri(URI uri, int depth) throws URISyntaxException {
// Don't allow more than 5 nested URIs to prevent blowing the stack
// If we are greater than 4 then this is the 5th level of composite
if (depth > 4) {
if (depth > 5) {
throw new IllegalArgumentException("URI can't contain more than 5 nested composite URIs");
}

// First check the main URI scheme
validateAllowedScheme(uri.getScheme());

// If composite, iterate and check each of the composite URIs
if (URISupport.isCompositeURI(uri)) {
URISupport.CompositeData data = URISupport.parseComposite(uri);
// We need to check if the URI is composite and/or contains nested URIs
// The utility method URISupport#isCompositeURI is not good enough here
// because it misses if there are no parentheses and also is primarily meant
// for checking comma separated URIs and not nested URIs.
//
// The best way to handle all cases is to use the same logic that the transports
// use to process the URIs and that is to simply attempt to parse it and check each
// of the parsed components. This wll correctly handle the case when there
// are parentheses and also when the parentheses are skipped.
final URISupport.CompositeData data;
try {
data = URISupport.parseComposite(uri);
} catch (URISyntaxException e) {
// If this is not a valid URI then we can stop checking
// This can happen when parsing a nested URI and at the last portion
return;
}

if (data.getComponents() != null) {
depth++;
for (URI component : data.getComponents()) {
// Each URI could be a nested composite URI so call validateAllowedUri()
// to validate it. This check if composite first so we don't add to
// the recursive stack depth if there's a lot of URIs that are not composite
if (URISupport.isCompositeURI(component)) {
// Each URI could be a nested and/or composite URI so call validateAllowedUri()
// to validate it. If the scheme is null then the original URI is not composite
// or nested so we can skip the check, and we are finished.
if (component.getScheme() != null) {
validateAllowedUri(component, depth);
} else {
validateAllowedScheme(component.getScheme());
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,12 +75,21 @@ protected SecurityContext checkSecurityContext(ConnectionContext context) throws
return securityContext;
}

protected boolean checkDestinationAdmin(SecurityContext securityContext, ActiveMQDestination destination) {
Destination existing = this.getDestinationMap(destination).get(destination);
if (existing != null) {
protected boolean checkDestinationAdminAdd(SecurityContext securityContext, ActiveMQDestination destination) {
if (this.getDestinationMap(destination).get(destination) != null) {
return true;
}
return checkDestinationAdmin(securityContext, destination);
}

protected boolean checkDestinationAdminRemove(SecurityContext securityContext, ActiveMQDestination destination) {
if (this.getDestinationMap(destination).get(destination) == null) {
return true;
}
return checkDestinationAdmin(securityContext, destination);
}

protected boolean checkDestinationAdmin(SecurityContext securityContext, ActiveMQDestination destination) {
if (!securityContext.isBrokerContext()) {
Set<?> allowedACLs = null;
if (!destination.isTemporary()) {
Expand All @@ -100,18 +109,18 @@ protected boolean checkDestinationAdmin(SecurityContext securityContext, ActiveM
public void addDestinationInfo(ConnectionContext context, DestinationInfo info) throws Exception {
final SecurityContext securityContext = checkSecurityContext(context);

if (!checkDestinationAdmin(securityContext, info.getDestination())) {
if (!checkDestinationAdminAdd(securityContext, info.getDestination())) {
throw new SecurityException("User " + securityContext.getUserName() + " is not authorized to create: " + info.getDestination());
}

super.addDestinationInfo(context, info);
}

@Override
public Destination addDestination(ConnectionContext context, ActiveMQDestination destination,boolean create) throws Exception {
public Destination addDestination(ConnectionContext context, ActiveMQDestination destination, boolean create) throws Exception {
final SecurityContext securityContext = checkSecurityContext(context);

if (!checkDestinationAdmin(securityContext, destination)) {
if (!checkDestinationAdminAdd(securityContext, destination)) {
throw new SecurityException("User " + securityContext.getUserName() + " is not authorized to create: " + destination);
}

Expand All @@ -122,7 +131,7 @@ public Destination addDestination(ConnectionContext context, ActiveMQDestination
public void removeDestination(ConnectionContext context, ActiveMQDestination destination, long timeout) throws Exception {
final SecurityContext securityContext = checkSecurityContext(context);

if (!checkDestinationAdmin(securityContext, destination)) {
if (!checkDestinationAdminRemove(securityContext, destination)) {
throw new SecurityException("User " + securityContext.getUserName() + " is not authorized to remove: " + destination);
}

Expand All @@ -135,7 +144,7 @@ public void removeDestination(ConnectionContext context, ActiveMQDestination des
public void removeDestinationInfo(ConnectionContext context, DestinationInfo info) throws Exception {
final SecurityContext securityContext = checkSecurityContext(context);

if (!checkDestinationAdmin(securityContext, info.getDestination())) {
if (!checkDestinationAdminRemove(securityContext, info.getDestination())) {
throw new SecurityException("User " + securityContext.getUserName() + " is not authorized to remove: " + info.getDestination());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,14 @@
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

import java.util.stream.Collectors;
import org.apache.activemq.broker.BrokerFactory;
import org.apache.activemq.broker.BrokerFactoryHandler;
import org.apache.activemq.broker.BrokerRegistry;
Expand All @@ -44,12 +47,29 @@

public class VMTransportFactory extends TransportFactory {

public static final String VM_TRANSPORT_FACTORY_SCHEMES_ENABLED_PROP =
"org.apache.activemq.transport.VM_TRANSPORT_FACTORY_SCHEMES_ENABLED";
public static final String DEFAULT_ALLOWED_SCHEMES = "broker,properties";

public static final ConcurrentMap<String, BrokerService> BROKERS = new ConcurrentHashMap<String, BrokerService>();
public static final ConcurrentMap<String, TransportConnector> CONNECTORS = new ConcurrentHashMap<String, TransportConnector>();
public static final ConcurrentMap<String, VMTransportServer> SERVERS = new ConcurrentHashMap<String, VMTransportServer>();
private static final Logger LOG = LoggerFactory.getLogger(VMTransportFactory.class);

BrokerFactoryHandler brokerFactoryHandler;
private final Set<String> allowedSchemes;

public VMTransportFactory() {
final String allowedSchemes = System.getProperty(VM_TRANSPORT_FACTORY_SCHEMES_ENABLED_PROP,
DEFAULT_ALLOWED_SCHEMES);

// Asterisk will map to null which will allow all and skip checking
// Empty string will map to an empty set and will deny all
this.allowedSchemes = !allowedSchemes.equals("*") ?
Arrays.stream(allowedSchemes.split("\\s*,\\s*"))
.filter(s -> !s.isBlank())
.collect(Collectors.toUnmodifiableSet()) : null;
}

@Override
public Transport doConnect(URI location) throws Exception {
Expand Down Expand Up @@ -119,6 +139,7 @@ public Transport doCompositeConnect(URI location) throws Exception {
throw new IOException("Broker named '" + host + "' does not exist.");
}
try {
validateBrokerCreationSchema(host, brokerURI);
if (brokerFactoryHandler != null) {
broker = brokerFactoryHandler.createBroker(brokerURI);
} else {
Expand Down Expand Up @@ -162,6 +183,20 @@ public Transport doCompositeConnect(URI location) throws Exception {
return transport;
}

private void validateBrokerCreationSchema(String host, URI brokerURI) {
if (allowedSchemes != null) {
final String detectedScheme = brokerURI.getScheme();
if (detectedScheme == null) {
throw new IllegalArgumentException("Could not detect scheme in given URI [" + brokerURI + "]");
}
if (!allowedSchemes.contains(detectedScheme)){
throw new IllegalArgumentException("Broker named '" + host + "' does not exist and "
+ "broker creation using the scheme '" + detectedScheme + "' is not enabled via the VMTransportFactory. "
+ "To allow creation, configure the system property " + VM_TRANSPORT_FACTORY_SCHEMES_ENABLED_PROP);
}
}
}

private static String extractHost(URI location) {
String host = location.getHost();
if (host == null || host.length() == 0) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2086,6 +2086,14 @@ protected void testAddTransportConnectorBlockedBrokerView(String scheme) throws
assertEquals("Transport scheme '" + scheme + "' is not allowed", e.getMessage());
}

try {
// verify any composite URI is blocked as well without parens
brokerView.addConnector("static:tcp://0.0.0.0:0," + scheme + "://" + brokerName);
fail("Should have failed trying to add connector with scheme: " + scheme);
} catch (IllegalArgumentException e) {
assertEquals("Transport scheme '" + scheme + "' is not allowed", e.getMessage());
}

try {
// verify nested composite URI is blocked
brokerView.addConnector("static:(static:(static:(" + scheme + "://localhost)))");
Expand All @@ -2108,6 +2116,15 @@ public void testNestedAddTransportConnector() throws Exception {
} catch (IllegalArgumentException e) {
assertEquals("URI can't contain more than 5 nested composite URIs", e.getMessage());
}

try {
// verify nested composite URI with more than 5 levels is blocked without parens
brokerView.addConnector(
"static:static:static:static:static:static:tcp://localhost:0");
fail("Should have failed trying to add vm connector bridge");
} catch (IllegalArgumentException e) {
assertEquals("URI can't contain more than 5 nested composite URIs", e.getMessage());
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -16,19 +16,34 @@
*/
package org.apache.activemq.config;

import java.io.File;
import java.util.Hashtable;

import javax.naming.Context;
import javax.naming.InitialContext;
import static org.apache.activemq.util.VmTransportTestUtils.resetVmTransportFactory;

import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.test.JmsTopicSendReceiveWithTwoConnectionsTest;

import javax.naming.Context;
import javax.naming.InitialContext;
import java.io.File;
import java.util.Hashtable;

/**
*
*/
public class BrokerXmlConfigFromJNDITest extends JmsTopicSendReceiveWithTwoConnectionsTest {

@Override
protected void setUp() throws Exception {
// reset before each test
resetVmTransportFactory("xbean");
super.setUp();
}

@Override
protected void tearDown() throws Exception {
super.tearDown();
resetVmTransportFactory();
}

@Override
protected ActiveMQConnectionFactory createConnectionFactory() throws Exception {
assertBaseDirectoryContainsSpaces();
Expand Down
Loading