Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add support for spring-pulsar 1.0 #13320

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .fossa.yml
Original file line number Diff line number Diff line change
Expand Up @@ -892,6 +892,9 @@ targets:
- type: gradle
path: ./
target: ':instrumentation:spring:spring-kafka-2.7:library'
- type: gradle
path: ./
target: ':instrumentation:spring:spring-pulsar-1.0:javaagent'
- type: gradle
path: ./
target: ':instrumentation:spring:spring-rabbit-1.0:javaagent'
Expand Down
Original file line number Diff line number Diff line change
@@ -1,2 +1,2 @@
Comparing source compatibility of opentelemetry-instrumentation-annotations-2.14.0-SNAPSHOT.jar against opentelemetry-instrumentation-annotations-2.12.0.jar
Comparing source compatibility of opentelemetry-instrumentation-annotations-2.14.0-SNAPSHOT.jar against opentelemetry-instrumentation-annotations-2.13.0.jar
No changes.
Original file line number Diff line number Diff line change
@@ -1,6 +1,2 @@
Comparing source compatibility of opentelemetry-instrumentation-api-2.14.0-SNAPSHOT.jar against opentelemetry-instrumentation-api-2.12.0.jar
+++ NEW CLASS: PUBLIC(+) FINAL(+) io.opentelemetry.instrumentation.api.semconv.util.SpanNames (not serializable)
+++ CLASS FILE FORMAT VERSION: 52.0 <- n.a.
+++ NEW SUPERCLASS: java.lang.Object
+++ NEW METHOD: PUBLIC(+) STATIC(+) java.lang.String fromMethod(java.lang.reflect.Method)
+++ NEW METHOD: PUBLIC(+) STATIC(+) java.lang.String fromMethod(java.lang.Class<?>, java.lang.String)
Comparing source compatibility of opentelemetry-instrumentation-api-2.14.0-SNAPSHOT.jar against opentelemetry-instrumentation-api-2.13.0.jar
No changes.
Original file line number Diff line number Diff line change
@@ -1,2 +1,2 @@
Comparing source compatibility of opentelemetry-spring-boot-autoconfigure-2.14.0-SNAPSHOT.jar against opentelemetry-spring-boot-autoconfigure-2.12.0.jar
Comparing source compatibility of opentelemetry-spring-boot-autoconfigure-2.14.0-SNAPSHOT.jar against opentelemetry-spring-boot-autoconfigure-2.13.0.jar
No changes.
Original file line number Diff line number Diff line change
@@ -1,2 +1,2 @@
Comparing source compatibility of opentelemetry-spring-boot-starter-2.14.0-SNAPSHOT.jar against opentelemetry-spring-boot-starter-2.12.0.jar
Comparing source compatibility of opentelemetry-spring-boot-starter-2.14.0-SNAPSHOT.jar against opentelemetry-spring-boot-starter-2.13.0.jar
No changes.
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
plugins {
id("otel.javaagent-instrumentation")
}

muzzle {
pass {
group.set("org.springframework.pulsar")
module.set("spring-pulsar")
versions.set("[1.2.0,]")
Copy link
Contributor

@laurit laurit Feb 17, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

could you also add assertInverse.set(true) This verifies that instrumentation does not apply to earlier versions.

}
}

dependencies {
library("org.springframework.pulsar:spring-pulsar:1.2.0")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

module is named 1.0 but tests 1.2.0


testInstrumentation(project(":instrumentation:pulsar:pulsar-2.8:javaagent"))
testImplementation("org.testcontainers:pulsar")

testLibrary("org.springframework.pulsar:spring-pulsar:1.2.0")
testLibrary("org.springframework.boot:spring-boot-starter-test:3.2.4")
testLibrary("org.springframework.boot:spring-boot-starter:3.2.4")
}

val latestDepTest = findProperty("testLatestDeps") as Boolean

// spring 6 (spring boot 3) requires java 17
if (latestDepTest) {
otelJava {
minJavaVersionSupported.set(JavaVersion.VERSION_17)
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/

package io.opentelemetry.javaagent.instrumentation.spring.pulsar.v1_0;

import static io.opentelemetry.javaagent.instrumentation.spring.pulsar.v1_0.SpringPulsarSingletons.instrumenter;
import static net.bytebuddy.matcher.ElementMatchers.named;
import static net.bytebuddy.matcher.ElementMatchers.takesArgument;
import static net.bytebuddy.matcher.ElementMatchers.takesArguments;

import io.opentelemetry.context.Context;
import io.opentelemetry.context.Scope;
import io.opentelemetry.javaagent.bootstrap.Java8BytecodeBridge;
import io.opentelemetry.javaagent.extension.instrumentation.TypeInstrumentation;
import io.opentelemetry.javaagent.extension.instrumentation.TypeTransformer;
import net.bytebuddy.asm.Advice;
import net.bytebuddy.description.type.TypeDescription;
import net.bytebuddy.matcher.ElementMatcher;
import org.apache.pulsar.client.api.Message;

public class DefaultPulsarMessageListenerContainerInstrumentation implements TypeInstrumentation {
@Override
public ElementMatcher<TypeDescription> typeMatcher() {
return named(
"org.springframework.pulsar.listener.DefaultPulsarMessageListenerContainer$Listener");
}

@Override
public void transform(TypeTransformer transformer) {
transformer.applyAdviceToMethod(
named("dispatchMessageToListener")
.and(takesArguments(3))
.and(takesArgument(0, named("org.apache.pulsar.client.api.Message"))),
getClass().getName() + "$DispatchMessageToListenerAdvice");
}

@SuppressWarnings("unused")
public static class DispatchMessageToListenerAdvice {
@Advice.OnMethodEnter(suppress = Throwable.class)
public static void onEnter(
@Advice.Argument(0) Message<?> message,
@Advice.Local("otelContext") Context context,
@Advice.Local("otelScope") Scope scope) {
Context parentContext = Java8BytecodeBridge.currentContext();
if (instrumenter().shouldStart(parentContext, message)) {
context = instrumenter().start(parentContext, message);
scope = context.makeCurrent();
}
}

@Advice.OnMethodExit(suppress = Throwable.class, onThrowable = Throwable.class)
public static void onExit(
@Advice.Argument(0) Message<?> message,
@Advice.Local("otelContext") Context context,
@Advice.Local("otelScope") Scope scope,
@Advice.Thrown Throwable throwable) {
if (scope == null) {
return;
}
scope.close();
instrumenter().end(context, message, null, throwable);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/

package io.opentelemetry.javaagent.instrumentation.spring.pulsar.v1_0;

import io.opentelemetry.context.propagation.TextMapGetter;
import javax.annotation.Nullable;
import org.apache.pulsar.client.api.Message;

enum MessageHeaderGetter implements TextMapGetter<Message<?>> {
INSTANCE;

@Override
public Iterable<String> keys(Message<?> carrier) {
return carrier.getProperties().keySet();
}

@Nullable
@Override
public String get(@Nullable Message<?> carrier, String key) {
return carrier == null ? null : carrier.getProperties().get(key);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/

package io.opentelemetry.javaagent.instrumentation.spring.pulsar.v1_0;

import static java.util.Collections.singletonList;

import com.google.auto.service.AutoService;
import io.opentelemetry.javaagent.extension.instrumentation.InstrumentationModule;
import io.opentelemetry.javaagent.extension.instrumentation.TypeInstrumentation;
import java.util.List;

@AutoService(InstrumentationModule.class)
public class SpringPulsarInstrumentationModule extends InstrumentationModule {
public SpringPulsarInstrumentationModule() {
super("spring-pulsar", "spring-pulsar-1.0");
}

@Override
public List<TypeInstrumentation> typeInstrumentations() {
return singletonList(new DefaultPulsarMessageListenerContainerInstrumentation());
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/

package io.opentelemetry.javaagent.instrumentation.spring.pulsar.v1_0;

import static java.util.Collections.emptyList;
import static java.util.Collections.singletonList;

import io.opentelemetry.instrumentation.api.incubator.semconv.messaging.MessagingAttributesGetter;
import java.util.List;
import javax.annotation.Nullable;
import org.apache.pulsar.client.api.Message;

enum SpringPulsarMessageAttributesGetter implements MessagingAttributesGetter<Message<?>, Void> {
INSTANCE;

@Override
public String getSystem(Message<?> message) {
return "pulsar";
}

@Override
@Nullable
public String getDestination(Message<?> message) {
return message.getTopicName();
}

@Nullable
@Override
public String getDestinationTemplate(Message<?> message) {
return null;
}

@Override
public boolean isTemporaryDestination(Message<?> message) {
return false;
}

@Override
public boolean isAnonymousDestination(Message<?> message) {
return false;
}

@Override
@Nullable
public String getConversationId(Message<?> message) {
return null;
}

@Override
public Long getMessageBodySize(Message<?> message) {
return (long) message.size();
}

@Nullable
@Override
public Long getMessageEnvelopeSize(Message<?> message) {
return null;
}

@Override
@Nullable
public String getMessageId(Message<?> message, @Nullable Void unused) {
if (message.getMessageId() != null) {
return message.getMessageId().toString();
}

return null;
}

@Nullable
@Override
public String getClientId(Message<?> message) {
return null;
}

@Nullable
@Override
public Long getBatchMessageCount(Message<?> message, @Nullable Void unused) {
return null;
}

@Override
public List<String> getMessageHeader(Message<?> message, String name) {
String value = message.getProperty(name);
return value != null ? singletonList(value) : emptyList();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/

package io.opentelemetry.javaagent.instrumentation.spring.pulsar.v1_0;

import io.opentelemetry.api.GlobalOpenTelemetry;
import io.opentelemetry.instrumentation.api.incubator.semconv.messaging.MessageOperation;
import io.opentelemetry.instrumentation.api.incubator.semconv.messaging.MessagingAttributesExtractor;
import io.opentelemetry.instrumentation.api.incubator.semconv.messaging.MessagingSpanNameExtractor;
import io.opentelemetry.instrumentation.api.instrumenter.Instrumenter;
import io.opentelemetry.javaagent.bootstrap.internal.ExperimentalConfig;
import org.apache.pulsar.client.api.Message;

public final class SpringPulsarSingletons {
private static final String INSTRUMENTATION_NAME = "io.opentelemetry.spring-pulsar-1.0";
private static final Instrumenter<Message<?>, Void> INSTRUMENTER;

static {
SpringPulsarMessageAttributesGetter getter = SpringPulsarMessageAttributesGetter.INSTANCE;
MessageOperation operation = MessageOperation.PROCESS;

INSTRUMENTER =
Instrumenter.<Message<?>, Void>builder(
GlobalOpenTelemetry.get(),
INSTRUMENTATION_NAME,
MessagingSpanNameExtractor.create(getter, operation))
.addAttributesExtractor(
MessagingAttributesExtractor.builder(getter, operation)
.setCapturedHeaders(ExperimentalConfig.get().getMessagingHeaders())
.build())
.buildConsumerInstrumenter(MessageHeaderGetter.INSTANCE);
}

public static Instrumenter<Message<?>, Void> instrumenter() {
return INSTRUMENTER;
}

private SpringPulsarSingletons() {}
}
Loading
Loading