Skip to content

Commit 5cd5265

Browse files
authored
Support for ActiveMQ-Artemis messaging tracing. (apache#670)
1 parent 822a73b commit 5cd5265

File tree

27 files changed

+1397
-0
lines changed

27 files changed

+1397
-0
lines changed

.github/workflows/plugins-jdk17-test.1.yaml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,7 @@ jobs:
5959
- resteasy-6.x-scenario
6060
- gateway-4.x-scenario
6161
- httpexchange-scenario
62+
- activemq-artemis-2.x-scenario
6263
steps:
6364
- uses: actions/checkout@v2
6465
with:

CHANGES.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ Release Notes.
1414
* Support tracing for async producing, batch sync consuming, and batch async consuming in rocketMQ-client-java-5.x-plugin.
1515
* Convert the Redisson span into an async span.
1616
* Rename system env name from `sw_plugin_kafka_producer_config` to `SW_PLUGIN_KAFKA_PRODUCER_CONFIG`.
17+
* Support for ActiveMQ-Artemis messaging tracing.
1718

1819
#### Documentation
1920

Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,48 @@
1+
<?xml version="1.0" encoding="UTF-8"?>
2+
<!--
3+
~ Licensed to the Apache Software Foundation (ASF) under one or more
4+
~ contributor license agreements. See the NOTICE file distributed with
5+
~ this work for additional information regarding copyright ownership.
6+
~ The ASF licenses this file to You under the Apache License, Version 2.0
7+
~ (the "License"); you may not use this file except in compliance with
8+
~ the License. You may obtain a copy of the License at
9+
~
10+
~ http://www.apache.org/licenses/LICENSE-2.0
11+
~
12+
~ Unless required by applicable law or agreed to in writing, software
13+
~ distributed under the License is distributed on an "AS IS" BASIS,
14+
~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
~ See the License for the specific language governing permissions and
16+
~ limitations under the License.
17+
~
18+
-->
19+
20+
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
21+
<parent>
22+
<artifactId>apm-sdk-plugin</artifactId>
23+
<groupId>org.apache.skywalking</groupId>
24+
<version>9.2.0-SNAPSHOT</version>
25+
</parent>
26+
<modelVersion>4.0.0</modelVersion>
27+
28+
<artifactId>apm-activemq-artemis-jakarta-client-2.x-plugin</artifactId>
29+
<name>activemq-artemis-jakarta-client-2.x-plugin</name>
30+
<packaging>jar</packaging>
31+
32+
<properties>
33+
<java.version>1.8</java.version>
34+
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
35+
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
36+
<artemis-jakarta-client.version>2.31.2</artemis-jakarta-client.version>
37+
</properties>
38+
<dependencies>
39+
<dependency>
40+
<groupId>org.apache.activemq</groupId>
41+
<artifactId>artemis-jakarta-client</artifactId>
42+
<version>${artemis-jakarta-client.version}</version>
43+
<scope>provided</scope>
44+
<exclusions>
45+
</exclusions>
46+
</dependency>
47+
</dependencies>
48+
</project>
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,49 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.skywalking.apm.plugin.activemq.artemis.jakarta.client;
19+
20+
import java.util.Map;
21+
import org.apache.activemq.artemis.jms.client.ActiveMQConnection;
22+
import org.apache.activemq.artemis.jms.client.ActiveMQDestination;
23+
import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.EnhancedInstance;
24+
import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.InstanceConstructorInterceptor;
25+
import org.apache.skywalking.apm.plugin.activemq.artemis.jakarta.client.define.EnhanceInfo;
26+
27+
/**
28+
* {@link MessageConsumerConstructorInterceptor} get enhance data from the constructor of {@link org.apache.activemq.artemis.jms.client.ActiveMQMessageConsumer}
29+
*/
30+
public class MessageConsumerConstructorInterceptor implements InstanceConstructorInterceptor {
31+
private static final String DEFAULT_HOST = "localhost";
32+
private static final String DEFAULT_PORT = "61616";
33+
private static final String HOST_KEY = "host";
34+
private static final String PORT_KEY = "port";
35+
36+
@Override
37+
public void onConstruct(final EnhancedInstance objInst, final Object[] allArguments) throws Throwable {
38+
ActiveMQConnection connection = (ActiveMQConnection) allArguments[1];
39+
ActiveMQDestination destination = (ActiveMQDestination) allArguments[5];
40+
Map<String, Object> paramMap = connection.getSessionFactory().getConnectorConfiguration().getParams();
41+
EnhanceInfo enhanceInfo = new EnhanceInfo();
42+
enhanceInfo.setBrokerUrl(paramMap.getOrDefault(HOST_KEY, DEFAULT_HOST) + ":" + paramMap.getOrDefault(PORT_KEY
43+
, DEFAULT_PORT));
44+
enhanceInfo.setName(destination.getName());
45+
enhanceInfo.setAddress(destination.getAddress());
46+
enhanceInfo.setType(destination.getType());
47+
objInst.setSkyWalkingDynamicField(enhanceInfo);
48+
}
49+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,117 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.skywalking.apm.plugin.activemq.artemis.jakarta.client;
19+
20+
import java.lang.reflect.Method;
21+
import org.apache.activemq.artemis.jms.client.ActiveMQDestination;
22+
import org.apache.activemq.artemis.jms.client.ActiveMQMessage;
23+
import org.apache.skywalking.apm.agent.core.context.CarrierItem;
24+
import org.apache.skywalking.apm.agent.core.context.ContextCarrier;
25+
import org.apache.skywalking.apm.agent.core.context.ContextManager;
26+
import org.apache.skywalking.apm.agent.core.context.tag.StringTag;
27+
import org.apache.skywalking.apm.agent.core.context.tag.Tags;
28+
import org.apache.skywalking.apm.agent.core.context.trace.AbstractSpan;
29+
import org.apache.skywalking.apm.agent.core.context.trace.SpanLayer;
30+
import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.EnhancedInstance;
31+
import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.InstanceMethodsAroundInterceptor;
32+
import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.MethodInterceptResult;
33+
import org.apache.skywalking.apm.network.trace.component.ComponentsDefine;
34+
import org.apache.skywalking.apm.plugin.activemq.artemis.jakarta.client.define.EnhanceInfo;
35+
36+
/**
37+
* {@link MessageConsumerInterceptor} create entry span when the method {@link org.apache.activemq.artemis.jms.client.ActiveMQMessageConsumer#getMessage(long, boolean)} execute
38+
*/
39+
public class MessageConsumerInterceptor implements InstanceMethodsAroundInterceptor {
40+
private static final String OPERATION_NAME_PREFIX = "ActiveMQ/";
41+
private static final String CONSUMER_OPERATION_NAME_SUFFIX = "/Consumer";
42+
public static final StringTag MQ_MESSAGE_ID = new StringTag("mq.message.id");
43+
private static final String QUEUE = "Queue";
44+
private static final String TOPIC = "Topic";
45+
46+
@Override
47+
public void beforeMethod(final EnhancedInstance objInst,
48+
final Method method,
49+
final Object[] allArguments,
50+
final Class<?>[] classes,
51+
final MethodInterceptResult methodInterceptResult) throws Throwable {
52+
}
53+
54+
@Override
55+
public Object afterMethod(final EnhancedInstance objInst,
56+
final Method method,
57+
final Object[] objects,
58+
final Class<?>[] classes,
59+
final Object ret) throws Throwable {
60+
ActiveMQMessage message = (ActiveMQMessage) ret;
61+
if (message == null) {
62+
return ret;
63+
}
64+
ContextCarrier contextCarrier = getContextCarrierFromMessage(message);
65+
EnhanceInfo enhanceInfo = (EnhanceInfo) objInst.getSkyWalkingDynamicField();
66+
boolean queue = isQueue(enhanceInfo.getType());
67+
AbstractSpan activeSpan = ContextManager.createEntrySpan(
68+
buildOperationName(queue, enhanceInfo.getName()),
69+
contextCarrier
70+
);
71+
Tags.MQ_BROKER.set(activeSpan, enhanceInfo.getBrokerUrl());
72+
if (queue) {
73+
Tags.MQ_QUEUE.set(activeSpan, enhanceInfo.getName());
74+
} else {
75+
Tags.MQ_TOPIC.set(activeSpan, enhanceInfo.getName());
76+
}
77+
activeSpan.tag(MQ_MESSAGE_ID, message.getJMSMessageID());
78+
activeSpan.setPeer(enhanceInfo.getBrokerUrl());
79+
activeSpan.setComponent(ComponentsDefine.ACTIVEMQ_CONSUMER);
80+
SpanLayer.asMQ(activeSpan);
81+
ContextManager.stopSpan(activeSpan);
82+
return ret;
83+
}
84+
85+
@Override
86+
public void handleMethodException(final EnhancedInstance enhancedInstance,
87+
final Method method,
88+
final Object[] objects,
89+
final Class<?>[] classes,
90+
final Throwable t) {
91+
ContextManager.activeSpan().log(t);
92+
}
93+
94+
private ContextCarrier getContextCarrierFromMessage(ActiveMQMessage message) {
95+
ContextCarrier contextCarrier = new ContextCarrier();
96+
97+
CarrierItem next = contextCarrier.items();
98+
while (next.hasNext()) {
99+
next = next.next();
100+
next.setHeadValue(message.getCoreMessage().getStringProperty(next.getHeadKey().replace("-", "_")));
101+
}
102+
103+
return contextCarrier;
104+
}
105+
106+
private boolean isQueue(ActiveMQDestination.TYPE type) {
107+
return ActiveMQDestination.TYPE.QUEUE.equals(type) || ActiveMQDestination.TYPE.TEMP_QUEUE.equals(type);
108+
}
109+
110+
private String buildOperationName(boolean isQueue, String name) {
111+
if (isQueue) {
112+
return OPERATION_NAME_PREFIX + QUEUE + "/" + name + CONSUMER_OPERATION_NAME_SUFFIX;
113+
} else {
114+
return OPERATION_NAME_PREFIX + TOPIC + "/" + name + CONSUMER_OPERATION_NAME_SUFFIX;
115+
}
116+
}
117+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,49 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.skywalking.apm.plugin.activemq.artemis.jakarta.client;
19+
20+
import java.util.Map;
21+
import org.apache.activemq.artemis.jms.client.ActiveMQConnection;
22+
import org.apache.activemq.artemis.jms.client.ActiveMQDestination;
23+
import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.EnhancedInstance;
24+
import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.InstanceConstructorInterceptor;
25+
import org.apache.skywalking.apm.plugin.activemq.artemis.jakarta.client.define.EnhanceInfo;
26+
27+
/**
28+
* {@link MessageProducerConstructorInterceptor} get enhance data from the constructor of {@link org.apache.activemq.artemis.jms.client.ActiveMQMessageProducer}
29+
*/
30+
public class MessageProducerConstructorInterceptor implements InstanceConstructorInterceptor {
31+
private static final String DEFAULT_HOST = "localhost";
32+
private static final String DEFAULT_PORT = "61616";
33+
private static final String HOST_KEY = "host";
34+
private static final String PORT_KEY = "port";
35+
36+
@Override
37+
public void onConstruct(final EnhancedInstance objInst, final Object[] allArguments) throws Throwable {
38+
ActiveMQConnection connection = (ActiveMQConnection) allArguments[0];
39+
Map<String, Object> paramMap = connection.getSessionFactory().getConnectorConfiguration().getParams();
40+
ActiveMQDestination destination = (ActiveMQDestination) allArguments[2];
41+
EnhanceInfo enhanceInfo = new EnhanceInfo();
42+
enhanceInfo.setBrokerUrl(paramMap.getOrDefault(HOST_KEY, DEFAULT_HOST) + ":" + paramMap.getOrDefault(PORT_KEY
43+
, DEFAULT_PORT));
44+
enhanceInfo.setName(destination.getName());
45+
enhanceInfo.setAddress(destination.getAddress());
46+
enhanceInfo.setType(destination.getType());
47+
objInst.setSkyWalkingDynamicField(enhanceInfo);
48+
}
49+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,111 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.skywalking.apm.plugin.activemq.artemis.jakarta.client;
19+
20+
import jakarta.jms.Message;
21+
import java.lang.reflect.Method;
22+
import org.apache.activemq.artemis.jms.client.ActiveMQDestination;
23+
import org.apache.skywalking.apm.agent.core.context.CarrierItem;
24+
import org.apache.skywalking.apm.agent.core.context.ContextCarrier;
25+
import org.apache.skywalking.apm.agent.core.context.ContextManager;
26+
import org.apache.skywalking.apm.agent.core.context.tag.StringTag;
27+
import org.apache.skywalking.apm.agent.core.context.tag.Tags;
28+
import org.apache.skywalking.apm.agent.core.context.trace.AbstractSpan;
29+
import org.apache.skywalking.apm.agent.core.context.trace.SpanLayer;
30+
import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.EnhancedInstance;
31+
import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.InstanceMethodsAroundInterceptor;
32+
import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.MethodInterceptResult;
33+
import org.apache.skywalking.apm.network.trace.component.ComponentsDefine;
34+
import org.apache.skywalking.apm.plugin.activemq.artemis.jakarta.client.define.EnhanceInfo;
35+
36+
/**
37+
* {@link MessageProducerInterceptor} create exit span when the method {@link org.apache.activemq.artemis.jms.client.ActiveMQMessageProducer#doSendx}
38+
* execute
39+
*/
40+
public class MessageProducerInterceptor implements InstanceMethodsAroundInterceptor {
41+
private static final String OPERATION_NAME_PREFIX = "ActiveMQ/";
42+
private static final String PRODUCER_OPERATION_NAME_SUFFIX = "/Producer";
43+
private static final StringTag MQ_MESSAGE_ID = new StringTag("mq.message.id");
44+
private static final String QUEUE = "Queue";
45+
private static final String TOPIC = "Topic";
46+
47+
@Override
48+
public void beforeMethod(final EnhancedInstance objInst,
49+
final Method method,
50+
final Object[] allArguments,
51+
final Class<?>[] classes,
52+
final MethodInterceptResult methodInterceptResult) throws Throwable {
53+
ContextCarrier contextCarrier = new ContextCarrier();
54+
Message message = (Message) allArguments[1];
55+
EnhanceInfo enhanceInfo = (EnhanceInfo) objInst.getSkyWalkingDynamicField();
56+
boolean queue = isQueue(enhanceInfo.getType());
57+
AbstractSpan activeSpan = ContextManager.createExitSpan(
58+
buildOperationName(queue, enhanceInfo.getName()),
59+
contextCarrier, enhanceInfo.getBrokerUrl()
60+
);
61+
contextCarrier.extensionInjector().injectSendingTimestamp();
62+
Tags.MQ_BROKER.set(activeSpan, enhanceInfo.getBrokerUrl());
63+
if (queue) {
64+
Tags.MQ_QUEUE.set(activeSpan, enhanceInfo.getName());
65+
} else {
66+
Tags.MQ_TOPIC.set(activeSpan, enhanceInfo.getName());
67+
}
68+
SpanLayer.asMQ(activeSpan);
69+
activeSpan.setComponent(ComponentsDefine.ACTIVEMQ_PRODUCER);
70+
CarrierItem next = contextCarrier.items();
71+
72+
while (next.hasNext()) {
73+
next = next.next();
74+
message.setStringProperty(next.getHeadKey().replace("-", "_"), next.getHeadValue());
75+
}
76+
}
77+
78+
@Override
79+
public Object afterMethod(final EnhancedInstance enhancedInstance,
80+
final Method method,
81+
final Object[] allArguments,
82+
final Class<?>[] classes,
83+
final Object ret) throws Throwable {
84+
AbstractSpan activeSpan = ContextManager.activeSpan();
85+
Message message = (Message) allArguments[1];
86+
activeSpan.tag(MQ_MESSAGE_ID, message.getJMSMessageID());
87+
ContextManager.stopSpan();
88+
return ret;
89+
}
90+
91+
@Override
92+
public void handleMethodException(final EnhancedInstance enhancedInstance,
93+
final Method method,
94+
final Object[] objects,
95+
final Class<?>[] classes,
96+
final Throwable t) {
97+
ContextManager.activeSpan().log(t);
98+
}
99+
100+
private boolean isQueue(ActiveMQDestination.TYPE type) {
101+
return ActiveMQDestination.TYPE.QUEUE.equals(type) || ActiveMQDestination.TYPE.TEMP_QUEUE.equals(type);
102+
}
103+
104+
private String buildOperationName(boolean isQueue, String name) {
105+
if (isQueue) {
106+
return OPERATION_NAME_PREFIX + QUEUE + "/" + name + PRODUCER_OPERATION_NAME_SUFFIX;
107+
} else {
108+
return OPERATION_NAME_PREFIX + TOPIC + "/" + name + PRODUCER_OPERATION_NAME_SUFFIX;
109+
}
110+
}
111+
}

0 commit comments

Comments
 (0)