Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Parse 128 bit trace Id retuned by lambda extension #7620

Merged
merged 10 commits into from
Sep 27, 2024
Original file line number Diff line number Diff line change
Expand Up @@ -956,7 +956,7 @@ public AgentSpan blackholeSpan() {

@Override
public AgentSpan.Context notifyExtensionStart(Object event) {
return LambdaHandler.notifyStartInvocation(event, propagationTagsFactory);
return LambdaHandler.notifyStartInvocation(this, event);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import datadog.trace.api.DDTraceId;
import datadog.trace.api.sampling.PrioritySampling;
import datadog.trace.bootstrap.instrumentation.api.AgentSpan;
import datadog.trace.core.CoreTracer;
import datadog.trace.core.propagation.ExtractedContext;
import datadog.trace.core.propagation.PropagationTags;
import java.io.ByteArrayInputStream;
Expand Down Expand Up @@ -45,6 +46,7 @@ public class LambdaHandler {
private static final String DATADOG_INVOCATION_ERROR_TYPE = "x-datadog-invocation-error-type";
private static final String DATADOG_INVOCATION_ERROR_STACK = "x-datadog-invocation-error-stack";
private static final String DATADOG_TAGS_KEY = "x-datadog-tags";
private static final String DATADOG_UPPER_64_TRACE_ID_TAG_KEY = "_dd.p.tid";

private static final String START_INVOCATION = "/lambda/start-invocation";
private static final String END_INVOCATION = "/lambda/end-invocation";
Expand Down Expand Up @@ -73,6 +75,35 @@ public class LambdaHandler {

private static String EXTENSION_BASE_URL = "http://127.0.0.1:8124";

public static AgentSpan.Context notifyStartInvocation(CoreTracer tracer, Object event) {
RequestBody body = RequestBody.create(jsonMediaType, writeValueAsString(event));
try (Response response =
HTTP_CLIENT
.newCall(
new Request.Builder()
.url(EXTENSION_BASE_URL + START_INVOCATION)
.addHeader(DATADOG_META_LANG, "java")
.post(body)
.build())
.execute()) {
if (response.isSuccessful()) {

return tracer
.propagate()
.extract(
response.headers(),
(carrier, classifier) -> {
for (String headerName : carrier.names()) {
classifier.accept(headerName, carrier.get(headerName));
}
});
}
} catch (Throwable ignored) {
log.error("could not reach the extension");
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it expected to show an error to the customer in this case?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, this means the user is starting a Java Lambda function instrumented with dd-trace-js, but it doesn't have the Lambda Extension, which means it can't send metrics etc. back to Datadog. It should leave an error in the logs so we can diagnose.

}
return null;
}

public static AgentSpan.Context notifyStartInvocation(
Object event, PropagationTags.Factory propagationTagsFactory) {
RequestBody body = RequestBody.create(jsonMediaType, writeValueAsString(event));
Expand All @@ -86,9 +117,20 @@ public static AgentSpan.Context notifyStartInvocation(
.build())
.execute()) {
if (response.isSuccessful()) {
final String traceID = response.headers().get(DATADOG_TRACE_ID);
final String traceIDLower64Long = response.headers().get(DATADOG_TRACE_ID);
final String priority = response.headers().get(DATADOG_SAMPLING_PRIORITY);
if (null != traceID && null != priority) {
final String tags = response.headers().get(DATADOG_TAGS_KEY);
final String traceIDUpper64BitHex = findUpper64BitTraceId(tags);
DDTraceId traceId;
if (null != traceIDUpper64BitHex && null != traceIDLower64Long) {
long lower64Long = Long.parseUnsignedLong(traceIDLower64Long);
String lower64Hex = Long.toHexString(lower64Long);
String full128BitHex = traceIDUpper64BitHex + lower64Hex;
traceId = DDTraceId.fromHex(full128BitHex);
} else {
traceId = DDTraceId.from(traceIDLower64Long);
}
if (null != traceIDLower64Long && null != priority) {
int samplingPriority = PrioritySampling.UNSET;
try {
samplingPriority = Integer.parseInt(priority);
Expand All @@ -97,18 +139,12 @@ public static AgentSpan.Context notifyStartInvocation(
}
log.debug(
"notifyStartInvocation success, found traceID = {} and samplingPriority = {}",
traceID,
traceIDLower64Long,
samplingPriority);
PropagationTags propagationTags =
propagationTagsFactory.fromHeaderValue(
PropagationTags.HeaderType.DATADOG, response.headers().get(DATADOG_TAGS_KEY));
propagationTagsFactory.fromHeaderValue(PropagationTags.HeaderType.DATADOG, tags);
return new ExtractedContext(
DDTraceId.from(traceID),
DDSpanId.ZERO,
samplingPriority,
null,
propagationTags,
DATADOG);
traceId, DDSpanId.ZERO, samplingPriority, null, propagationTags, DATADOG);
} else {
log.debug(
"could not find traceID or sampling priority in notifyStartInvocation, not injecting the context");
Expand All @@ -120,6 +156,21 @@ public static AgentSpan.Context notifyStartInvocation(
return null;
}

private static String findUpper64BitTraceId(String tags)
throws NumberFormatException, IndexOutOfBoundsException {
if (tags == null) {
return null;
}
String[] tagPairs = tags.split(",");
for (String tagPair : tagPairs) {
String[] tag = tagPair.trim().split("=");
if (tag.length == 2 && tag[0].equals(DATADOG_UPPER_64_TRACE_ID_TAG_KEY)) {
return tag[1];
}
}
return null;
}

public static boolean notifyEndInvocation(AgentSpan span, Object result, boolean isError) {

if (null == span || null == span.getSamplingPriority()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import datadog.trace.api.Config
import datadog.trace.api.DDSpanId
import datadog.trace.api.DDTags
import datadog.trace.api.DDTraceId
import datadog.trace.core.propagation.PropagationTags
import datadog.trace.core.CoreTracer
import datadog.trace.core.test.DDCoreSpecification
import datadog.trace.core.DDSpan
import com.amazonaws.services.lambda.runtime.events.SQSEvent
Expand Down Expand Up @@ -32,6 +32,7 @@ class LambdaHandlerTest extends DDCoreSpecification {
given:
Config config = Mock(Config)
config.getxDatadogTagsMaxLength() >> 512
CoreTracer ct = CoreTracer.builder().config(config).build()

def server = httpServer {
handlers {
Expand All @@ -47,7 +48,7 @@ class LambdaHandlerTest extends DDCoreSpecification {
LambdaHandler.setExtensionBaseUrl(server.address.toString())

when:
def objTest = LambdaHandler.notifyStartInvocation(obj, PropagationTags.factory(config))
def objTest = LambdaHandler.notifyStartInvocation(ct, obj)

then:
objTest.getTraceId().toString() == traceId
Expand All @@ -61,10 +62,46 @@ class LambdaHandlerTest extends DDCoreSpecification {
"1234" | 2 | new TestObject()
}

def "test start invocation with 128 bit trace ID"() {
given:
Config config = Mock(Config)
config.getxDatadogTagsMaxLength() >> 512
CoreTracer ct = CoreTracer.builder().config(config).build()

def server = httpServer {
handlers {
post("/lambda/start-invocation") {
response
.status(200)
.addHeader("x-datadog-trace-id", "5744042798732701615")
.addHeader("x-datadog-sampling-priority", "2")
.addHeader("x-datadog-tags", "_dd.p.tid=1914fe7789eb32be")
.send()
}
}
}
LambdaHandler.setExtensionBaseUrl(server.address.toString())

when:
def objTest = LambdaHandler.notifyStartInvocation(ct, obj)

then:
objTest.getTraceId().toString() == traceId
objTest.getSamplingPriority() == samplingPriority

cleanup:
server.close()

where:
traceId | samplingPriority | obj
"33339707034668043638992915224308903855" | 2 | new TestObject()
}

def "test start invocation failure"() {
given:
Config config = Mock(Config)
config.getxDatadogTagsMaxLength() >> 512
CoreTracer ct = CoreTracer.builder().config(config).build()

def server = httpServer {
handlers {
Expand All @@ -78,7 +115,7 @@ class LambdaHandlerTest extends DDCoreSpecification {
LambdaHandler.setExtensionBaseUrl(server.address.toString())

when:
def objTest = LambdaHandler.notifyStartInvocation(obj, PropagationTags.factory(config))
def objTest = LambdaHandler.notifyStartInvocation(ct, obj)

then:
objTest == expected
Expand Down
Loading