Skip to content

Commit 298e8cf

Browse files
committed
Open Tracing implementation
1 parent 8abd104 commit 298e8cf

File tree

5 files changed

+242
-3
lines changed

5 files changed

+242
-3
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,120 @@
1+
/*
2+
* Copyright 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
3+
*
4+
* Modifications copyright (C) 2017 Uber Technologies, Inc.
5+
*
6+
* Licensed under the Apache License, Version 2.0 (the "License"). You may not
7+
* use this file except in compliance with the License. A copy of the License is
8+
* located at
9+
*
10+
* http://aws.amazon.com/apache2.0
11+
*
12+
* or in the "license" file accompanying this file. This file is distributed on
13+
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
14+
* express or implied. See the License for the specific language governing
15+
* permissions and limitations under the License.
16+
*/
17+
18+
package com.uber.cadence.context;
19+
20+
import io.opentracing.Span;
21+
import io.opentracing.SpanContext;
22+
import io.opentracing.Tracer;
23+
import io.opentracing.propagation.Format;
24+
import io.opentracing.propagation.TextMap;
25+
import io.opentracing.util.GlobalTracer;
26+
import java.nio.charset.Charset;
27+
import java.util.HashMap;
28+
import java.util.Iterator;
29+
import java.util.Map;
30+
31+
/** Support for OpenTracing spans */
32+
public class OpenTracingContextPropagator implements ContextPropagator {
33+
34+
private static ThreadLocal<SpanContext> currentOpenTracingSpanContext = new ThreadLocal<>();
35+
36+
public static void setCurrentOpenTracingSpanContext(SpanContext ctx) {
37+
if (ctx != null) {
38+
currentOpenTracingSpanContext.set(ctx);
39+
}
40+
}
41+
42+
public static SpanContext getCurrentOpenTracingSpanContext() {
43+
return currentOpenTracingSpanContext.get();
44+
}
45+
46+
@Override
47+
public String getName() {
48+
return "OpenTracing";
49+
}
50+
51+
@Override
52+
public Map<String, byte[]> serializeContext(Object context) {
53+
Map<String, byte[]> serializedContext = new HashMap<>();
54+
Map<String, String> contextMap = (Map<String, String>) context;
55+
for (Map.Entry<String, String> entry : contextMap.entrySet()) {
56+
serializedContext.put(entry.getKey(), entry.getValue().getBytes(Charset.defaultCharset()));
57+
}
58+
return serializedContext;
59+
}
60+
61+
@Override
62+
public Object deserializeContext(Map<String, byte[]> context) {
63+
Map<String, String> contextMap = new HashMap<>();
64+
for (Map.Entry<String, byte[]> entry : context.entrySet()) {
65+
contextMap.put(entry.getKey(), new String(entry.getValue(), Charset.defaultCharset()));
66+
}
67+
return contextMap;
68+
}
69+
70+
@Override
71+
public Object getCurrentContext() {
72+
Tracer currentTracer = GlobalTracer.get();
73+
Span currentSpan = currentTracer.scopeManager().activeSpan();
74+
if (currentSpan != null) {
75+
HashMapTextMap contextTextMap = new HashMapTextMap();
76+
currentTracer.inject(currentSpan.context(), Format.Builtin.TEXT_MAP, contextTextMap);
77+
return contextTextMap.getBackingMap();
78+
} else {
79+
return null;
80+
}
81+
}
82+
83+
@Override
84+
public void setCurrentContext(Object context) {
85+
Tracer currentTracer = GlobalTracer.get();
86+
Map<String, String> contextAsMap = (Map<String, String>) context;
87+
if (contextAsMap != null) {
88+
HashMapTextMap contextTextMap = new HashMapTextMap(contextAsMap);
89+
setCurrentOpenTracingSpanContext(
90+
currentTracer.extract(Format.Builtin.TEXT_MAP, contextTextMap));
91+
}
92+
}
93+
94+
private class HashMapTextMap implements TextMap {
95+
96+
private final HashMap<String, String> backingMap = new HashMap<>();
97+
98+
public HashMapTextMap() {
99+
// Noop
100+
}
101+
102+
public HashMapTextMap(Map<String, String> spanData) {
103+
backingMap.putAll(spanData);
104+
}
105+
106+
@Override
107+
public Iterator<Map.Entry<String, String>> iterator() {
108+
return backingMap.entrySet().iterator();
109+
}
110+
111+
@Override
112+
public void put(String key, String value) {
113+
backingMap.put(key, value);
114+
}
115+
116+
public HashMap<String, String> getBackingMap() {
117+
return backingMap;
118+
}
119+
}
120+
}

src/main/java/com/uber/cadence/internal/sync/WorkflowThreadImpl.java

+36-1
Original file line numberDiff line numberDiff line change
@@ -19,12 +19,19 @@
1919

2020
import com.google.common.util.concurrent.RateLimiter;
2121
import com.uber.cadence.context.ContextPropagator;
22+
import com.uber.cadence.context.OpenTracingContextPropagator;
2223
import com.uber.cadence.internal.context.ContextThreadLocal;
2324
import com.uber.cadence.internal.logging.LoggerTag;
2425
import com.uber.cadence.internal.metrics.MetricsType;
2526
import com.uber.cadence.internal.replay.DeciderCache;
2627
import com.uber.cadence.internal.replay.DecisionContext;
2728
import com.uber.cadence.workflow.Promise;
29+
import io.opentracing.Scope;
30+
import io.opentracing.Span;
31+
import io.opentracing.Tracer;
32+
import io.opentracing.log.Fields;
33+
import io.opentracing.tag.Tags;
34+
import io.opentracing.util.GlobalTracer;
2835
import java.io.PrintWriter;
2936
import java.io.StringWriter;
3037
import java.util.HashMap;
@@ -92,13 +99,26 @@ public void run() {
9299
ContextThreadLocal.setContextPropagators(this.contextPropagators);
93100
ContextThreadLocal.propagateContextToCurrentThread(this.propagatedContexts);
94101

95-
try {
102+
// Set up an opentracing span
103+
Tracer openTracingTracer = GlobalTracer.get();
104+
Tracer.SpanBuilder builder =
105+
openTracingTracer
106+
.buildSpan("cadence.workflow")
107+
.withTag("resource.name", decisionContext.getWorkflowType().getName());
108+
109+
if (OpenTracingContextPropagator.getCurrentOpenTracingSpanContext() != null) {
110+
builder.asChildOf(OpenTracingContextPropagator.getCurrentOpenTracingSpanContext());
111+
}
112+
Span span = builder.start();
113+
114+
try (Scope scope = openTracingTracer.activateSpan(span)) {
96115
// initialYield blocks thread until the first runUntilBlocked is called.
97116
// Otherwise r starts executing without control of the sync.
98117
threadContext.initialYield();
99118
cancellationScope.run();
100119
} catch (DestroyWorkflowThreadError e) {
101120
if (!threadContext.isDestroyRequested()) {
121+
setSpanError(span, e);
102122
threadContext.setUnhandledException(e);
103123
}
104124
} catch (Error e) {
@@ -111,9 +131,11 @@ public void run() {
111131
log.error(
112132
String.format("Workflow thread \"%s\" run failed with Error:\n%s", name, stackTrace));
113133
}
134+
setSpanError(span, e);
114135
threadContext.setUnhandledException(e);
115136
} catch (CancellationException e) {
116137
if (!isCancelRequested()) {
138+
setSpanError(span, e);
117139
threadContext.setUnhandledException(e);
118140
}
119141
if (log.isDebugEnabled()) {
@@ -130,14 +152,27 @@ public void run() {
130152
"Workflow thread \"%s\" run failed with unhandled exception:\n%s",
131153
name, stackTrace));
132154
}
155+
setSpanError(span, e);
133156
threadContext.setUnhandledException(e);
134157
} finally {
135158
DeterministicRunnerImpl.setCurrentThreadInternal(null);
136159
threadContext.setStatus(Status.DONE);
137160
thread.setName(originalName);
138161
thread = null;
139162
MDC.clear();
163+
span.finish();
164+
}
165+
}
166+
167+
private void setSpanError(Span span, Throwable ex) {
168+
Tags.ERROR.set(span, true);
169+
Map<String, Object> errorData = new HashMap<>();
170+
errorData.put(Fields.EVENT, "error");
171+
if (ex != null) {
172+
errorData.put(Fields.ERROR_OBJECT, ex);
173+
errorData.put(Fields.MESSAGE, ex.getMessage());
140174
}
175+
span.log(errorData);
141176
}
142177

143178
public String getName() {

src/main/java/com/uber/cadence/internal/worker/ActivityWorker.java

+29-1
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
import com.uber.cadence.WorkflowExecution;
2929
import com.uber.cadence.common.RetryOptions;
3030
import com.uber.cadence.context.ContextPropagator;
31+
import com.uber.cadence.context.OpenTracingContextPropagator;
3132
import com.uber.cadence.internal.common.Retryer;
3233
import com.uber.cadence.internal.logging.LoggerTag;
3334
import com.uber.cadence.internal.metrics.MetricsTag;
@@ -38,6 +39,11 @@
3839
import com.uber.m3.tally.Stopwatch;
3940
import com.uber.m3.util.Duration;
4041
import com.uber.m3.util.ImmutableMap;
42+
import io.opentracing.Span;
43+
import io.opentracing.Tracer;
44+
import io.opentracing.log.Fields;
45+
import io.opentracing.tag.Tags;
46+
import io.opentracing.util.GlobalTracer;
4147
import java.nio.charset.StandardCharsets;
4248
import java.util.HashMap;
4349
import java.util.Map;
@@ -175,7 +181,18 @@ public void handle(PollForActivityTaskResponse task) throws Exception {
175181

176182
propagateContext(task);
177183

178-
try {
184+
// Set up an opentracing span
185+
Tracer openTracingTracer = GlobalTracer.get();
186+
Tracer.SpanBuilder builder =
187+
openTracingTracer
188+
.buildSpan("cadence.activity")
189+
.withTag("resource.name", task.getActivityType().getName());
190+
if (OpenTracingContextPropagator.getCurrentOpenTracingSpanContext() != null) {
191+
builder.asChildOf(OpenTracingContextPropagator.getCurrentOpenTracingSpanContext());
192+
}
193+
Span span = builder.start();
194+
195+
try (io.opentracing.Scope scope = openTracingTracer.activateSpan(span)) {
179196
Stopwatch sw = metricsScope.timer(MetricsType.ACTIVITY_EXEC_LATENCY).start();
180197
ActivityTaskHandler.Result response = handler.handle(task, metricsScope, false);
181198
sw.stop();
@@ -197,11 +214,22 @@ public void handle(PollForActivityTaskResponse task) throws Exception {
197214
Stopwatch sw = metricsScope.timer(MetricsType.ACTIVITY_RESP_LATENCY).start();
198215
sendReply(task, new Result(null, null, cancelledRequest, null), metricsScope);
199216
sw.stop();
217+
} catch (Exception e) {
218+
Tags.ERROR.set(span, true);
219+
Map<String, Object> errorData = new HashMap<>();
220+
errorData.put(Fields.EVENT, "error");
221+
if (e != null) {
222+
errorData.put(Fields.ERROR_OBJECT, e);
223+
errorData.put(Fields.MESSAGE, e.getMessage());
224+
}
225+
span.log(errorData);
226+
throw e;
200227
} finally {
201228
MDC.remove(LoggerTag.ACTIVITY_ID);
202229
MDC.remove(LoggerTag.ACTIVITY_TYPE);
203230
MDC.remove(LoggerTag.WORKFLOW_ID);
204231
MDC.remove(LoggerTag.RUN_ID);
232+
span.finish();
205233
}
206234
}
207235

src/main/java/com/uber/cadence/worker/Worker.java

+5-1
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
import com.uber.cadence.client.WorkflowClient;
2626
import com.uber.cadence.common.WorkflowExecutionHistory;
2727
import com.uber.cadence.context.ContextPropagator;
28+
import com.uber.cadence.context.OpenTracingContextPropagator;
2829
import com.uber.cadence.converter.DataConverter;
2930
import com.uber.cadence.internal.common.InternalUtils;
3031
import com.uber.cadence.internal.metrics.MetricsTag;
@@ -952,10 +953,13 @@ private FactoryOptions(
952953
}
953954

954955
if (contextPropagators != null) {
955-
this.contextPropagators = contextPropagators;
956+
this.contextPropagators = new ArrayList(contextPropagators);
956957
} else {
957958
this.contextPropagators = new ArrayList<>();
958959
}
960+
961+
// Add the OpenTracing propagator
962+
this.contextPropagators.add(new OpenTracingContextPropagator());
959963
}
960964
}
961965
}

src/test/java/com/uber/cadence/internal/testing/WorkflowTestingTest.java

+52
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,7 @@
4444
import com.uber.cadence.client.WorkflowStub;
4545
import com.uber.cadence.client.WorkflowTimedOutException;
4646
import com.uber.cadence.context.ContextPropagator;
47+
import com.uber.cadence.context.OpenTracingContextPropagator;
4748
import com.uber.cadence.internal.common.WorkflowExecutionUtils;
4849
import com.uber.cadence.testing.SimulatedTimeoutException;
4950
import com.uber.cadence.testing.TestEnvironmentOptions;
@@ -57,8 +58,15 @@
5758
import com.uber.cadence.workflow.SignalMethod;
5859
import com.uber.cadence.workflow.Workflow;
5960
import com.uber.cadence.workflow.WorkflowMethod;
61+
import io.opentracing.Scope;
62+
import io.opentracing.Span;
63+
import io.opentracing.Tracer;
64+
import io.opentracing.mock.MockTracer;
65+
import io.opentracing.util.GlobalTracer;
66+
import io.opentracing.util.ThreadLocalScopeManager;
6067
import java.nio.charset.StandardCharsets;
6168
import java.time.Duration;
69+
import java.util.Arrays;
6270
import java.util.Collections;
6371
import java.util.List;
6472
import java.util.Map;
@@ -983,4 +991,48 @@ public void testDefaultChildWorkflowContextPropagation() {
983991
String result = workflow.workflow("input1");
984992
assertEquals("testing123testing123", result);
985993
}
994+
995+
public static class OpenTracingContextPropagationWorkflowImpl implements TestWorkflow {
996+
@Override
997+
public String workflow1(String input) {
998+
Tracer tracer = GlobalTracer.get();
999+
Span span = tracer.buildSpan("testContextPropagationWorkflow").start();
1000+
try (Scope scope = tracer.scopeManager().activate(span)) {
1001+
Span activeSpan = tracer.scopeManager().activeSpan();
1002+
return activeSpan.getBaggageItem("foo");
1003+
} finally {
1004+
span.finish();
1005+
}
1006+
}
1007+
}
1008+
1009+
@Test
1010+
public void testOpenTracingContextPropagation() {
1011+
MockTracer tracer =
1012+
new MockTracer(new ThreadLocalScopeManager(), MockTracer.Propagator.TEXT_MAP);
1013+
GlobalTracer.registerIfAbsent(tracer);
1014+
Span span = tracer.buildSpan("testContextPropagation").start();
1015+
1016+
Worker worker = testEnvironment.newWorker(TASK_LIST);
1017+
worker.registerWorkflowImplementationTypes(OpenTracingContextPropagationWorkflowImpl.class);
1018+
testEnvironment.start();
1019+
WorkflowClient client = testEnvironment.newWorkflowClient();
1020+
WorkflowOptions options =
1021+
new WorkflowOptions.Builder()
1022+
.setContextPropagators(
1023+
Arrays.asList(new TestContextPropagator(), new OpenTracingContextPropagator()))
1024+
.build();
1025+
1026+
try (Scope scope = tracer.scopeManager().activate(span)) {
1027+
1028+
Span activeSpan = tracer.scopeManager().activeSpan();
1029+
activeSpan.setBaggageItem("foo", "bar");
1030+
1031+
TestWorkflow workflow = client.newWorkflowStub(TestWorkflow.class, options);
1032+
assertEquals("bar", workflow.workflow1("input1"));
1033+
1034+
} finally {
1035+
span.finish();
1036+
}
1037+
}
9861038
}

0 commit comments

Comments
 (0)