Skip to content

Commit

Permalink
Added jms.allowTemporaryTopicWithoutAdmin parameter for allowing temp…
Browse files Browse the repository at this point in the history
…orary topic flow without admin privileges and changed exception handling to be specific.
  • Loading branch information
sandeep-ctds committed Nov 4, 2024
1 parent c3ed411 commit 4d27171
Show file tree
Hide file tree
Showing 3 changed files with 31 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -912,7 +912,7 @@ public TemporaryQueue createTemporaryQueue(PulsarSession session) throws JMSExce
checkNotClosed();
String name =
"persistent://" + factory.getSystemNamespace() + "/jms-temp-queue-" + UUID.randomUUID();
createNonPartitionedTopic(name);
createPulsarTemporaryTopic(name);
PulsarTemporaryQueue res = new PulsarTemporaryQueue(name, session);
temporaryDestinations.add(res);
return res;
Expand All @@ -922,7 +922,7 @@ public TemporaryTopic createTemporaryTopic(PulsarSession session) throws JMSExce
checkNotClosed();
String name =
"persistent://" + factory.getSystemNamespace() + "/jms-temp-topic-" + UUID.randomUUID();
createNonPartitionedTopic(name);
createPulsarTemporaryTopic(name);
PulsarTemporaryTopic res = new PulsarTemporaryTopic(name, session);
temporaryDestinations.add(res);
return res;
Expand Down Expand Up @@ -988,11 +988,19 @@ private ConnectionConsumer buildConnectionConsumer(
return connectionConsumer;
}

private void createNonPartitionedTopic(String name) {
private void createPulsarTemporaryTopic(String name) throws JMSException {
try {
factory.getPulsarAdmin().topics().createNonPartitionedTopic(name);
} catch (IllegalStateException err) {
if (!factory.isAllowTemporaryTopicWithoutAdmin()) {
throw Utils.handleException(err);
}
log.warn(
"Skipping creation of nonPartitionedTopic {} as jms.allowTemporaryTopicWithoutAdmin=true",
name,
err);
} catch (Exception err) {
log.warn("Skipping creation of nonPartitionedTopic {}", name, err);
throw Utils.handleException(err);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,7 @@ public class PulsarConnectionFactory
private transient SubscriptionType topicSharedSubscriptionType = SubscriptionType.Shared;
private transient long waitForServerStartupTimeout = 60000;
private transient boolean usePulsarAdmin = true;
private transient boolean allowTemporaryTopicWithoutAdmin = true;
private transient boolean precreateQueueSubscription = true;
private transient int precreateQueueSubscriptionConsumerQueueSize = 0;
private transient boolean initialized;
Expand Down Expand Up @@ -330,6 +331,11 @@ private synchronized void ensureInitialized(String connectUsername, String conne
this.usePulsarAdmin =
Boolean.parseBoolean(getAndRemoveString("jms.usePulsarAdmin", "true", configurationCopy));

this.allowTemporaryTopicWithoutAdmin =
Boolean.parseBoolean(
getAndRemoveString(
"jms.allowTemporaryTopicWithoutAdmin", "false", configurationCopy));

this.precreateQueueSubscription =
Boolean.parseBoolean(
getAndRemoveString("jms.precreateQueueSubscription", "true", configurationCopy));
Expand Down Expand Up @@ -1726,6 +1732,10 @@ public boolean isAcknowledgeRejectedMessages() {
return acknowledgeRejectedMessages;
}

public boolean isAllowTemporaryTopicWithoutAdmin() {
return allowTemporaryTopicWithoutAdmin;
}

public synchronized boolean isClosed() {
return closed;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
*/
package com.datastax.oss.pulsar.jms;

import javax.jms.IllegalStateException;
import javax.jms.InvalidDestinationException;
import javax.jms.JMSException;
import lombok.extern.slf4j.Slf4j;
Expand Down Expand Up @@ -48,8 +49,14 @@ public final void delete() throws JMSException {
PulsarAdmin pulsarAdmin;
try {
pulsarAdmin = session.getFactory().getPulsarAdmin();
} catch (Exception e) {
log.warn("Cannot delete a temporary destination {}", this, e);
} catch (IllegalStateException err) {
if (!session.getFactory().isAllowTemporaryTopicWithoutAdmin()) {
throw Utils.handleException(err);
}
log.warn(
"Cannot delete a temporary destination {}. Skipping because jms.allowTemporaryTopicWithoutAdmin=true",
this,
err);
return;
}
TopicStats stats = pulsarAdmin.topics().getStats(fullQualifiedTopicName);
Expand Down

0 comments on commit 4d27171

Please sign in to comment.