
Commit d03219d

ericfirth, cswatt, and rtrieu authored

DSM Manual Instrumentation documentation changes (#31906)

* Manual instrumentation changes
* Apply suggestions from code review

Co-authored-by: cecilia saixue watt <[email protected]>
Co-authored-by: Rosa Trieu <[email protected]>

* Add ruby docs
* Ruby docs

---------

Co-authored-by: cecilia saixue watt <[email protected]>
Co-authored-by: Rosa Trieu <[email protected]>

1 parent 61d449d commit d03219d

File tree

6 files changed: +366 −73 lines changed

content/en/data_streams/_index.md

Lines changed: 10 additions & 10 deletions
@@ -51,16 +51,16 @@ Data Streams Monitoring provides a standardized method for teams to understand a

 Data Streams Monitoring instruments Kafka _clients_ (consumers/producers). If you can instrument your client infrastructure, you can use Data Streams Monitoring.

-| | Java | Python | .NET | Node.js | Go |
-| - | ---- | ------ | ---- | ------- | -- |
-| Apache Kafka <br/>(self-hosted, Amazon MSK, Confluent Cloud, or any other hosting platform) | {{< X >}} | {{< X >}} | {{< X >}} | {{< X >}} | {{< X >}} |
-| Amazon Kinesis | {{< X >}} | {{< X >}} | {{< X >}} | {{< X >}} | |
-| Amazon SNS | {{< X >}} | {{< X >}} | {{< X >}} | {{< X >}} | |
-| Amazon SQS | {{< X >}} | {{< X >}} | {{< X >}} | {{< X >}} | |
-| Azure Service Bus | | | {{< X >}} | | |
-| Google Pub/Sub | {{< X >}} | | | {{< X >}} | |
-| IBM MQ | | | {{< X >}} | | |
-| RabbitMQ | {{< X >}} | {{< X >}} | {{< X >}} | {{< X >}} | |
+| | Java | Python | .NET | Node.js | Go | Ruby |
+| - | ---- | ------ | ---- | ------- | -- | ---- |
+| Apache Kafka <br/>(self-hosted, Amazon MSK, Confluent Cloud, or any other hosting platform) | {{< X >}} | {{< X >}} | {{< X >}} | {{< X >}} | {{< X >}} | {{< X >}} |
+| Amazon Kinesis | {{< X >}} | {{< X >}} | {{< X >}} | {{< X >}} | | |
+| Amazon SNS | {{< X >}} | {{< X >}} | {{< X >}} | {{< X >}} | | |
+| Amazon SQS | {{< X >}} | {{< X >}} | {{< X >}} | {{< X >}} | | |
+| Azure Service Bus | | | {{< X >}} | | | |
+| Google Pub/Sub | {{< X >}} | | | {{< X >}} | | |
+| IBM MQ | | | {{< X >}} | | | |
+| RabbitMQ | {{< X >}} | {{< X >}} | {{< X >}} | {{< X >}} | | |

 Data Streams Monitoring requires minimum Datadog tracer versions. See each setup page for details.

content/en/data_streams/manual_instrumentation.md

Lines changed: 258 additions & 57 deletions
@@ -12,89 +12,290 @@ further_reading:
     text: 'Software Catalog'
 ---

-Data Streams Monitoring (DSM) propagates context through message headers. Use manual instrumentation to set up DSM if you are using:
-- a message queue technology that is not supported by DSM
-- a message queue technology without headers, such as Kinesis
-- Lambdas in .NET or Java
+Data Streams Monitoring (DSM) tracks how data flows through queues and services. If your message system is **not automatically supported** (for example, your queue technology or the client library you use is not automatically instrumented in your language), you must **manually record checkpoints** so DSM can connect producers and consumers.

-### Manual instrumentation installation
+- **Produce checkpoint**: records when a message is published and injects DSM context into the message.
+- **Consume checkpoint**: records when a message is received, extracts the DSM context if it exists, and prepares future produce checkpoints to carry that context forward.

-1. Ensure you're using the [Datadog Agent v7.34.0 or later][1].
+**The DSM context must travel _with the message_**. If your system supports headers, store it there. Otherwise, embed it directly in the payload.
+
+### Manual instrumentation installation
+Ensure you're using the [Datadog Agent v7.34.0 or later][1].

-2. On services sending or consuming messages, declare the supported types. For example:
-   {{< code-block lang="text" >}}
-   kinesis, kafka, rabbitmq, sqs, sns, servicebus
-   {{< /code-block >}}

-3. Call the Data Streams Monitoring checkpoints when messages are produced and when they are consumed, as shown in the example code below:
 {{< tabs >}}
 {{% tab "Java" %}}
+## API reference
+
+### `DataStreamsCheckpointer.get().setProduceCheckpoint(queueType, name, carrier)`
+- **queueType**: message system (for example `kafka`, `rabbitmq`, `sqs`, `sns`, `kinesis`, `servicebus`). Recognized strings surface system-specific DSM metrics; other strings are allowed.
+- **name**: queue, topic, or subscription name.
+- **carrier**: an implementation of `DataStreamsContextCarrier`. This is where DSM context is **stored** with the message (typically a headers map, but could be payload fields if no headers exist).
+
+### `DataStreamsCheckpointer.get().setConsumeCheckpoint(queueType, name, carrier)`
+- **queueType**: same as producer.
+- **name**: same as producer.
+- **carrier**: an implementation of `DataStreamsContextCarrier`. This is where DSM context is **retrieved** from the message.
+
+**Note**: This checkpoint does two things: it links the current message to the data stream, and it prepares this consumer to automatically pass the context to any messages it produces next.
+
+---
+
+## Examples in context (single block)
+
 {{< code-block lang="java" >}}
 import datadog.trace.api.experimental.*;
+import java.util.*;
+
+// ==========================
+// producer-service.java
+// ==========================
+public class ProducerService {
+    private final Channel channel; // your MQ/Kafka/etc. client
+
+    public ProducerService(Channel channel) {
+        this.channel = channel;
+    }
+
+    public void publishOrder(Order order) {
+        Headers headers = new Headers(); // your header structure
+        Carrier headersAdapter = new Carrier(headers);
+
+        // Mark DSM produce checkpoint right before sending the message.
+        DataStreamsCheckpointer.get().setProduceCheckpoint(
+            "kafka",  // queueType
+            "orders", // name
+            headersAdapter
+        );
+
+        // Send the message with DSM context attached.
+        String payload = serialize(order);
+        channel.send("orders", payload, headers);
+    }
+}

-Carrier headersAdapter = new Carrier(headers);
-
-// before calling produce
-DataStreamsCheckpointer.get().setProduceCheckpoint("<datastream-type>", "<queue-or-topic-name>", headersAdapter);
-
-// after calling consume
-DataStreamsCheckpointer.get().setConsumeCheckpoint("<datastream-type>", "<queue-or-topic-name>", headersAdapter);
-
-// example: logging a kafka consume checkpoint on the 'customer-checkout' topic would look like
-DataStreamsCheckpointer.get().setConsumeCheckpoint("kafka", "customer-checkout", headersAdapter);
-
-// replace headers with whatever you're using to pass the context
+// ==========================
+// consumer-service.java
+// ==========================
+public class ConsumerService {
+    public void handleMessage(String payload, Headers headers) {
+        Carrier headersAdapter = new Carrier(headers);
+
+        // Mark DSM consume checkpoint when receiving the message.
+        DataStreamsCheckpointer.get().setConsumeCheckpoint(
+            "kafka",  // queueType (match producer)
+            "orders", // name (match producer)
+            headersAdapter
+        );
+
+        // Continue with your application logic.
+        Order order = deserialize(payload);
+        processOrder(order);
+    }
+}
+
+// ==========================
+// carrier implementation
+// ==========================
 private class Carrier implements DataStreamsContextCarrier {
-    private Headers headers;
-
-    public Carrier(Headers headers) {
-        this.headers = headers;
-    }
-
-    public Set<Entry<String, Object>> entries() {
-        return this.headers.entrySet();
-    }
-
-    public void set(String key, String value){
-        this.headers.put(key, value);
-    }
+    private Headers headers;
+
+    public Carrier(Headers headers) {
+        this.headers = headers;
+    }
+
+    @Override
+    public Set<Map.Entry<String, Object>> entries() {
+        return this.headers.entrySet();
+    }
+
+    @Override
+    public void set(String key, String value) {
+        this.headers.put(key, value);
+    }
 }
 {{< /code-block >}}
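If the transport has no headers at all, the same `DataStreamsContextCarrier` interface can be backed by fields inside the message body instead. A minimal sketch under that assumption; the `MapCarrier` name and the mutable `payload` map are illustrative, not part of the API:

{{< code-block lang="java" >}}
import datadog.trace.api.experimental.*;
import java.util.*;

// Hypothetical carrier that stores DSM context as extra fields in the
// payload map, which is then serialized into the message body.
class MapCarrier implements DataStreamsContextCarrier {
    private final Map<String, Object> payload;

    public MapCarrier(Map<String, Object> payload) {
        this.payload = payload;
    }

    @Override
    public Set<Map.Entry<String, Object>> entries() {
        return this.payload.entrySet();
    }

    @Override
    public void set(String key, String value) {
        // Context travels inside the serialized body rather than headers.
        this.payload.put(key, value);
    }
}
{{< /code-block >}}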
 {{% /tab %}}
 {{% tab "Node.js" %}}
-{{< code-block lang="javascript" >}}
-const tracer = require('dd-trace').init({})
+## API reference
+
+### `tracer.dataStreamsCheckpointer.setProduceCheckpoint(queueType, name, carrier)`
+- **queueType**: message system (for example `rabbitmq`, `kafka`, `sqs`, `sns`, `kinesis`, `servicebus`). Recognized strings surface system-specific DSM metrics; other strings are allowed.
+- **name**: queue, topic, or subscription name.
+- **carrier**: writable key/value container used to **store** DSM context with the message (a headers object if supported; otherwise fields added to the payload).
+
+### `tracer.dataStreamsCheckpointer.setConsumeCheckpoint(queueType, name, carrier)`
+- **queueType**: same value used by the producer (recognized strings preferred; other strings allowed).
+- **name**: same queue, topic, or subscription name.
+- **carrier**: readable key/value container used to **retrieve** DSM context from the message (the headers object if supported; otherwise the parsed payload).
+
+**Note**: This checkpoint does two things: it links the current message to the data stream, and it prepares this consumer to automatically pass the context to any messages it produces next.
+
+## Examples in context (single block)
+
+{{< code-block lang="js" >}}
+// ==========================
+// producer-service.js
+// ==========================
+const tracer = require('dd-trace').init({}) // init in the producer service
+
+async function publishOrder(order, channel) {
+  // Use headers if supported; otherwise embed context in the payload.
+  const headers = {}
+
+  // Mark DSM produce checkpoint right before sending the message.
+  tracer.dataStreamsCheckpointer.setProduceCheckpoint(
+    'rabbitmq', // queueType
+    'orders',   // name
+    headers     // carrier: where DSM context will be stored
+  )
+
+  // Send the message with DSM context attached.
+  const payload = JSON.stringify(order)
+  channel.publish('orders', Buffer.from(payload), { headers })
+}

-// before calling produce
-const headers = {}
-tracer.dataStreamsCheckpointer.setProduceCheckpoint(
-  "<datastream-type>", "<queue-name>", headers
-)
+// ==========================
+// consumer-service.js
+// ==========================
+const tracer = require('dd-trace').init({}) // init in the consumer service

-// after calling consume
-tracer.dataStreamsCheckpointer.setConsumeCheckpoint(
-  "<datastream-type>", "<queue-name>", headers
-)
+function handleMessage(msg) {
+  // Retrieve DSM context at the top of your handler.
+  // If headers aren't supported, build a carrier that reads from your payload.
+  const headers = msg.properties?.headers || {}

+  tracer.dataStreamsCheckpointer.setConsumeCheckpoint(
+    'rabbitmq', // queueType (match producer)
+    'orders',   // name (match producer)
+    headers     // carrier: where DSM context was stored
+  )
+
+  // Continue with application logic.
+  const body = JSON.parse(msg.content.toString())
+  processOrder(body)
+}
 {{< /code-block >}}
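When the transport has no headers, the carrier can be a field inside the JSON payload itself, since the checkpointer only needs a plain key/value object to read and write. A minimal sketch under that assumption; the `dsmContext` field name and the `send()` helper are illustrative:

{{< code-block lang="js" >}}
const tracer = require('dd-trace').init({})

// Producer: store DSM context as an extra field inside the JSON payload.
const message = { order, dsmContext: {} }
tracer.dataStreamsCheckpointer.setProduceCheckpoint('rabbitmq', 'orders', message.dsmContext)
send('orders', Buffer.from(JSON.stringify(message))) // your transport's send helper

// Consumer: read the context back out of the parsed payload.
const parsed = JSON.parse(msg.content.toString())
tracer.dataStreamsCheckpointer.setConsumeCheckpoint('rabbitmq', 'orders', parsed.dsmContext || {})
{{< /code-block >}}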
 {{% /tab %}}
 {{% tab "Python" %}}
+## API reference
+
+### `set_produce_checkpoint(queue_type, name, setter)`
+- **queue_type**: message system (for example `kafka`, `rabbitmq`, `sqs`, `sns`, `kinesis`, `servicebus`). Recognized strings surface system-specific DSM metrics; other strings are allowed.
+- **name**: queue, topic, or subscription name.
+- **setter**: a callable `(key, value)` used to **store** DSM context in the message.
+  - If headers are supported: use `headers.setdefault`.
+  - If not: use a function that writes into the message payload (like a JSON field).
+
+### `set_consume_checkpoint(queue_type, name, getter)`
+- **queue_type**: same as producer.
+- **name**: same as producer.
+- **getter**: a callable `(key)` used to **retrieve** DSM context from the message.
+  - If headers are supported: use `headers.get`.
+  - If not: use a function that reads from the payload.
+
+**Note**: This checkpoint does two things: it links the current message to the data stream, and it prepares this consumer to automatically pass the context to any messages it produces next.
+
+---
+
+## Examples in context (single block)
+
 {{< code-block lang="python" >}}
-from ddtrace.data_streams import set_consume_checkpoint
+# ==========================
+# producer_service.py
+# ==========================
 from ddtrace.data_streams import set_produce_checkpoint

-# before calling produce
-headers = {}
-set_produce_checkpoint(
-    "<datastream-type>", "<datastream-name>", headers.setdefault
-)
+def publish_order(order, channel):
+    headers = {}
+
+    # Mark DSM produce checkpoint before sending the message.
+    set_produce_checkpoint(
+        "rabbitmq",         # queue_type
+        "orders",           # name
+        headers.setdefault  # setter: store DSM context in headers or payload
+    )
+
+    # Send the message with DSM context attached.
+    payload = order.to_json()
+    channel.publish(topic="orders", body=payload, properties=headers)
+
+
+# ==========================
+# consumer_service.py
+# ==========================
+from ddtrace.data_streams import set_consume_checkpoint
+
+def handle_message(message, properties):
+    headers = getattr(properties, "headers", {})

-# after calling consume
-set_consume_checkpoint(
-    "<datastream-type>", "<datastream-name>", headers.get
-)
+    # Mark DSM consume checkpoint when receiving the message.
+    set_consume_checkpoint(
+        "rabbitmq",   # queue_type (match producer)
+        "orders",     # name (match producer)
+        headers.get   # getter: retrieve DSM context from headers or payload
+    )

+    # Continue with your application logic.
+    process_order(message)
 {{< /code-block >}}
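When the payload is the only place that can carry context, the setter and getter can target a field inside the message body rather than headers. A minimal sketch, assuming a JSON-serializable dict payload; the `_dsm_context` field name, `send()` helper, and `raw_body` variable are illustrative:

{{< code-block lang="python" >}}
import json

from ddtrace.data_streams import set_consume_checkpoint, set_produce_checkpoint

# Producer: write DSM context into the payload itself.
payload = {"order_id": 123, "_dsm_context": {}}
set_produce_checkpoint("rabbitmq", "orders", payload["_dsm_context"].setdefault)
send(topic="orders", body=json.dumps(payload))  # your transport's send helper

# Consumer: read DSM context back out of the parsed payload.
parsed = json.loads(raw_body)
set_consume_checkpoint("rabbitmq", "orders", parsed.get("_dsm_context", {}).get)
{{< /code-block >}}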
+{{% /tab %}}
+{{% tab "Ruby" %}}
+## API reference
+
+### `Datadog::DataStreams.set_produce_checkpoint(queue_type, name, &block)`
+- **queue_type**: the message system (for example `rabbitmq`, `kafka`, `sqs`, `sns`, `kinesis`, `servicebus`). Using a recognized queue_type helps surface metrics related to that system in Data Streams, but other strings are allowed if needed.
+- **name**: the queue, topic, or subscription name.
+- **block**: yields `(key, pathway_context)`. Your block must *store* the DSM context with the message, under the given key.
+  - If headers are supported: put it in headers.
+  - If not: embed it in the payload.
+
+### `Datadog::DataStreams.set_consume_checkpoint(queue_type, name, &block)`
+- **queue_type**: same message system as the producer. Using a recognized queue_type helps surface metrics related to that system in Data Streams, but other strings are also allowed.
+- **name**: same queue, topic, or subscription name.
+- **block**: yields `(key)`. Your block must *retrieve* the DSM context from the message, from whichever location (header or message body) the message was produced with.
+
+**Note**: This checkpoint does two things: it links the current message to the data stream, and it prepares this consumer to automatically pass the context to any messages it produces next.
+
+---
+
+## Examples in context
+
+{{< code-block lang="ruby" >}}
+# Producer side
+
+def publish_order(order)
+  headers = {}
+
+  # Mark DSM produce checkpoint before sending the message.
+  Datadog::DataStreams.set_produce_checkpoint("rabbitmq", "orders") do |key, pathway_context|
+    # Store DSM context in the message
+    # - If headers supported: headers[key] = pathway_context
+    # - If no headers: message[key] = pathway_context
+    headers[key] = pathway_context
+  end
+
+  # Send the message with DSM context attached
+  @publisher.publish(topic: "orders", payload: order.to_json, headers: headers)
+end
+
+# Consumer side
+
+def handle_message(message)
+  # Mark DSM consume checkpoint when receiving the message.
+  Datadog::DataStreams.set_consume_checkpoint("rabbitmq", "orders") do |key|
+    # Retrieve DSM context from the message
+    # - If headers supported: message.headers[key]
+    # - If no headers: parsed_message[key]
+    message.headers[key]
+  end
+
+  # Continue with application logic
+  process_order(message.body)
+end
+{{< /code-block >}}
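If your transport has no headers, the same blocks can read and write a field in the message body instead. A minimal sketch under that assumption; the `payload` hash, `send_message` helper, and `raw_body` variable are illustrative:

{{< code-block lang="ruby" >}}
require "json"

# Producer: store DSM context inside the payload itself.
payload = { "order_id" => 123 }
Datadog::DataStreams.set_produce_checkpoint("rabbitmq", "orders") do |key, pathway_context|
  payload[key] = pathway_context # context travels in the serialized body
end
send_message("orders", payload.to_json) # your transport's send helper

# Consumer: read DSM context back out of the parsed payload.
parsed = JSON.parse(raw_body)
Datadog::DataStreams.set_consume_checkpoint("rabbitmq", "orders") do |key|
  parsed[key]
end
{{< /code-block >}}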
+
 {{% /tab %}}
 {{< /tabs >}}
## Further Reading
