Skip to content
40 changes: 34 additions & 6 deletions packages/agents-a365-observability/src/ObservabilityBuilder.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,16 @@
import { NodeSDK } from '@opentelemetry/sdk-node';
import { ConsoleSpanExporter, BatchSpanProcessor } from '@opentelemetry/sdk-trace-base';
import { SpanProcessor } from './tracing/processors/SpanProcessor';
import { isAgent365ExporterEnabled } from './tracing/exporter/utils';
import { isAgent365ExporterEnabled, isPerRequestExportEnabled } from './tracing/exporter/utils';
import { Agent365Exporter } from './tracing/exporter/Agent365Exporter';
import type { TokenResolver } from './tracing/exporter/Agent365ExporterOptions';
import { Agent365ExporterOptions } from './tracing/exporter/Agent365ExporterOptions';
import { PerRequestSpanProcessor, DEFAULT_FLUSH_GRACE_MS, DEFAULT_MAX_TRACE_AGE_MS } from './tracing/PerRequestSpanProcessor';
import { resourceFromAttributes } from '@opentelemetry/resources';
import { ATTR_SERVICE_NAME } from '@opentelemetry/semantic-conventions';
import { trace } from '@opentelemetry/api';
import { ClusterCategory } from '@microsoft/agents-a365-runtime';
import logger from './utils/logging';
/**
* Configuration options for Agent 365 Observability Builder
*/
Expand Down Expand Up @@ -92,6 +94,7 @@ export class ObservabilityBuilder {

private createBatchProcessor(): BatchSpanProcessor {
if (!isAgent365ExporterEnabled()) {
logger.info('[ObservabilityBuilder] Agent 365 exporter not enabled. Using ConsoleSpanExporter for BatchSpanProcessor.');
return new BatchSpanProcessor(new ConsoleSpanExporter());
}

Expand All @@ -111,6 +114,31 @@ export class ObservabilityBuilder {
});
}

private createPerRequestProcessor(): PerRequestSpanProcessor {
if (!isAgent365ExporterEnabled()) {
logger.info('[Agent365Exporter] Per-request export enabled but Agent 365 exporter is disabled. Using ConsoleSpanExporter.');
return new PerRequestSpanProcessor(new ConsoleSpanExporter());
}

const opts = new Agent365ExporterOptions();
if (this.options.exporterOptions) {
Object.assign(opts, this.options.exporterOptions);
}
opts.clusterCategory = this.options.clusterCategory || opts.clusterCategory || 'prod';

// For per-request export, token is retrieved from OTel Context by Agent365Exporter
// using getExportToken(), so no tokenResolver is needed here
return new PerRequestSpanProcessor(new Agent365Exporter(opts), DEFAULT_FLUSH_GRACE_MS, DEFAULT_MAX_TRACE_AGE_MS);
}

private createExportProcessor(): BatchSpanProcessor | PerRequestSpanProcessor {
if (isPerRequestExportEnabled()) {
return this.createPerRequestProcessor();
}

return this.createBatchProcessor();
}

private createResource() {
const serviceName = this.options.serviceVersion
? `${this.options.serviceName}-${this.options.serviceVersion}`
Expand All @@ -133,8 +161,8 @@ export class ObservabilityBuilder {
// 1. baggage enricher (copies baggage -> span attributes)
const spanProcessor = new SpanProcessor();

// 2. batch processor that actually ships spans out
const batchProcessor = this.createBatchProcessor();
// 2. export processor (batch or per-request based on environment variable)
const exportProcessor = this.createExportProcessor();

// eslint-disable-next-line @typescript-eslint/no-explicit-any
const globalProvider: any = trace.getTracerProvider();
Expand All @@ -146,11 +174,11 @@ export class ObservabilityBuilder {
if (canAddProcessors) {
// Someone else already created a provider (maybe their own NodeSDK).
// We DO NOT create a new NodeSDK.
// We just add our baggage enricher + batch exporter to their provider,
// We just add our baggage enricher + export processor to their provider,
// but only if they aren't already there.

this.attachProcessorIfMissing(globalProvider, spanProcessor);
this.attachProcessorIfMissing(globalProvider, batchProcessor);
this.attachProcessorIfMissing(globalProvider, exportProcessor);

this.isBuilt = true;
this.sdk = undefined; // we didn't create/own one
Expand All @@ -163,7 +191,7 @@ export class ObservabilityBuilder {
resource: this.createResource(),
spanProcessors: [
spanProcessor,
batchProcessor,
exportProcessor,
],
});

Expand Down
3 changes: 3 additions & 0 deletions packages/agents-a365-observability/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,9 @@ export { OpenTelemetryConstants } from './tracing/constants';
// Baggage builder
export { BaggageBuilder, BaggageScope } from './tracing/middleware/BaggageBuilder';

// Per-request export utilities
export { runWithExportToken, getExportToken } from './tracing/context/token-context';

// Contracts and interfaces
export {
ExecutionType,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,232 @@
// ------------------------------------------------------------------------------
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT License.
// ------------------------------------------------------------------------------

import { context, type Context } from '@opentelemetry/api';
import type { ReadableSpan, SpanProcessor, SpanExporter } from '@opentelemetry/sdk-trace-base';
import logger from '../utils/logging';

/** Default grace period (ms) to wait for child spans after root span ends */
const DEFAULT_FLUSH_GRACE_MS = 250;

/** Default maximum age (ms) for a trace before forcing flush */
const DEFAULT_MAX_TRACE_AGE_MS = 30000;

function isRootSpan(span: ReadableSpan): boolean {
return !span.parentSpanContext;
}

type TraceBuffer = {
spans: ReadableSpan[];
openCount: number;
rootEnded: boolean;
rootCtx?: Context; // holds the request Context (with token in ALS)
startedAtMs: number;
rootEndedAtMs?: number;
};

type FlushReason = 'trace_completed' | 'root_ended_grace' | 'max_trace_age' | 'force_flush';

/**
* Buffers spans per trace and exports once the request completes.
* Token is not stored; we export under the saved request Context so that getExportToken()
* can read the token from the active OpenTelemetry Context at export time.
*/
export class PerRequestSpanProcessor implements SpanProcessor {
private traces = new Map<string, TraceBuffer>();
private sweepTimer?: NodeJS.Timeout;
private isSweeping = false;

constructor(
private readonly exporter: SpanExporter,
private readonly flushGraceMs: number = DEFAULT_FLUSH_GRACE_MS,
private readonly maxTraceAgeMs: number = DEFAULT_MAX_TRACE_AGE_MS
) {}

onStart(span: ReadableSpan, ctx: Context): void {
const traceId = span.spanContext().traceId;
let buf = this.traces.get(traceId);
if (!buf) {
buf = { spans: [], openCount: 0, rootEnded: false, rootCtx: undefined, startedAtMs: Date.now() };
this.traces.set(traceId, buf);

this.ensureSweepTimer();

logger.info(
`[PerRequestSpanProcessor] Trace started traceId=${traceId} maxTraceAgeMs=${this.maxTraceAgeMs}`
);
}
buf.openCount += 1;

// Debug lifecycle: span started
logger.info(
`[PerRequestSpanProcessor] Span start name=${span.name} traceId=${traceId} spanId=${span.spanContext().spanId}` +
` root=${isRootSpan(span)} openCount=${buf.openCount}`
);

// Capture a context to export under.
// - Use the first seen context as a fallback.
// - If/when the root span starts, prefer its context (contains token via ALS).
if (isRootSpan(span)) {
buf.rootCtx = ctx;
} else {
buf.rootCtx ??= ctx;
}
}

onEnd(span: ReadableSpan): void {
const traceId = span.spanContext().traceId;
const buf = this.traces.get(traceId);
if (!buf) return;

buf.spans.push(span);
buf.openCount -= 1;
if (buf.openCount < 0) {
logger.warn(
`[PerRequestSpanProcessor] openCount underflow traceId=${traceId} spanId=${span.spanContext().spanId} resettingToZero`
);
buf.openCount = 0;
}

// Debug lifecycle: span ended
logger.info(
`[PerRequestSpanProcessor] Span end name=${span.name} traceId=${traceId} spanId=${span.spanContext().spanId}` +
` root=${isRootSpan(span)} openCount=${buf.openCount} rootEnded=${buf.rootEnded}`
);

if (isRootSpan(span)) {
buf.rootEnded = true;
buf.rootEndedAtMs = Date.now();
if (buf.openCount === 0) {
// Trace completed: root ended and no open spans remain.
this.flushTrace(traceId, 'trace_completed');
}
} else if (buf.rootEnded && buf.openCount === 0) {
// Common case: root ends first, then children finish shortly after.
// Flush immediately when the last child ends instead of waiting for grace/max timers.
this.flushTrace(traceId, 'trace_completed');
}
}

async forceFlush(): Promise<void> {
await Promise.all([...this.traces.keys()].map((id) => this.flushTrace(id, 'force_flush')));
}

async shutdown(): Promise<void> {
await this.forceFlush();
this.stopSweepTimerIfIdle();
await this.exporter.shutdown?.();
}

private ensureSweepTimer(): void {
if (this.sweepTimer) return;

// Keep one lightweight sweeper. Interval is derived from grace/max-age to keep responsiveness reasonable.
const intervalMs = Math.max(10, Math.min(this.flushGraceMs, 250));
this.sweepTimer = setInterval(() => {
void this.sweep();
}, intervalMs);

this.sweepTimer.unref?.();
}

private stopSweepTimerIfIdle(): void {
if (this.traces.size !== 0) return;
if (!this.sweepTimer) return;
clearInterval(this.sweepTimer);
this.sweepTimer = undefined;
}

private async sweep(): Promise<void> {
if (this.isSweeping) return;
this.isSweeping = true;
try {
if (this.traces.size === 0) {
this.stopSweepTimerIfIdle();
return;
}

const now = Date.now();
const toFlush: Array<{ traceId: string; reason: FlushReason }> = [];

for (const [traceId, trace] of this.traces.entries()) {
// 1) Max age safety flush (clears buffers even if spans never end)
if (now - trace.startedAtMs >= this.maxTraceAgeMs) {
toFlush.push({ traceId, reason: 'max_trace_age' });
continue;
}

// 2) Root ended grace window flush (clears buffers if children never end)
if (trace.rootEnded && trace.openCount > 0 && trace.rootEndedAtMs) {
if (now - trace.rootEndedAtMs >= this.flushGraceMs) {
toFlush.push({ traceId, reason: 'root_ended_grace' });
}
}
}

// Flush in parallel; flushTrace removes entries eagerly.
await Promise.all(toFlush.map((x) => this.flushTrace(x.traceId, x.reason)));
this.stopSweepTimerIfIdle();
} finally {
this.isSweeping = false;
}
}

private async flushTrace(traceId: string, reason: FlushReason): Promise<void> {
const trace = this.traces.get(traceId);
if (!trace) return;

this.traces.delete(traceId);
this.stopSweepTimerIfIdle();

const spans = trace.spans;
if (spans.length === 0) return;

logger.info(
`[PerRequestSpanProcessor] Flushing trace traceId=${traceId} reason=${reason} spans=${spans.length} rootEnded=${trace.rootEnded}`
);

// Must have captured the root context to access the token
if (!trace.rootCtx) {
logger.error(`[PerRequestSpanProcessor] Missing rootCtx for trace ${traceId}, cannot export spans`);
return;
}

// Export under the original request Context so exporter can read the token from context.active()
await new Promise<void>((resolve) => {
try {
context.with(trace.rootCtx as Context, () => {
try {
this.exporter.export(spans, (result) => {
// Log export failures but still resolve to avoid blocking processor
if (result.code !== 0) {
logger.error(
`[PerRequestSpanProcessor] Export failed traceId=${traceId} reason=${reason} code=${result.code}`,
result.error
);
} else {
logger.info(
`[PerRequestSpanProcessor] Export succeeded traceId=${traceId} reason=${reason} spans=${spans.length}`
);
}
resolve();
});
} catch (err) {
logger.error(
`[PerRequestSpanProcessor] Export threw traceId=${traceId} reason=${reason} spans=${spans.length}`,
err
);
resolve();
}
});
} catch (err) {
logger.error(`[PerRequestSpanProcessor] context.with threw traceId=${traceId} reason=${reason}`, err);
resolve();
}
});
}
}

/** Export constants for use in configuration */
export { DEFAULT_FLUSH_GRACE_MS, DEFAULT_MAX_TRACE_AGE_MS };
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ export class OpenTelemetryConstants {
public static readonly ENABLE_OBSERVABILITY = 'ENABLE_OBSERVABILITY';
public static readonly ENABLE_A365_OBSERVABILITY_EXPORTER = 'ENABLE_A365_OBSERVABILITY_EXPORTER';
public static readonly ENABLE_A365_OBSERVABILITY = 'ENABLE_A365_OBSERVABILITY';
public static readonly ENABLE_A365_OBSERVABILITY_PER_REQUEST_EXPORT = 'ENABLE_A365_OBSERVABILITY_PER_REQUEST_EXPORT';

// GenAI semantic conventions
public static readonly GEN_AI_CLIENT_OPERATION_DURATION_METRIC_NAME = 'gen_ai.client.operation.duration';
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
// ------------------------------------------------------------------------------
// Copyright (c) Microsoft Corporation. All rights reserved.
// ------------------------------------------------------------------------------

import { context, createContextKey, Context } from '@opentelemetry/api';
import logger from '../../utils/logging';

const EXPORT_TOKEN_KEY = createContextKey('a365_export_token');

/**
* Run a function within a Context that carries the per-request export token.
* This keeps the token only in OTel Context (ALS), never in any registry.
*/
export function runWithExportToken<T>(token: string, fn: () => T): T {
const ctxWithToken = context.active().setValue(EXPORT_TOKEN_KEY, token);
logger.info('[TokenContext] Running function with export token in context.');
return context.with(ctxWithToken, fn);
}

/**
* Retrieve the per-request export token from a given OTel Context (or the active one).
*/
export function getExportToken(ctx: Context = context.active()): string | undefined {
return ctx.getValue(EXPORT_TOKEN_KEY) as string | undefined;
}
Loading