Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@
import datadog.trace.api.gateway.SubscriptionService;
import datadog.trace.api.git.EmbeddedGitInfoBuilder;
import datadog.trace.api.git.GitInfoProvider;
import datadog.trace.api.intake.Intake;
import datadog.trace.api.profiling.ProfilingEnablement;
import datadog.trace.api.scopemanager.ScopeListener;
import datadog.trace.bootstrap.benchmark.StaticEventLogger;
Expand Down Expand Up @@ -129,6 +130,7 @@ private enum AgentFeature {
CODE_ORIGIN(TraceInstrumentationConfig.CODE_ORIGIN_FOR_SPANS_ENABLED, false),
DATA_JOBS(GeneralConfig.DATA_JOBS_ENABLED, false),
AGENTLESS_LOG_SUBMISSION(GeneralConfig.AGENTLESS_LOG_SUBMISSION_ENABLED, false),
LOGS_CAPTURE(GeneralConfig.LOGS_CAPTURE_ENABLED, false),
LLMOBS(LlmObsConfig.LLMOBS_ENABLED, false),
LLMOBS_AGENTLESS(LlmObsConfig.LLMOBS_AGENTLESS_ENABLED, false),
FEATURE_FLAGGING(FeatureFlaggingConfig.FLAGGING_PROVIDER_ENABLED, false);
Expand Down Expand Up @@ -190,6 +192,7 @@ public boolean isEnabledByDefault() {
private static boolean codeOriginEnabled = false;
private static boolean distributedDebuggerEnabled = false;
private static boolean agentlessLogSubmissionEnabled = false;
private static boolean logsCaptureEnabled = false;
private static boolean featureFlaggingEnabled = false;

private static void safelySetContextClassLoader(ClassLoader classLoader) {
Expand Down Expand Up @@ -275,6 +278,7 @@ public static void start(
exceptionReplayEnabled = isFeatureEnabled(AgentFeature.EXCEPTION_REPLAY);
codeOriginEnabled = isFeatureEnabled(AgentFeature.CODE_ORIGIN);
agentlessLogSubmissionEnabled = isFeatureEnabled(AgentFeature.AGENTLESS_LOG_SUBMISSION);
logsCaptureEnabled = isFeatureEnabled(AgentFeature.LOGS_CAPTURE);
llmObsEnabled = isFeatureEnabled(AgentFeature.LLMOBS);
featureFlaggingEnabled = isFeatureEnabled(AgentFeature.FEATURE_FLAGGING);

Expand Down Expand Up @@ -1122,15 +1126,16 @@ private static void maybeStartFeatureFlagging(final Class<?> scoClass, final Obj
}

private static void maybeInstallLogsIntake(Class<?> scoClass, Object sco) {
if (agentlessLogSubmissionEnabled) {
if (agentlessLogSubmissionEnabled || logsCaptureEnabled) {
StaticEventLogger.begin("Logs Intake");

try {
final Class<?> logsIntakeSystemClass =
AGENT_CLASSLOADER.loadClass("datadog.trace.logging.intake.LogsIntakeSystem");
final Method logsIntakeInstallerMethod =
logsIntakeSystemClass.getMethod("install", scoClass);
logsIntakeInstallerMethod.invoke(null, sco);
logsIntakeSystemClass.getMethod("install", scoClass, Intake.class);
logsIntakeInstallerMethod.invoke(
null, sco, agentlessLogSubmissionEnabled ? Intake.LOGS : Intake.EVENT_PLATFORM);
} catch (final Throwable e) {
log.warn("Not installing Logs Intake subsystem", e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,8 @@ public void dispatch(List<Map<String, Object>> messages) {

private void flush(StringBuilder batch) {
try {
RequestBody requestBody = RequestBody.create(JSON, batch.toString());
String json = batch.toString();
RequestBody requestBody = RequestBody.create(JSON, json);
RequestBody gzippedRequestBody = OkHttpUtils.gzippedRequestBodyOf(requestBody);
backendApi.post("logs", gzippedRequestBody, IGNORE_RESPONSE, null, true);
} catch (IOException e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import datadog.communication.BackendApiFactory;
import datadog.communication.ddagent.SharedCommunicationObjects;
import datadog.trace.api.Config;
import datadog.trace.api.intake.Intake;
import datadog.trace.api.logging.intake.LogsIntake;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -11,15 +12,15 @@ public class LogsIntakeSystem {

private static final Logger LOGGER = LoggerFactory.getLogger(LogsIntakeSystem.class);

public static void install(SharedCommunicationObjects sco) {
public static void install(SharedCommunicationObjects sco, Intake intake) {
Config config = Config.get();
if (!config.isAgentlessLogSubmissionEnabled()) {
LOGGER.debug("Agentless logs intake is disabled");
if (!config.isAgentlessLogSubmissionEnabled() && !config.isLogsCaptureEnabled()) {
LOGGER.debug("Agentless logs intake and logs capture are disabled");
return;
}

BackendApiFactory apiFactory = new BackendApiFactory(config, sco);
LogsWriterImpl writer = new LogsWriterImpl(config, apiFactory);
LogsWriterImpl writer = new LogsWriterImpl(config, apiFactory, intake);
sco.whenReady(writer::start);

LogsIntake.registerWriter(writer);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,11 +27,13 @@ public class LogsWriterImpl implements LogsWriter {

private final Map<String, Object> commonTags;
private final BackendApiFactory apiFactory;
private final Intake intake;
private final BlockingQueue<Map<String, Object>> messageQueue;
private final Thread messagePollingThread;

public LogsWriterImpl(Config config, BackendApiFactory apiFactory) {
public LogsWriterImpl(Config config, BackendApiFactory apiFactory, Intake intake) {
this.apiFactory = apiFactory;
this.intake = intake;

commonTags = new HashMap<>();
commonTags.put("ddsource", "java");
Expand Down Expand Up @@ -87,7 +89,7 @@ public void log(Map<String, Object> message) {
}

private void logPollingLoop() {
BackendApi backendApi = apiFactory.createBackendApi(Intake.LOGS);
BackendApi backendApi = apiFactory.createBackendApi(intake);
LogsDispatcher logsDispatcher = new LogsDispatcher(backendApi);

while (!Thread.currentThread().isInterrupted()) {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
package datadog.trace.instrumentation.log4j2;

import datadog.trace.api.Config;
import datadog.trace.api.CorrelationIdentifier;
import datadog.trace.api.logging.intake.LogsIntake;
import java.io.PrintWriter;
import java.io.StringWriter;
Expand All @@ -13,8 +15,11 @@ public class DatadogAppender extends AbstractAppender {

private static final int MAX_STACKTRACE_STRING_LENGTH = 16 * 1_024;

public DatadogAppender(String name, Filter filter) {
private final boolean logsCaptureEnabled;

public DatadogAppender(String name, Filter filter, Config config) {
super(name, filter, null);
this.logsCaptureEnabled = config.isLogsCaptureEnabled();
}

@Override
Expand All @@ -40,7 +45,6 @@ private Map<String, Object> map(final LogEvent event) {
Map<String, Object> thrownLog = new HashMap<>();
thrownLog.put("message", thrown.getMessage());
thrownLog.put("name", thrown.getClass().getCanonicalName());

// TODO consider using structured stack trace layout
// (see
// org.apache.logging.log4j.layout.template.json.resolver.ExceptionResolver#createStackTraceResolver)
Expand All @@ -55,19 +59,29 @@ private Map<String, Object> map(final LogEvent event) {

log.put("thrown", thrownLog);
}

log.put("contextMap", event.getContextMap());
log.put("endOfBatch", event.isEndOfBatch());
log.put("loggerFqcn", event.getLoggerFqcn());

StackTraceElement source = event.getSource();
Map<String, Object> sourceLog = new HashMap<>();
sourceLog.put("class", source.getClassName());
sourceLog.put("method", source.getMethodName());
sourceLog.put("file", source.getFileName());
sourceLog.put("line", source.getLineNumber());
log.put("source", sourceLog);

if (logsCaptureEnabled) {
// skip log source for now as this is expensive
// will be later introduce with Log Origin and optimisations
String traceId = CorrelationIdentifier.getTraceId();
if (traceId != null && !traceId.equals("0")) {
log.put("dd.trace_id", traceId);
}
String spanId = CorrelationIdentifier.getSpanId();
if (spanId != null && !spanId.equals("0")) {
log.put("dd.span_id", spanId);
}
} else {
log.put("contextMap", event.getContextMap());
StackTraceElement source = event.getSource();
Map<String, Object> sourceLog = new HashMap<>();
sourceLog.put("class", source.getClassName());
sourceLog.put("method", source.getMethodName());
sourceLog.put("file", source.getFileName());
sourceLog.put("line", source.getLineNumber());
log.put("source", sourceLog);
}
return log;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,9 @@ public LoggerConfigInstrumentation() {

@Override
public boolean isApplicable(Set<TargetSystem> enabledSystems) {
return super.isApplicable(enabledSystems)
&& InstrumenterConfig.get().isAgentlessLogSubmissionEnabled();
return (super.isApplicable(enabledSystems)
&& InstrumenterConfig.get().isAgentlessLogSubmissionEnabled())
|| Config.get().isLogsCaptureEnabled();
}

@Override
Expand Down Expand Up @@ -62,10 +63,10 @@ public static void onExit(@Advice.This LoggerConfig loggerConfig) {
}
}

DatadogAppender appender = new DatadogAppender("datadog", null);
Config config = Config.get();
DatadogAppender appender = new DatadogAppender("datadog", null, config);
appender.start();

Config config = Config.get();
Level level = Level.valueOf(config.getAgentlessLogSubmissionLevel());
loggerConfig.addAppender(appender, level, null);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
import datadog.trace.agent.test.InstrumentationSpecification
import datadog.trace.api.config.GeneralConfig
import datadog.trace.api.logging.intake.LogsIntake
import datadog.trace.api.logging.intake.LogsWriter
import org.apache.logging.log4j.LogManager
import org.apache.logging.log4j.core.appender.AbstractAppender
import org.junit.jupiter.api.Assumptions
import spock.util.environment.Jvm

class Log4jDatadogAppenderLogCaptureTest extends InstrumentationSpecification {

private static DummyLogsWriter logsWriter

def setupSpec() {
logsWriter = new DummyLogsWriter()
LogsIntake.registerWriter(logsWriter)
}

@Override
void configurePreAgent() {
super.configurePreAgent()
injectSysConfig(GeneralConfig.LOGS_CAPTURE_ENABLED, "true")
}

def "test datadog appender registration"() {
setup:
ensureLog4jVersionCompatibleWithCurrentJVM()

def logger = LogManager.getLogger(Log4jDatadogAppenderLogCaptureTest)

when:
logger.error("A test message")

then:
!logsWriter.messages.empty

def message = logsWriter.messages.poll()
"A test message" == message.get("message")
"ERROR" == message.get("level")
"Log4jDatadogAppenderLogCaptureTest" == message.get("loggerName")
}

private static ensureLog4jVersionCompatibleWithCurrentJVM() {
try {
// init class to see if UnsupportedClassVersionError gets thrown
AbstractAppender.package
} catch (UnsupportedClassVersionError e) {
Assumptions.assumeTrue(false, "Latest Log4j2 release requires Java 17, current JVM: " + Jvm.current.javaVersion)
}
}

private static final class DummyLogsWriter implements LogsWriter {
private final Queue<Map<String, Object>> messages = new ArrayDeque<>()

@Override
void log(Map<String, Object> message) {
messages.offer(message)
}

@Override
void start() {
// no op
}

@Override
void shutdown() {
// no op
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,18 +18,20 @@
import com.google.auto.service.AutoService;
import datadog.trace.agent.tooling.Instrumenter;
import datadog.trace.agent.tooling.InstrumenterModule;
import datadog.trace.api.Config;
import datadog.trace.bootstrap.InstrumentationContext;
import datadog.trace.bootstrap.instrumentation.api.AgentSpan;
import datadog.trace.bootstrap.instrumentation.api.AgentSpanContext;
import java.util.Map;
import java.util.Set;
import net.bytebuddy.asm.Advice;

@AutoService(InstrumenterModule.class)
public class LogbackLoggerInstrumentation extends InstrumenterModule.Tracing
implements Instrumenter.ForSingleType, Instrumenter.HasMethodAdvice {

public LogbackLoggerInstrumentation() {
super("logback");
super("logback", "logs-intake", "logs-intake-logback");
}

@Override
Expand All @@ -43,6 +45,16 @@ public Map<String, String> contextStore() {
"ch.qos.logback.classic.spi.ILoggingEvent", AgentSpanContext.class.getName());
}

@Override
public boolean isApplicable(Set<TargetSystem> enabledSystems) {
return super.isApplicable(enabledSystems) || Config.get().isLogsCaptureEnabled();
}

@Override
public String[] helperClassNames() {
return new String[] {LogsIntakeHelper.class.getName()};
}

@Override
public void methodAdvice(MethodTransformer transformer) {
transformer.applyAdvice(
Expand All @@ -52,6 +64,15 @@ public void methodAdvice(MethodTransformer transformer) {
.and(takesArguments(1))
.and(takesArgument(0, named("ch.qos.logback.classic.spi.ILoggingEvent"))),
LogbackLoggerInstrumentation.class.getName() + "$CallAppendersAdvice");
if (Config.get().isLogsCaptureEnabled()) {
transformer.applyAdvice(
isMethod()
.and(isPublic())
.and(named("callAppenders"))
.and(takesArguments(1))
.and(takesArgument(0, named("ch.qos.logback.classic.spi.ILoggingEvent"))),
LogbackLoggerInstrumentation.class.getName() + "$CallAppendersAdvice2");
}
}

public static class CallAppendersAdvice {
Expand All @@ -65,4 +86,11 @@ public static void onEnter(@Advice.Argument(0) ILoggingEvent event) {
}
}
}

public static class CallAppendersAdvice2 {
@Advice.OnMethodEnter
public static void onEnter(@Advice.Argument(0) ILoggingEvent event) {
LogsIntakeHelper.log(event);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
package datadog.trace.instrumentation.logback;

import ch.qos.logback.classic.spi.ILoggingEvent;
import ch.qos.logback.classic.spi.StackTraceElementProxy;
import datadog.trace.api.CorrelationIdentifier;
import datadog.trace.api.logging.intake.LogsIntake;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import java.util.stream.Collectors;

public class LogsIntakeHelper {

public static void log(ILoggingEvent event) {
LogsIntake.log(map(event));
}

private static Map<String, Object> map(ILoggingEvent event) {
Map<String, Object> log = new HashMap<>();
log.put("thread", event.getThreadName());
log.put("level", event.getLevel().levelStr);
log.put("loggerName", event.getLoggerName());
log.put("message", event.getFormattedMessage());
if (event.getThrowableProxy() != null) {
Map<String, Object> thrownLog = new HashMap<>();
thrownLog.put("message", event.getThrowableProxy().getMessage());
thrownLog.put("name", event.getThrowableProxy().getClassName());
String stackTraceString =
Arrays.stream(event.getThrowableProxy().getStackTraceElementProxyArray())
.map(StackTraceElementProxy::getSTEAsString)
.collect(Collectors.joining(" "));
thrownLog.put("extendedStackTrace", stackTraceString);
log.put("thrown", thrownLog);
}
String traceId = CorrelationIdentifier.getTraceId();
if (traceId != null && !traceId.equals("0")) {
log.put("dd.trace_id", traceId);
}
String spanId = CorrelationIdentifier.getSpanId();
if (spanId != null && !spanId.equals("0")) {
log.put("dd.span_id", spanId);
}
return log;
}
}
Loading