Skip to content

Commit 4012984

Browse files
authored
feat(telemetry-server): implemented OTLP receiver (experimental) (#3732)
1 parent eeeecc1 commit 4012984

File tree

7 files changed

+444
-12
lines changed

7 files changed

+444
-12
lines changed

genkit-tools/telemetry-server/src/index.ts

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ import { logger } from '@genkit-ai/tools-common/utils';
2222
import express from 'express';
2323
import type * as http from 'http';
2424
import type { TraceStore } from './types';
25+
import { traceDataFromOtlp } from './utils/otlp';
2526

2627
export { LocalFileTraceStore } from './file-trace-store.js';
2728
export { TraceQuerySchema, type TraceQuery, type TraceStore } from './types';
@@ -90,6 +91,29 @@ export async function startTelemetryServer(params: {
9091
}
9192
});
9293

94+
api.post('/api/otlp', async (request, response) => {
95+
try {
96+
if (!request.body.resourceSpans?.length) {
97+
// Acknowledge and ignore empty payloads.
98+
response.status(200).json({});
99+
return;
100+
}
101+
const traces = traceDataFromOtlp(request.body);
102+
for (const trace of traces) {
103+
const traceData = TraceDataSchema.parse(trace);
104+
await params.traceStore.save(traceData.traceId, traceData);
105+
}
106+
response.status(200).json({});
107+
} catch (err) {
108+
logger.error(`Error processing OTLP payload: ${err}`);
109+
response.status(500).json({
110+
code: 13, // INTERNAL
111+
message:
112+
'An internal error occurred while processing the OTLP payload.',
113+
});
114+
}
115+
});
116+
93117
api.use((err: any, req: any, res: any, next: any) => {
94118
logger.error(err.stack);
95119
const error = err as Error;
Lines changed: 162 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,162 @@
1+
/**
2+
* Copyright 2025 Google LLC
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
import { SpanData, TraceData } from '@genkit-ai/tools-common';
18+
19+
// These interfaces are based on the OTLP JSON format.
20+
// A full definition can be found at:
21+
// https://github.com/open-telemetry/opentelemetry-proto/blob/main/opentelemetry/proto
22+
23+
interface OtlpValue {
24+
stringValue?: string;
25+
intValue?: number;
26+
boolValue?: boolean;
27+
arrayValue?: {
28+
values: OtlpValue[];
29+
};
30+
}
31+
32+
interface OtlpAttribute {
33+
key: string;
34+
value: OtlpValue;
35+
}
36+
37+
interface OtlpSpan {
38+
traceId: string;
39+
spanId: string;
40+
parentSpanId?: string;
41+
name: string;
42+
kind: number;
43+
startTimeUnixNano: string;
44+
endTimeUnixNano: string;
45+
attributes: OtlpAttribute[];
46+
droppedAttributesCount: number;
47+
events: any[];
48+
droppedEventsCount: number;
49+
status?: {
50+
code: number;
51+
message?: string;
52+
};
53+
links: any[];
54+
droppedLinksCount: number;
55+
}
56+
57+
interface OtlpScopeSpan {
58+
scope: {
59+
name: string;
60+
version: string;
61+
};
62+
spans: OtlpSpan[];
63+
}
64+
65+
interface OtlpResourceSpan {
66+
resource: {
67+
attributes: OtlpAttribute[];
68+
droppedAttributesCount: number;
69+
};
70+
scopeSpans: OtlpScopeSpan[];
71+
}
72+
73+
interface OtlpPayload {
74+
resourceSpans: OtlpResourceSpan[];
75+
}
76+
77+
function toMillis(nano: string): number {
78+
return Math.round(parseInt(nano) / 1_000_000);
79+
}
80+
81+
function toSpanData(span: OtlpSpan, scope: OtlpScopeSpan['scope']): SpanData {
82+
const attributes: Record<string, any> = {};
83+
span.attributes.forEach((attr) => {
84+
if (attr.value.stringValue) {
85+
attributes[attr.key] = attr.value.stringValue;
86+
} else if (attr.value.intValue) {
87+
attributes[attr.key] = attr.value.intValue;
88+
} else if (attr.value.boolValue) {
89+
attributes[attr.key] = attr.value.boolValue;
90+
}
91+
});
92+
93+
let spanKind: string;
94+
switch (span.kind) {
95+
case 1:
96+
spanKind = 'INTERNAL';
97+
break;
98+
case 2:
99+
spanKind = 'SERVER';
100+
break;
101+
case 3:
102+
spanKind = 'CLIENT';
103+
break;
104+
case 4:
105+
spanKind = 'PRODUCER';
106+
break;
107+
case 5:
108+
spanKind = 'CONSUMER';
109+
break;
110+
default:
111+
spanKind = 'UNSPECIFIED';
112+
break;
113+
}
114+
115+
const spanData: SpanData = {
116+
traceId: span.traceId,
117+
spanId: span.spanId,
118+
parentSpanId: span.parentSpanId,
119+
startTime: toMillis(span.startTimeUnixNano),
120+
endTime: toMillis(span.endTimeUnixNano),
121+
displayName: span.name,
122+
attributes,
123+
instrumentationLibrary: {
124+
name: scope.name,
125+
version: scope.version,
126+
},
127+
spanKind,
128+
};
129+
if (span.status && span.status.code !== 0) {
130+
const status: { code: number; message?: string } = {
131+
code: span.status.code,
132+
};
133+
if (span.status.message) {
134+
status.message = span.status.message;
135+
}
136+
spanData.status = status;
137+
}
138+
return spanData;
139+
}
140+
141+
export function traceDataFromOtlp(otlpData: OtlpPayload): TraceData[] {
142+
const traces: Record<string, TraceData> = {};
143+
144+
otlpData.resourceSpans.forEach((resourceSpan) => {
145+
resourceSpan.scopeSpans.forEach((scopeSpan) => {
146+
scopeSpan.spans.forEach((span) => {
147+
if (!traces[span.traceId]) {
148+
traces[span.traceId] = {
149+
traceId: span.traceId,
150+
spans: {},
151+
};
152+
}
153+
traces[span.traceId].spans[span.spanId] = toSpanData(
154+
span,
155+
scopeSpan.scope
156+
);
157+
});
158+
});
159+
});
160+
161+
return Object.values(traces);
162+
}

genkit-tools/telemetry-server/tests/file_store_test.ts

Lines changed: 12 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -16,16 +16,17 @@
1616

1717
import type { TraceData, TraceQueryFilter } from '@genkit-ai/tools-common';
1818
import * as assert from 'assert';
19+
import fs from 'fs';
1920
import getPort from 'get-port';
2021
import { afterEach, beforeEach, describe, it } from 'node:test';
2122
import os from 'os';
2223
import path from 'path';
24+
import { Index } from '../src/file-trace-store';
2325
import {
2426
LocalFileTraceStore,
2527
startTelemetryServer,
2628
stopTelemetryApi,
2729
} from '../src/index';
28-
import { Index } from '../src/localFileTraceStore';
2930
import { sleep, span } from './utils';
3031

3132
const TRACE_ID = '1234';
@@ -37,10 +38,10 @@ const SPAN_B = 'bcd';
3738
const SPAN_C = 'cde';
3839

3940
describe('local-file-store', () => {
40-
let port;
41-
let storeRoot;
42-
let indexRoot;
43-
let url;
41+
let port: number;
42+
let storeRoot: string;
43+
let indexRoot: string;
44+
let url: string;
4445

4546
beforeEach(async () => {
4647
port = await getPort();
@@ -276,18 +277,22 @@ describe('local-file-store', () => {
276277
});
277278

278279
describe('index', () => {
279-
let indexRoot;
280+
let indexRoot: string;
280281
let index: Index;
281282

282283
beforeEach(async () => {
283284
indexRoot = path.resolve(
284285
os.tmpdir(),
285-
`./telemetry-server-api-test-${Date.now()}/traces_idx`
286+
`./telemetry-server-api-test-${Date.now()}-${Math.floor(Math.random() * 1000)}/traces_idx`
286287
);
287288

288289
index = new Index(indexRoot);
289290
});
290291

292+
afterEach(() => {
293+
fs.rmSync(indexRoot, { recursive: true, force: true });
294+
});
295+
291296
it('should index and search spans', () => {
292297
const spanA = span(TRACE_ID_1, SPAN_A, 100, 100);
293298
spanA.displayName = 'spanA';

0 commit comments

Comments
 (0)