Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 14 additions & 1 deletion config/jvm.options
Original file line number Diff line number Diff line change
Expand Up @@ -79,4 +79,17 @@
#
# Sets the maximum nesting depth. The depth is a count of objects and arrays that have not
# been closed, `{` and `[` respectively.
#-Dlogstash.jackson.stream-read-constraints.max-nesting-depth=1000
#-Dlogstash.jackson.stream-read-constraints.max-nesting-depth=1000

# OTel
# NOTE(review): hard-coded developer-local path — replace with an
# installation-relative path or environment variable before merging.
-javaagent:/Users/andrea/workspace/onweek_otel_tracing_ls/elastic-otel-javaagent-1.4.1.jar
#-Dotel.service.name=logstash-otel
#-Dotel.metrics.exporter=otlp
#-Dotel.logs.exporter=otlp
#-Dotel.traces.exporter=otlp
#-Dotel.resource.attributes=service.version=1.0,deployment.environment=production
#-Dotel.exporter.otlp.endpoint=
#-Dotel.exporter.otlp.headers=Authorization=Bearer%20

#-Dotel.javaagent.logging=application
#-Dotel.javaagent.debug=true
3 changes: 3 additions & 0 deletions logstash-core/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -266,4 +266,7 @@ dependencies {
api group: 'org.apache.httpcomponents', name: 'httpclient', version: '4.5.14'
api group: 'commons-codec', name: 'commons-codec', version: '1.17.0'
api group: 'org.apache.httpcomponents', name: 'httpcore', version: '4.4.16'

// OTel dependencies
// NOTE(review): keep this API version compatible with the OTel java agent
// configured in config/jvm.options (currently elastic-otel-javaagent-1.4.1).
implementation 'io.opentelemetry:opentelemetry-api:1.25.0'
}
18 changes: 18 additions & 0 deletions logstash-core/src/main/java/org/logstash/Event.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@

import co.elastic.logstash.api.EventFactory;
import com.fasterxml.jackson.core.JsonProcessingException;
import io.opentelemetry.api.trace.Span;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.joda.time.DateTime;
Expand Down Expand Up @@ -71,6 +72,7 @@ public final class Event implements Cloneable, Queueable, co.elastic.logstash.ap
private static final FieldReference TAGS_FAILURE_FIELD = FieldReference.from(TAGS_FAILURE);

private static final Logger logger = LogManager.getLogger(Event.class);
private transient Map<String, Span> contextualSpans = new HashMap<>();

public Event()
{
Expand Down Expand Up @@ -551,6 +553,22 @@ public static Event deserialize(byte[] data) throws IOException {
return fromSerializableMap(data);
}

/**
 * Registers {@code span} on this event under the given context name so it can
 * later be looked up via {@link #spanForContext(String)} or ended via
 * {@link #endSpan(String)}.
 *
 * @param span        the started span to remember
 * @param contextName key identifying the tracing context (e.g. "worker")
 */
public void associateSpan(Span span, String contextName) {
    contextualSpans.put(contextName, span);
}

/**
 * Ends the span previously associated with {@code contextName}.
 * Does nothing when no span is registered for that context.
 *
 * @param contextName key the span was registered under
 */
public void endSpan(String contextName) {
    final Span span = spanForContext(contextName);
    if (span != null) {
        span.end();
    }
}

/**
 * Looks up the span registered under {@code contextName}.
 *
 * @param contextName key the span was registered under
 * @return the associated span, or {@code null} when none was registered
 */
public Span spanForContext(String contextName) {
    return contextualSpans.get(contextName);
}

public static class InvalidTagsTypeException extends RuntimeException {
private static final long serialVersionUID = 1L;

Expand Down
22 changes: 22 additions & 0 deletions logstash-core/src/main/java/org/logstash/OTelUtil.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
package org.logstash;

import io.opentelemetry.api.GlobalOpenTelemetry;
import io.opentelemetry.api.OpenTelemetry;
import io.opentelemetry.api.trace.Span;
import io.opentelemetry.api.trace.Tracer;

/**
 * Central access point for the OpenTelemetry tracer used by Logstash
 * pipeline instrumentation.
 *
 * <p>Relies on the OpenTelemetry java agent having installed the global
 * {@code OpenTelemetry} instance before this class is initialized; if no agent
 * is attached, {@link GlobalOpenTelemetry#get()} yields a no-op implementation
 * and all spans created here are no-ops.
 */
public final class OTelUtil {
    // Get the global OpenTelemetry instance configured by the agent
    public static final OpenTelemetry openTelemetry = GlobalOpenTelemetry.get();

    // Create a tracer for this class/service; shared by all instrumentation points.
    public static final Tracer tracer = openTelemetry.getTracer("Logstash");

    // Presumably event [@metadata] field names carrying span context — usage not
    // visible in this change; confirm against the consumers of these constants.
    public static final String METADATA_OTEL_CONTEXT = "otel_context";
    public static final String METADATA_OTEL_FULLCONTEXT = "otel_full_context";

    // Utility class: static members only, no instances.
    private OTelUtil() {
    }

    /**
     * Starts a new root span, explicitly detached from any current context.
     *
     * @param name the span name
     * @return the started span; the caller is responsible for ending it
     */
    public static Span newSpan(String name) {
        return tracer.spanBuilder(name)
                .setNoParent()
                .startSpan();
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,15 @@

package org.logstash.config.ir;

import io.opentelemetry.api.trace.Span;
import io.opentelemetry.context.Context;
import io.opentelemetry.context.Scope;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.jruby.RubyArray;
import org.jruby.RubyHash;
import org.jruby.runtime.builtin.IRubyObject;
import org.logstash.Event;
import org.logstash.RubyUtil;
import org.logstash.Rubyfier;
import org.logstash.common.EnvironmentVariableProvider;
Expand Down Expand Up @@ -57,6 +63,7 @@
import java.util.stream.Collectors;
import java.util.stream.Stream;

import static org.logstash.OTelUtil.tracer;
import static org.logstash.config.ir.compiler.Utils.copyNonCancelledEvents;

/**
Expand All @@ -68,6 +75,8 @@
*/
public final class CompiledPipeline {

private static final Logger LOGGER = LogManager.getLogger(CompiledPipeline.class);

/**
* Compiler for conditional expressions that turn {@link IfVertex} into {@link EventCondition}.
*/
Expand Down Expand Up @@ -168,6 +177,7 @@ public CompiledPipeline.CompiledExecution buildExecution() {
* @return CompiledPipeline.CompiledExecution the compiled pipeline
*/
public CompiledPipeline.CompiledExecution buildExecution(boolean orderedExecution) {
LOGGER.info("Pipeline executes in {} mode", orderedExecution ? "ordered" : "unordered");
return orderedExecution
? new CompiledPipeline.CompiledOrderedExecution()
: new CompiledPipeline.CompiledUnorderedExecution();
Expand Down Expand Up @@ -320,25 +330,35 @@ public final class CompiledOrderedExecution extends CompiledExecution {

/**
 * Computes the batch in ordered mode, bracketing every event with a
 * per-event "worker" span.
 */
@Override
public int compute(final QueueBatch batch, final boolean flush, final boolean shutdown) {
    startWorkerSpans(batch.events());
    try {
        return compute(batch.events(), flush, shutdown);
    } finally {
        // Always end the per-event "worker" spans, even when the computation
        // throws, so spans are not leaked on error/abort paths.
        stopWorkerSpans(batch.events());
    }
}

@SuppressWarnings({"unchecked"})
@Override
public int compute(final Collection<RubyEvent> batch, final boolean flush, final boolean shutdown) {
if (!batch.isEmpty()) {
@SuppressWarnings({"unchecked"}) final RubyArray<RubyEvent> outputBatch = RubyUtil.RUBY.newArray();
@SuppressWarnings({"unchecked"}) final RubyArray<RubyEvent> filterBatch = RubyUtil.RUBY.newArray(1);
final RubyArray<RubyEvent> outputBatch = RubyUtil.RUBY.newArray();
final RubyArray<RubyEvent> filterBatch = RubyUtil.RUBY.newArray(1);
// send batch one-by-one as single-element batches down the filters
for (final RubyEvent e : batch) {
filterBatch.set(0, e);
startSpans(filterBatch, "pipeline.filters");
_compute(filterBatch, outputBatch, flush, shutdown);
stopSpans(filterBatch, "pipeline.filters");
}
startSpans(outputBatch, "pipeline.outputs");
compiledOutputs.compute(outputBatch, flush, shutdown);
stopSpans(outputBatch, "pipeline.outputs");
return outputBatch.size();
} else if (flush || shutdown) {
@SuppressWarnings({"unchecked"}) final RubyArray<RubyEvent> outputBatch = RubyUtil.RUBY.newArray();
final RubyArray<RubyEvent> outputBatch = RubyUtil.RUBY.newArray();
_compute(EMPTY_ARRAY, outputBatch, flush, shutdown);
startSpans(outputBatch, "pipeline.outputs");
compiledOutputs.compute(outputBatch, flush, shutdown);
stopSpans(outputBatch, "pipeline.outputs");
return outputBatch.size();
}
return 0;
Expand All @@ -351,21 +371,45 @@ private void _compute(final RubyArray<RubyEvent> batch, final RubyArray<RubyEven
}
}

/**
 * Ends the "worker" span of every event in the collection.
 * Events without a worker span are skipped ({@code endSpan} is a no-op for them).
 */
private void stopWorkerSpans(Collection<RubyEvent> events) {
    for (final RubyEvent rubyEvent : events) {
        rubyEvent.getEvent().endSpan("worker");
    }
}

/**
 * Starts a "worker" span for every event in the collection and registers it
 * on the event so the later pipeline phases can parent their spans on it.
 */
private void startWorkerSpans(Collection<RubyEvent> events) {
    for (final RubyEvent rubyEvent : events) {
        final Event javaEvent = rubyEvent.getEvent();
        javaEvent.associateSpan(tracer.spanBuilder("worker").startSpan(), "worker");
    }
}

/**
 * Unordered execution: the whole batch flows through the filters at once,
 * then the non-cancelled results flow through the outputs. Each phase is
 * traced with per-event spans parented on the event's "worker" span.
 */
public final class CompiledUnorderedExecution extends CompiledExecution {

    @Override
    public int compute(final QueueBatch batch, final boolean flush, final boolean shutdown) {
        return compute(batch.events(), flush, shutdown);
    }

    @SuppressWarnings({"unchecked"})
    @Override
    public int compute(final Collection<RubyEvent> batch, final boolean flush, final boolean shutdown) {
        startWorkerSpans(batch);

        startSpans(batch, "pipeline.filters");
        // we know for now this comes from batch.collection() which returns a LinkedHashSet
        final Collection<RubyEvent> result = compiledFilters.compute(RubyArray.newArray(RubyUtil.RUBY, batch), flush, shutdown);
        stopSpans(batch, "pipeline.filters");
        final RubyArray<RubyEvent> outputBatch = RubyUtil.RUBY.newArray(result.size());
        copyNonCancelledEvents(result, outputBatch);
        compiledFilters.clear();
        startSpans(outputBatch, "pipeline.outputs");
        compiledOutputs.compute(outputBatch, flush, shutdown);
        // Fix: end the "pipeline.outputs" spans on the same collection they were
        // started on (outputBatch). Stopping them on `batch` left spans started
        // on filter-created events (present in outputBatch but not in batch)
        // dangling forever.
        stopSpans(outputBatch, "pipeline.outputs");
        stopWorkerSpans(batch);
        return outputBatch.size();
    }
}
Expand Down Expand Up @@ -577,4 +621,25 @@ private Collection<Dataset> compileDependencies(
).collect(Collectors.toList());
}
}

/**
 * Ends, for every event in the batch, the span registered under
 * {@code pipelinePhase}. Events without such a span are skipped
 * ({@code Event#endSpan} is a no-op for them).
 */
private static void stopSpans(Collection<RubyEvent> batch, String pipelinePhase) {
    for (final RubyEvent rubyEvent : batch) {
        rubyEvent.getEvent().endSpan(pipelinePhase);
    }
}

@SuppressWarnings("try")
private static void startSpans(Collection<RubyEvent> batch, String pipelinePhase) {
for (RubyEvent event : batch) {
Event javaEvent = event.getEvent();
Span workerSpan = javaEvent.spanForContext("worker");
try (Scope scope = workerSpan.makeCurrent()) {
Span span = tracer.spanBuilder(pipelinePhase)
.setParent(Context.current())
.startSpan();
javaEvent.associateSpan(span, pipelinePhase);
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,33 +20,8 @@

package org.logstash.execution;

import java.io.IOException;
import java.io.PrintWriter;
import java.io.StringWriter;
import java.nio.file.FileStore;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.security.NoSuchAlgorithmException;
import java.time.temporal.ChronoUnit;
import java.time.temporal.TemporalUnit;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Predicate;
import java.util.function.Supplier;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import java.util.stream.Stream;

import com.google.common.annotations.VisibleForTesting;
import io.opentelemetry.api.trace.Span;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.jruby.Ruby;
Expand Down Expand Up @@ -92,18 +67,45 @@
import org.logstash.instrument.metrics.MetricType;
import org.logstash.instrument.metrics.NullMetricExt;
import org.logstash.instrument.metrics.UpScaledMetric;
import org.logstash.instrument.metrics.timer.TimerMetric;
import org.logstash.instrument.metrics.UptimeMetric;
import org.logstash.instrument.metrics.counter.LongCounter;
import org.logstash.instrument.metrics.gauge.LazyDelegatingGauge;
import org.logstash.instrument.metrics.gauge.NumberGauge;
import org.logstash.instrument.metrics.timer.TimerMetric;
import org.logstash.plugins.ConfigVariableExpander;
import org.logstash.plugins.factory.ExecutionContextFactoryExt;
import org.logstash.plugins.factory.PluginFactoryExt;
import org.logstash.plugins.factory.PluginMetricsFactoryExt;
import org.logstash.secret.store.SecretStore;
import org.logstash.secret.store.SecretStoreExt;

import java.io.IOException;
import java.io.PrintWriter;
import java.io.StringWriter;
import java.nio.file.FileStore;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.security.NoSuchAlgorithmException;
import java.time.Duration;
import java.time.temporal.ChronoUnit;
import java.time.temporal.TemporalUnit;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Predicate;
import java.util.function.Supplier;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import java.util.stream.Stream;

import static org.logstash.OTelUtil.tracer;
import static org.logstash.instrument.metrics.MetricKeys.*;
import static org.logstash.instrument.metrics.UptimeMetric.ScaleUnits.MILLISECONDS;
import static org.logstash.instrument.metrics.UptimeMetric.ScaleUnits.SECONDS;
Expand Down Expand Up @@ -217,10 +219,16 @@ private void debugLogStackTrace(ConditionalEvaluationError err) {
}
}

@SuppressWarnings("try")
@JRubyMethod(required = 4)
public AbstractPipelineExt initialize(final ThreadContext context, final IRubyObject[] args)
throws IncompleteSourceWithMetadataException, NoSuchAlgorithmException {
initialize(context, args[0], args[1], args[2]);

Span span = tracer.spanBuilder("pipeline.initialize")
.setAttribute("pipeline.id", pipelineId().asJavaString())
.startSpan();

lirExecution = new CompiledPipeline(
lir,
new PluginFactoryExt(context.runtime, RubyUtil.PLUGIN_FACTORY_CLASS).init(
Expand All @@ -245,6 +253,9 @@ public AbstractPipelineExt initialize(final ThreadContext context, final IRubyOb
lir.getGraph().toString()
);
}

span.end();

return this;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,14 @@
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.LongAdder;

import io.opentelemetry.api.trace.Span;
import io.opentelemetry.context.Context;
import io.opentelemetry.context.Scope;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.logstash.OTelUtil;
import org.logstash.config.ir.CompiledPipeline;
import org.logstash.ext.JrubyEventExtLibrary;

/**
* Pipeline execution worker, it's responsible to execute filters and output plugins for each {@link QueueBatch} that
Expand Down Expand Up @@ -111,6 +116,9 @@ private boolean abortableCompute(QueueBatch batch, boolean flush, boolean shutdo
boolean isNackBatch = false;
try {
execution.compute(batch, flush, shutdown);
for (JrubyEventExtLibrary.RubyEvent e : batch.events()) {
e.getEvent().endSpan("global");
}
} catch (Exception ex) {
if (ex instanceof AbortedBatchException) {
isNackBatch = true;
Expand Down
Loading