diff --git a/packages/agents-a365-observability/PER_REQUEST_EXPORT.md b/packages/agents-a365-observability/PER_REQUEST_EXPORT.md new file mode 100644 index 0000000..c00bb7a --- /dev/null +++ b/packages/agents-a365-observability/PER_REQUEST_EXPORT.md @@ -0,0 +1,73 @@ +# Per-request export (token via OpenTelemetry Context) + +This document explains how to enable **per-request export** for `@microsoft/agents-a365-observability` and how to provide an export token *per incoming request* using OpenTelemetry Context. + +## What it does + +When per-request export is enabled: + +- The SDK uses `PerRequestSpanProcessor` instead of OpenTelemetry’s `BatchSpanProcessor`. +- Spans are **buffered per trace** and exported when the request/trace completes. +- The Agent 365 exporter reads the auth token from the **active OpenTelemetry Context** at export time (set via `runWithExportToken`). + +This is useful when tokens are request-scoped (multi-tenant, delegated auth, per-user tokens, etc.). + +## Enable per-request export + +Set these environment variables: + +```bash +ENABLE_OBSERVABILITY=true +ENABLE_A365_OBSERVABILITY_EXPORTER=true +ENABLE_A365_OBSERVABILITY_PER_REQUEST_EXPORT=true +``` + +Then build/start observability as usual (the builder will pick the correct span processor automatically based on the env var). + +## Provide the token per request + +At your **request boundary** (e.g., Express route handler, bot adapter handler), acquire a token using your auth flow, then wrap the request handling code with `runWithExportToken(token, fn)`. + +Example (Express-style handler): + +```ts +import { runWithExportToken } from '@microsoft/agents-a365-observability'; + +app.post('/api/messages', async (req, res) => { + const token = await acquireTokenSomehow(req); // your auth + + await runWithExportToken(token, async () => { + // Any spans created in here (and async work chained from here) + // will export using this token. 
+ await handleRequest(req, res); + }); +}); +``` + +Notes: + +- `runWithExportToken` uses OpenTelemetry Context (AsyncLocalStorage) so it must wrap the async call chain that creates the spans. +- In per-request export mode, the exporter will log an error if no token is available in context. + +## Guardrails (recommended) + +Per-request buffering can increase memory usage and cause export bursts during traffic spikes. `PerRequestSpanProcessor` includes guardrails that you can tune via env vars: + +- `A365_PER_REQUEST_MAX_CONCURRENT_EXPORTS` (default `20`): caps concurrent exports across requests/traces. +- `A365_PER_REQUEST_MAX_TRACES` (default `1000`): caps concurrently buffered traces. +- `A365_PER_REQUEST_MAX_SPANS_PER_TRACE` (default `5000`): caps buffered ended spans per trace. + +Set to `0` (or negative) to disable a specific guardrail. + +## Interaction with batch export + +- **Batch export mode** (`ENABLE_A365_OBSERVABILITY_PER_REQUEST_EXPORT` not enabled) requires `tokenResolver` so the exporter can resolve tokens during export. +- **Per-request export mode** uses the token from OpenTelemetry Context; a `tokenResolver` is not required. + +## Troubleshooting + +If you see: `No token available in OTel Context for per-request export` + +- Ensure `ENABLE_A365_OBSERVABILITY_PER_REQUEST_EXPORT=true`. +- Ensure your request handler is wrapped in `runWithExportToken(...)`. +- Ensure the work that creates spans runs within the same async call chain (avoid detaching work into separate processes/worker threads without propagating context). 
diff --git a/packages/agents-a365-observability/src/ObservabilityBuilder.ts b/packages/agents-a365-observability/src/ObservabilityBuilder.ts index 65db2b0..ec7cac9 100644 --- a/packages/agents-a365-observability/src/ObservabilityBuilder.ts +++ b/packages/agents-a365-observability/src/ObservabilityBuilder.ts @@ -5,14 +5,16 @@ import { NodeSDK } from '@opentelemetry/sdk-node'; import { ConsoleSpanExporter, BatchSpanProcessor } from '@opentelemetry/sdk-trace-base'; import { SpanProcessor } from './tracing/processors/SpanProcessor'; -import { isAgent365ExporterEnabled } from './tracing/exporter/utils'; +import { isAgent365ExporterEnabled, isPerRequestExportEnabled } from './tracing/exporter/utils'; import { Agent365Exporter } from './tracing/exporter/Agent365Exporter'; import type { TokenResolver } from './tracing/exporter/Agent365ExporterOptions'; import { Agent365ExporterOptions } from './tracing/exporter/Agent365ExporterOptions'; +import { PerRequestSpanProcessor } from './tracing/PerRequestSpanProcessor'; import { resourceFromAttributes } from '@opentelemetry/resources'; import { ATTR_SERVICE_NAME } from '@opentelemetry/semantic-conventions'; import { trace } from '@opentelemetry/api'; import { ClusterCategory } from '@microsoft/agents-a365-runtime'; +import logger from './utils/logging'; /** * Configuration options for Agent 365 Observability Builder */ @@ -92,6 +94,7 @@ export class ObservabilityBuilder { private createBatchProcessor(): BatchSpanProcessor { if (!isAgent365ExporterEnabled()) { + logger.info('[ObservabilityBuilder] Agent 365 exporter not enabled. Using ConsoleSpanExporter for BatchSpanProcessor.'); return new BatchSpanProcessor(new ConsoleSpanExporter()); } @@ -111,6 +114,31 @@ export class ObservabilityBuilder { }); } + private createPerRequestProcessor(): PerRequestSpanProcessor { + if (!isAgent365ExporterEnabled()) { + logger.info('[ObservabilityBuilder] Per-request export enabled but Agent 365 exporter is disabled. 
Using ConsoleSpanExporter.'); + return new PerRequestSpanProcessor(new ConsoleSpanExporter()); + } + + const opts = new Agent365ExporterOptions(); + if (this.options.exporterOptions) { + Object.assign(opts, this.options.exporterOptions); + } + opts.clusterCategory = this.options.clusterCategory || opts.clusterCategory || 'prod'; + + // For per-request export, token is retrieved from OTel Context by Agent365Exporter + // using getExportToken(), so no tokenResolver is needed here + return new PerRequestSpanProcessor(new Agent365Exporter(opts)); + } + + private createExportProcessor(): BatchSpanProcessor | PerRequestSpanProcessor { + if (isPerRequestExportEnabled()) { + return this.createPerRequestProcessor(); + } + + return this.createBatchProcessor(); + } + private createResource() { const serviceName = this.options.serviceVersion ? `${this.options.serviceName}-${this.options.serviceVersion}` @@ -133,8 +161,8 @@ export class ObservabilityBuilder { // 1. baggage enricher (copies baggage -> span attributes) const spanProcessor = new SpanProcessor(); - // 2. batch processor that actually ships spans out - const batchProcessor = this.createBatchProcessor(); + // 2. export processor (batch or per-request based on environment variable) + const exportProcessor = this.createExportProcessor(); // eslint-disable-next-line @typescript-eslint/no-explicit-any const globalProvider: any = trace.getTracerProvider(); @@ -146,11 +174,11 @@ export class ObservabilityBuilder { if (canAddProcessors) { // Someone else already created a provider (maybe their own NodeSDK). // We DO NOT create a new NodeSDK. - // We just add our baggage enricher + batch exporter to their provider, + // We just add our baggage enricher + export processor to their provider, // but only if they aren't already there. 
this.attachProcessorIfMissing(globalProvider, spanProcessor); - this.attachProcessorIfMissing(globalProvider, batchProcessor); + this.attachProcessorIfMissing(globalProvider, exportProcessor); this.isBuilt = true; this.sdk = undefined; // we didn't create/own one @@ -163,7 +191,7 @@ export class ObservabilityBuilder { resource: this.createResource(), spanProcessors: [ spanProcessor, - batchProcessor, + exportProcessor, ], }); diff --git a/packages/agents-a365-observability/src/index.ts b/packages/agents-a365-observability/src/index.ts index 85b7d97..a08b600 100644 --- a/packages/agents-a365-observability/src/index.ts +++ b/packages/agents-a365-observability/src/index.ts @@ -12,6 +12,9 @@ export { OpenTelemetryConstants } from './tracing/constants'; // Baggage builder export { BaggageBuilder, BaggageScope } from './tracing/middleware/BaggageBuilder'; +// Per-request export utilities +export { runWithExportToken, getExportToken } from './tracing/context/token-context'; + // Contracts and interfaces export { ExecutionType, diff --git a/packages/agents-a365-observability/src/tracing/PerRequestSpanProcessor.ts b/packages/agents-a365-observability/src/tracing/PerRequestSpanProcessor.ts new file mode 100644 index 0000000..d3f2772 --- /dev/null +++ b/packages/agents-a365-observability/src/tracing/PerRequestSpanProcessor.ts @@ -0,0 +1,307 @@ +// ------------------------------------------------------------------------------ +// Copyright (c) Microsoft Corporation. +// Licensed under the MIT License. 
+// ------------------------------------------------------------------------------ + +import { context, type Context } from '@opentelemetry/api'; +import type { ReadableSpan, SpanProcessor, SpanExporter } from '@opentelemetry/sdk-trace-base'; +import logger from '../utils/logging'; + +/** Default grace period (ms) to wait for child spans after root span ends */ +const DEFAULT_FLUSH_GRACE_MS = 250; + +/** Default maximum age (ms) for a trace before forcing flush */ +const DEFAULT_MAX_TRACE_AGE_MS = 30000; + +/** Guardrails to prevent unbounded memory growth / export bursts */ +const DEFAULT_MAX_BUFFERED_TRACES = 1000; +const DEFAULT_MAX_SPANS_PER_TRACE = 5000; +const DEFAULT_MAX_CONCURRENT_EXPORTS = 20; + +function readEnvInt(name: string, fallback: number): number { + const raw = process.env[name]; + if (!raw) return fallback; + const parsed = Number.parseInt(raw, 10); + return Number.isFinite(parsed) ? parsed : fallback; +} + +function isRootSpan(span: ReadableSpan): boolean { + return !span.parentSpanContext; +} + +type TraceBuffer = { + spans: ReadableSpan[]; + openCount: number; + rootEnded: boolean; + rootCtx?: Context; // holds the request Context (with token in ALS) + startedAtMs: number; + rootEndedAtMs?: number; + droppedSpans: number; +}; + +type FlushReason = 'trace_completed' | 'root_ended_grace' | 'max_trace_age' | 'force_flush'; + +/** + * Buffers spans per trace and exports once the request completes. + * Token is not stored; we export under the saved request Context so that getExportToken() + * can read the token from the active OpenTelemetry Context at export time. 
+ */ +export class PerRequestSpanProcessor implements SpanProcessor { + private traces = new Map<string, TraceBuffer>(); + private sweepTimer?: NodeJS.Timeout; + private isSweeping = false; + + private readonly maxBufferedTraces: number; + private readonly maxSpansPerTrace: number; + private readonly maxConcurrentExports: number; + + private inFlightExports = 0; + private exportWaiters: Array<() => void> = []; + + constructor( + private readonly exporter: SpanExporter, + private readonly flushGraceMs: number = DEFAULT_FLUSH_GRACE_MS, + private readonly maxTraceAgeMs: number = DEFAULT_MAX_TRACE_AGE_MS + ) { + // Defaults are intentionally high but bounded; override via env vars if needed. + // Set to 0 (or negative) to disable a guardrail. + this.maxBufferedTraces = readEnvInt('A365_PER_REQUEST_MAX_TRACES', DEFAULT_MAX_BUFFERED_TRACES); + this.maxSpansPerTrace = readEnvInt('A365_PER_REQUEST_MAX_SPANS_PER_TRACE', DEFAULT_MAX_SPANS_PER_TRACE); + this.maxConcurrentExports = readEnvInt('A365_PER_REQUEST_MAX_CONCURRENT_EXPORTS', DEFAULT_MAX_CONCURRENT_EXPORTS); + } + + onStart(span: ReadableSpan, ctx: Context): void { + const traceId = span.spanContext().traceId; + let buf = this.traces.get(traceId); + if (!buf) { + if (this.maxBufferedTraces > 0 && this.traces.size >= this.maxBufferedTraces) { // a limit <= 0 disables this guardrail + logger.warn( + `[PerRequestSpanProcessor] Dropping new trace due to maxBufferedTraces=${this.maxBufferedTraces} traceId=${traceId}` + ); + return; + } + + buf = { + spans: [], + openCount: 0, + rootEnded: false, + rootCtx: undefined, + startedAtMs: Date.now(), + droppedSpans: 0, + }; + this.traces.set(traceId, buf); + + this.ensureSweepTimer(); + + logger.info( + `[PerRequestSpanProcessor] Trace started traceId=${traceId} maxTraceAgeMs=${this.maxTraceAgeMs}` + ); + } + buf.openCount += 1; + + // Debug lifecycle: span started + logger.info( + `[PerRequestSpanProcessor] Span start name=${span.name} traceId=${traceId} spanId=${span.spanContext().spanId}` + + ` root=${isRootSpan(span)} openCount=${buf.openCount}` + ); + + // Capture 
a context to export under. + // - Use the first seen context as a fallback. + // - If/when the root span starts, prefer its context (contains token via ALS). + if (isRootSpan(span)) { + buf.rootCtx = ctx; + } else { + buf.rootCtx ??= ctx; + } + } + + onEnd(span: ReadableSpan): void { + const traceId = span.spanContext().traceId; + const buf = this.traces.get(traceId); + if (!buf) return; + + if (this.maxSpansPerTrace > 0 && buf.spans.length >= this.maxSpansPerTrace) { // a limit <= 0 disables this guardrail + buf.droppedSpans += 1; + if (buf.droppedSpans === 1 || buf.droppedSpans % 100 === 0) { + logger.warn( + `[PerRequestSpanProcessor] Dropping ended span due to maxSpansPerTrace=${this.maxSpansPerTrace} ` + + `traceId=${traceId} droppedSpans=${buf.droppedSpans}` + ); + } + } else { + buf.spans.push(span); + } + buf.openCount -= 1; + if (buf.openCount < 0) { + logger.warn( + `[PerRequestSpanProcessor] openCount underflow traceId=${traceId} spanId=${span.spanContext().spanId} resettingToZero` + ); + buf.openCount = 0; + } + + // Debug lifecycle: span ended + logger.info( + `[PerRequestSpanProcessor] Span end name=${span.name} traceId=${traceId} spanId=${span.spanContext().spanId}` + + ` root=${isRootSpan(span)} openCount=${buf.openCount} rootEnded=${buf.rootEnded}` + ); + + if (isRootSpan(span)) { + buf.rootEnded = true; + buf.rootEndedAtMs = Date.now(); + if (buf.openCount === 0) { + // Trace completed: root ended and no open spans remain. + this.flushTrace(traceId, 'trace_completed'); + } + } else if (buf.rootEnded && buf.openCount === 0) { + // Common case: root ends first, then children finish shortly after. + // Flush immediately when the last child ends instead of waiting for grace/max timers. 
+ this.flushTrace(traceId, 'trace_completed'); + } + } + + async forceFlush(): Promise { + await Promise.all([...this.traces.keys()].map((id) => this.flushTrace(id, 'force_flush'))); + } + + async shutdown(): Promise { + await this.forceFlush(); + this.stopSweepTimerIfIdle(); + await this.exporter.shutdown?.(); + } + + private ensureSweepTimer(): void { + if (this.sweepTimer) return; + + // Keep one lightweight sweeper. Interval is derived from grace/max-age to keep responsiveness reasonable. + const intervalMs = Math.max(10, Math.min(this.flushGraceMs, 250)); + this.sweepTimer = setInterval(() => { + void this.sweep(); + }, intervalMs); + + this.sweepTimer.unref?.(); + } + + private stopSweepTimerIfIdle(): void { + if (this.traces.size !== 0) return; + if (!this.sweepTimer) return; + clearInterval(this.sweepTimer); + this.sweepTimer = undefined; + } + + private async sweep(): Promise { + if (this.isSweeping) return; + this.isSweeping = true; + try { + if (this.traces.size === 0) { + this.stopSweepTimerIfIdle(); + return; + } + + const now = Date.now(); + const toFlush: Array<{ traceId: string; reason: FlushReason }> = []; + + for (const [traceId, trace] of this.traces.entries()) { + // 1) Max age safety flush (clears buffers even if spans never end) + if (now - trace.startedAtMs >= this.maxTraceAgeMs) { + toFlush.push({ traceId, reason: 'max_trace_age' }); + continue; + } + + // 2) Root ended grace window flush (clears buffers if children never end) + if (trace.rootEnded && trace.openCount > 0 && trace.rootEndedAtMs) { + if (now - trace.rootEndedAtMs >= this.flushGraceMs) { + toFlush.push({ traceId, reason: 'root_ended_grace' }); + } + } + } + + // Flush in parallel; flushTrace removes entries eagerly. 
+ await Promise.all(toFlush.map((x) => this.flushTrace(x.traceId, x.reason))); + this.stopSweepTimerIfIdle(); + } finally { + this.isSweeping = false; + } + } + + private async flushTrace(traceId: string, reason: FlushReason): Promise { + const trace = this.traces.get(traceId); + if (!trace) return; + + this.traces.delete(traceId); + this.stopSweepTimerIfIdle(); + + const spans = trace.spans; + if (spans.length === 0) return; + + logger.info( + `[PerRequestSpanProcessor] Flushing trace traceId=${traceId} reason=${reason} spans=${spans.length} rootEnded=${trace.rootEnded}` + ); + + // Must have captured the root context to access the token + if (!trace.rootCtx) { + logger.error(`[PerRequestSpanProcessor] Missing rootCtx for trace ${traceId}, cannot export spans`); + return; + } + + await this.acquireExportSlot(); + + try { + // Export under the original request Context so exporter can read the token from context.active() + await new Promise((resolve) => { + try { + context.with(trace.rootCtx as Context, () => { + try { + this.exporter.export(spans, (result) => { + // Log export failures but still resolve to avoid blocking processor + if (result.code !== 0) { + logger.error( + `[PerRequestSpanProcessor] Export failed traceId=${traceId} reason=${reason} code=${result.code}`, + result.error + ); + } else { + logger.info( + `[PerRequestSpanProcessor] Export succeeded traceId=${traceId} reason=${reason} spans=${spans.length}` + ); + } + resolve(); + }); + } catch (err) { + logger.error( + `[PerRequestSpanProcessor] Export threw traceId=${traceId} reason=${reason} spans=${spans.length}`, + err + ); + resolve(); + } + }); + } catch (err) { + logger.error(`[PerRequestSpanProcessor] context.with threw traceId=${traceId} reason=${reason}`, err); + resolve(); + } + }); + } finally { + this.releaseExportSlot(); + } + } + + private async acquireExportSlot(): Promise { + if (this.maxConcurrentExports <= 0) return; + if (this.inFlightExports < this.maxConcurrentExports) { + 
this.inFlightExports += 1; + return; + } + + await new Promise((resolve) => { + this.exportWaiters.push(() => { + this.inFlightExports += 1; + resolve(); + }); + }); + } + + private releaseExportSlot(): void { + if (this.maxConcurrentExports <= 0) return; + this.inFlightExports = Math.max(0, this.inFlightExports - 1); + const next = this.exportWaiters.shift(); + if (next) next(); + } +} diff --git a/packages/agents-a365-observability/src/tracing/constants.ts b/packages/agents-a365-observability/src/tracing/constants.ts index bc7426a..6a4dd95 100644 --- a/packages/agents-a365-observability/src/tracing/constants.ts +++ b/packages/agents-a365-observability/src/tracing/constants.ts @@ -24,6 +24,7 @@ export class OpenTelemetryConstants { public static readonly ENABLE_OBSERVABILITY = 'ENABLE_OBSERVABILITY'; public static readonly ENABLE_A365_OBSERVABILITY_EXPORTER = 'ENABLE_A365_OBSERVABILITY_EXPORTER'; public static readonly ENABLE_A365_OBSERVABILITY = 'ENABLE_A365_OBSERVABILITY'; + public static readonly ENABLE_A365_OBSERVABILITY_PER_REQUEST_EXPORT = 'ENABLE_A365_OBSERVABILITY_PER_REQUEST_EXPORT'; // GenAI semantic conventions public static readonly GEN_AI_CLIENT_OPERATION_DURATION_METRIC_NAME = 'gen_ai.client.operation.duration'; diff --git a/packages/agents-a365-observability/src/tracing/context/token-context.ts b/packages/agents-a365-observability/src/tracing/context/token-context.ts new file mode 100644 index 0000000..3fc034a --- /dev/null +++ b/packages/agents-a365-observability/src/tracing/context/token-context.ts @@ -0,0 +1,25 @@ +// ------------------------------------------------------------------------------ +// Copyright (c) Microsoft Corporation. All rights reserved. 
+// ------------------------------------------------------------------------------ + +import { context, createContextKey, Context } from '@opentelemetry/api'; +import logger from '../../utils/logging'; + +const EXPORT_TOKEN_KEY = createContextKey('a365_export_token'); + +/** + * Run a function within a Context that carries the per-request export token. + * This keeps the token only in OTel Context (ALS), never in any registry. + */ +export function runWithExportToken(token: string, fn: () => T): T { + const ctxWithToken = context.active().setValue(EXPORT_TOKEN_KEY, token); + logger.info('[TokenContext] Running function with export token in context.'); + return context.with(ctxWithToken, fn); +} + +/** + * Retrieve the per-request export token from a given OTel Context (or the active one). + */ +export function getExportToken(ctx: Context = context.active()): string | undefined { + return ctx.getValue(EXPORT_TOKEN_KEY) as string | undefined; +} diff --git a/packages/agents-a365-observability/src/tracing/exporter/Agent365Exporter.ts b/packages/agents-a365-observability/src/tracing/exporter/Agent365Exporter.ts index dd49cab..6c13b13 100644 --- a/packages/agents-a365-observability/src/tracing/exporter/Agent365Exporter.ts +++ b/packages/agents-a365-observability/src/tracing/exporter/Agent365Exporter.ts @@ -16,8 +16,10 @@ import { statusName, useCustomDomainForObservability, resolveAgent365Endpoint, - getAgent365ObservabilityDomainOverride + getAgent365ObservabilityDomainOverride, + isPerRequestExportEnabled } from './utils'; +import { getExportToken } from '../context/token-context'; import logger, { formatError } from '../../utils/logging'; import { Agent365ExporterOptions } from './Agent365ExporterOptions'; @@ -95,8 +97,8 @@ export class Agent365Exporter implements SpanExporter { throw new Error('Agent365ExporterOptions must be provided (was null/undefined)'); } - if (!options.tokenResolver) { - throw new Error('Agent365Exporter tokenResolver must be provided'); + if 
(!isPerRequestExportEnabled() && !options.tokenResolver) { + throw new Error('Agent365Exporter tokenResolver must be provided for batch export'); } this.options = options; } @@ -180,17 +182,31 @@ export class Agent365Exporter implements SpanExporter { 'content-type': 'application/json' }; - if (!this.options.tokenResolver) { - logger.error('[Agent365Exporter] tokenResolver is undefined, skip exporting'); - return; + let token: string | null = null; + + if (isPerRequestExportEnabled()) { + // For per-request export, get token from OTel Context + token = getExportToken() ?? null; + if (!token) { + logger.error('[Agent365Exporter] No token available in OTel Context for per-request export'); + } + } else { + // For batch export, use tokenResolver + if (!this.options.tokenResolver) { + logger.error('[Agent365Exporter] tokenResolver is undefined, skip exporting'); + return; + } + const tokenResult = this.options.tokenResolver(agentId, tenantId); + token = tokenResult instanceof Promise ? await tokenResult : tokenResult; + if (token) { + logger.info('[Agent365Exporter] Token resolved successfully via tokenResolver'); + } else { + logger.error('[Agent365Exporter] No token resolved via tokenResolver'); + } } - const tokenResult = this.options.tokenResolver(agentId, tenantId); - const token = tokenResult instanceof Promise ? 
await tokenResult : tokenResult; + if (token) { headers['authorization'] = `Bearer ${token}`; - logger.info('[Agent365Exporter] Token resolved successfully'); - } else { - logger.error('[Agent365Exporter] No token resolved'); } // Add tenant id to headers when using custom domain diff --git a/packages/agents-a365-observability/src/tracing/exporter/utils.ts b/packages/agents-a365-observability/src/tracing/exporter/utils.ts index c341d73..b75e5f2 100644 --- a/packages/agents-a365-observability/src/tracing/exporter/utils.ts +++ b/packages/agents-a365-observability/src/tracing/exporter/utils.ts @@ -120,6 +120,19 @@ export function isAgent365ExporterEnabled(): boolean { return enabled; } +/** + * Check if per-request export is enabled via environment variable. + * When enabled, the PerRequestSpanProcessor is used instead of BatchSpanProcessor. + * The token is passed via OTel Context (async local storage) at export time. + */ +export function isPerRequestExportEnabled(): boolean { + const value = process.env[OpenTelemetryConstants.ENABLE_A365_OBSERVABILITY_PER_REQUEST_EXPORT]?.toLowerCase() || ''; + const validValues = ['true', '1', 'yes', 'on']; + const enabled: boolean = validValues.includes(value); + logger.info(`[Agent365Exporter] Per-request export enabled: ${enabled}`); + return enabled; +} + /** * Single toggle to use custom domain for observability export. 
* When true exporter will send traces to custom Agent365 service endpoint diff --git a/tests-agent/basic-agent-sdk-sample/.env.example b/tests-agent/basic-agent-sdk-sample/.env.example index ca45fbd..d47f6f5 100644 --- a/tests-agent/basic-agent-sdk-sample/.env.example +++ b/tests-agent/basic-agent-sdk-sample/.env.example @@ -12,6 +12,17 @@ agentic_scopes=https://graph.microsoft.com/.default # Agent 365 observability Environment Configuration ENABLE_OBSERVABILITY=true ENABLE_A365_OBSERVABILITY_EXPORTER=true + +# Per-request export (token from OpenTelemetry Context) +# When true, the sample will acquire an export token per incoming request and set it in OTel Context. +ENABLE_A365_OBSERVABILITY_PER_REQUEST_EXPORT= # optional - set to 'true' to enable per-request export, defaults to 'false' if not set + +# Per-request span processor guardrails (apply when per-request export is enabled) +# Set to 0 (or negative) to disable a guardrail. +A365_PER_REQUEST_MAX_CONCURRENT_EXPORTS= # optional - max concurrent exports, default is 20 +A365_PER_REQUEST_MAX_TRACES= # optional - max concurrently buffered traces, default is 1000 +A365_PER_REQUEST_MAX_SPANS_PER_TRACE= # optional - max buffered ended spans per trace, default is 5000 + CLUSTER_CATEGORY=prod # optional - defaults to 'prod' if not set A365_OBSERVABILITY_LOG_LEVEL= # optional - set to enable observability logs, value can be 'info', 'warn', or 'error', default to 'none' if not set Use_Custom_Resolver= # optional - set to 'true' to use custom token resolver, defaults to 'false' if not set diff --git a/tests-agent/basic-agent-sdk-sample/src/agent.ts b/tests-agent/basic-agent-sdk-sample/src/agent.ts index 82ff575..a9ef584 100644 --- a/tests-agent/basic-agent-sdk-sample/src/agent.ts +++ b/tests-agent/basic-agent-sdk-sample/src/agent.ts @@ -24,7 +24,7 @@ import { } from '@microsoft/agents-a365-observability'; import { getObservabilityAuthenticationScope } from '@microsoft/agents-a365-runtime'; import { 
AgenticTokenCacheInstance, BaggageBuilderUtils, ScopeUtils } from '@microsoft/agents-a365-observability-hosting'; -import tokenCache from './token-cache'; +import tokenCache from './token-cache'; interface ConversationState { count: number; } @@ -81,22 +81,24 @@ agentApplication.onActivity( // Set Use_Custom_Resolver === 'true' to use a custom token resolver (see telemetry.ts) and a custom token cache (see token-cache.ts). // Otherwise: use the default AgenticTokenCache via RefreshObservabilityToken. - if (process.env.Use_Custom_Resolver === 'true') { - const aauToken = await agentApplication.authorization.exchangeToken(context, 'agentic', { - scopes: getObservabilityAuthenticationScope() - }); - const cacheKey = createAgenticTokenCacheKey(agentInfo.agentId, tenantInfo.tenantId); - tokenCache.set(cacheKey, aauToken?.token || ''); - } else { - // Preload/refresh the observability token into the shared AgenticTokenCache. - // We don't immediately need the token here, and if acquisition fails we continue (non-fatal for this demo sample). - await AgenticTokenCacheInstance.RefreshObservabilityToken( - agentInfo.agentId, - tenantInfo.tenantId, - context, - agentApplication.authorization, - getObservabilityAuthenticationScope() - ); + if (process.env.ENABLE_A365_OBSERVABILITY_PER_REQUEST_EXPORT?.toLowerCase() !== 'true') { + if (process.env.Use_Custom_Resolver === 'true') { + const aauToken = await agentApplication.authorization.exchangeToken(context, 'agentic', { + scopes: getObservabilityAuthenticationScope() + }); + const cacheKey = createAgenticTokenCacheKey(agentInfo.agentId, tenantInfo.tenantId); + tokenCache.set(cacheKey, aauToken?.token || ''); + } else { + // Preload/refresh the observability token into the shared AgenticTokenCache. + // We don't immediately need the token here, and if acquisition fails we continue (non-fatal for this demo sample). 
+ await AgenticTokenCacheInstance.RefreshObservabilityToken( + agentInfo.agentId, + tenantInfo.tenantId, + context, + agentApplication.authorization, + getObservabilityAuthenticationScope() + ); + } } const llmResponse = await performInference( diff --git a/tests-agent/basic-agent-sdk-sample/src/index.ts b/tests-agent/basic-agent-sdk-sample/src/index.ts index fb8ec93..9940471 100644 --- a/tests-agent/basic-agent-sdk-sample/src/index.ts +++ b/tests-agent/basic-agent-sdk-sample/src/index.ts @@ -1,12 +1,17 @@ +// Copyright (c) Microsoft Corporation. +// Licensed under the MIT License. + // It is important to load environment variables before importing other modules import { configDotenv } from 'dotenv'; configDotenv(); -import { AuthConfiguration, authorizeJWT, CloudAdapter, loadAuthConfigFromEnv, Request } from '@microsoft/agents-hosting'; +import { AuthConfiguration, CloudAdapter, loadAuthConfigFromEnv, Request } from '@microsoft/agents-hosting'; import express, { NextFunction, Response } from 'express'; import { agentApplication } from './agent'; import { a365Observability } from './telemetry'; +import { logger, runWithExportToken } from '@microsoft/agents-a365-observability'; +import { getObservabilityAuthenticationScope } from '@microsoft/agents-a365-runtime'; const authConfig: AuthConfiguration = loadAuthConfigFromEnv(); const adapter = new CloudAdapter(authConfig); @@ -18,7 +23,7 @@ a365Observability.start(); // Mock authentication middleware for development // This is only required when running from agents playground -app.use((req: Request, res: Response, next: NextFunction) => { +app.use((req: Request, _res: Response, next: NextFunction) => { // Create a mock identity when JWT is disabled req.user = { aud: authConfig.clientId || 'mock-client-id', @@ -30,24 +35,53 @@ app.use((req: Request, res: Response, next: NextFunction) => { app.post('/api/messages', async (req: Request, res: Response) => { try { + // Check if per-request export is enabled + const 
isPerRequestExportEnabled = + process.env.ENABLE_A365_OBSERVABILITY_PER_REQUEST_EXPORT?.toLowerCase() === 'true'; + await adapter.process(req, res, async (context) => { - const app = agentApplication; - await app.run(context); + const agentApp = agentApplication; + + if (!isPerRequestExportEnabled) { + // For batch export, token resolution is handled by exporter/tokenResolver. + await agentApp.run(context); + return; + } + + let token = ''; + try { + const exchanged = await agentApp.authorization.exchangeToken(context, 'agentic', { + scopes: getObservabilityAuthenticationScope() + }); + token = exchanged?.token || ''; + } catch (exchangeErr) { + logger.error('[diagnostic] token exchange failed; continuing without export token', exchangeErr); + token = ''; + } + + await runWithExportToken(token, async () => { + await agentApp.run(context); + }); }); } catch (err) { // Enhanced diagnostic logging for token acquisition / adapter failures - const anyErr = err as any; - const status = anyErr?.status || anyErr?.response?.status; - const data = anyErr?.response?.data; - const message = anyErr?.message || 'Unknown error'; - // Axios style nested config + type AdapterProcessError = Error & { + status?: number; + response?: { status?: number; data?: unknown }; + config?: { url?: string; data?: unknown }; + }; + + const e = err as AdapterProcessError; + const status = e?.status || e?.response?.status; + const data = e?.response?.data as Record | undefined; + const message = e?.message || 'Unknown error'; const aadError = data?.error || data?.error_description || data; console.error('[diagnostic] adapter.process failed', { message, status, aadError, - url: anyErr?.config?.url, - scope: anyErr?.config?.data, + url: e?.config?.url, + scope: e?.config?.data, }); // Surface minimal info to caller while keeping internals in log if (!res.headersSent) { diff --git a/tests/observability/core/PerRequestSpanProcessor.test.ts b/tests/observability/core/PerRequestSpanProcessor.test.ts new 
file mode 100644 index 0000000..5048fd3 --- /dev/null +++ b/tests/observability/core/PerRequestSpanProcessor.test.ts @@ -0,0 +1,489 @@ +// ------------------------------------------------------------------------------ +// Copyright (c) Microsoft Corporation. All rights reserved. +// ------------------------------------------------------------------------------ + +import { BasicTracerProvider } from '@opentelemetry/sdk-trace-base'; +import { PerRequestSpanProcessor } from '@microsoft/agents-a365-observability/src/tracing/PerRequestSpanProcessor'; +import { runWithExportToken } from '@microsoft/agents-a365-observability/src/tracing/context/token-context'; +import type { SpanExporter, ReadableSpan } from '@opentelemetry/sdk-trace-base'; +import { ExportResult, ExportResultCode } from '@opentelemetry/core'; +import { context, trace } from '@opentelemetry/api'; + +describe('PerRequestSpanProcessor', () => { + let provider: BasicTracerProvider; + let processor: PerRequestSpanProcessor; + let exportedSpans: ReadableSpan[][] = []; + let mockExporter: SpanExporter; + let originalEnv: NodeJS.ProcessEnv; + + const getActiveTraceCount = () => { + // eslint-disable-next-line @typescript-eslint/no-explicit-any + const traces: Map | undefined = (processor as any).traces; + return traces?.size ?? 0; + }; + + beforeEach(() => { + originalEnv = { ...process.env }; + exportedSpans = []; + mockExporter = { + export: (spans: ReadableSpan[], resultCallback: (result: ExportResult) => void) => { + exportedSpans.push([...spans]); + resultCallback({ code: ExportResultCode.SUCCESS }); + }, + shutdown: async () => { + // No-op: provider shutdown is handled explicitly in afterEach. 
+ } + }; + + processor = new PerRequestSpanProcessor(mockExporter); + provider = new BasicTracerProvider({ + spanProcessors: [processor] + }); + }); + + afterEach(async () => { + await provider.shutdown(); + jest.useRealTimers(); + process.env = originalEnv; + }); + + const recreateProvider = async (newProcessor: PerRequestSpanProcessor) => { + // Important: ensure old provider is fully shut down before replacing it. + // Otherwise, providers/processors/timers can accumulate across a Jest run. + await provider.shutdown(); + processor = newProcessor; + provider = new BasicTracerProvider({ + spanProcessors: [processor] + }); + }; + + describe('per-request export with token context', () => { + it('should cap the number of buffered traces (maxBufferedTraces)', async () => { + process.env.A365_PER_REQUEST_MAX_TRACES = '2'; + await recreateProvider(new PerRequestSpanProcessor(mockExporter)); + + const tracer = provider.getTracer('test'); + + // The guardrail caps *concurrently buffered* traces. To make that observable, + // keep trace-1 buffered (root ended but child still open) while starting trace-2. + await new Promise((resolve) => { + runWithExportToken('token-1', () => { + const root1 = tracer.startSpan('trace-1', { root: true }); + const ctx1 = trace.setSpan(context.active(), root1); + const child1 = tracer.startSpan('trace-1-child', undefined, ctx1); + + // End root but leave child open so the trace remains buffered. + root1.end(); + + runWithExportToken('token-2', () => { + const root2 = tracer.startSpan('trace-2'); + root2.end(); + }); + + // Finish trace-1 so it can flush. + setTimeout(() => { + child1.end(); + setTimeout(resolve, 50); + }, 10); + }); + }); + + // With max traces=2, both traces are allowed (trace-2 should NOT be dropped). 
+ const exportedNames = exportedSpans.flatMap((s) => s.map((sp) => sp.name)); + expect(exportedNames).toContain('trace-1'); + expect(exportedNames).toContain('trace-1-child'); + expect(exportedNames).toContain('trace-2'); + }); + + it('should drop additional traces beyond maxBufferedTraces (drop case)', async () => { + process.env.A365_PER_REQUEST_MAX_TRACES = '2'; + await recreateProvider(new PerRequestSpanProcessor(mockExporter)); + + const tracer = provider.getTracer('test'); + + // Keep two traces buffered (root ended but child still open), then attempt a third. + await new Promise((resolve) => { + runWithExportToken('token-1', () => { + const root1 = tracer.startSpan('trace-1', { root: true }); + const ctx1 = trace.setSpan(context.active(), root1); + const child1 = tracer.startSpan('trace-1-child', undefined, ctx1); + root1.end(); + + runWithExportToken('token-2', () => { + const root2 = tracer.startSpan('trace-2', { root: true }); + const ctx2 = trace.setSpan(context.active(), root2); + const child2 = tracer.startSpan('trace-2-child', undefined, ctx2); + root2.end(); + + // This third trace should be dropped because two traces are already buffered. + runWithExportToken('token-3', () => { + const root3 = tracer.startSpan('trace-3', { root: true }); + root3.end(); + }); + + // Finish the buffered traces so they can flush. 
+ setTimeout(() => { + child2.end(); + child1.end(); + setTimeout(resolve, 50); + }, 10); + }); + }); + }); + + const exportedNames = exportedSpans.flatMap((s) => s.map((sp) => sp.name)); + expect(exportedNames).toContain('trace-1'); + expect(exportedNames).toContain('trace-1-child'); + expect(exportedNames).toContain('trace-2'); + expect(exportedNames).toContain('trace-2-child'); + expect(exportedNames).not.toContain('trace-3'); + }); + + it('should cap the number of buffered spans per trace (maxSpansPerTrace)', async () => { + process.env.A365_PER_REQUEST_MAX_SPANS_PER_TRACE = '2'; + await recreateProvider(new PerRequestSpanProcessor(mockExporter)); + + const tracer = provider.getTracer('test'); + + await new Promise((resolve) => { + runWithExportToken('test-token', () => { + const rootSpan = tracer.startSpan('root', { root: true }); + const ctxWithRoot = trace.setSpan(context.active(), rootSpan); + + const child1 = tracer.startSpan('child-1', undefined, ctxWithRoot); + const child2 = tracer.startSpan('child-2', undefined, ctxWithRoot); + child1.end(); + child2.end(); + // Ending root after 2 children makes the drop deterministic: root is the 3rd ended span. + rootSpan.end(); + + setTimeout(resolve, 50); + }); + }); + + // We exported a single trace flush, but only 2 ended spans should be buffered/exported. + expect(exportedSpans.length).toBe(1); + expect(exportedSpans[0].length).toBe(2); + + const exportedNames = exportedSpans[0].map((sp) => sp.name); + expect(exportedNames).toContain('child-1'); + expect(exportedNames).toContain('child-2'); + expect(exportedNames).not.toContain('root'); + }); + + it('should respect max concurrent exports (A365_PER_REQUEST_MAX_CONCURRENT_EXPORTS)', async () => { + process.env.A365_PER_REQUEST_MAX_CONCURRENT_EXPORTS = '2'; + + let inFlight = 0; + let maxInFlight = 0; + + // Hold each export "in flight" for a bit. If concurrency limiting is broken, + // all 3 exports would start immediately and maxInFlight would hit 3. 
+ const exportHoldMs = 50; + + exportedSpans = []; + mockExporter = { + export: (spans: ReadableSpan[], resultCallback: (result: ExportResult) => void) => { + inFlight += 1; + maxInFlight = Math.max(maxInFlight, inFlight); + exportedSpans.push([...spans]); + + setTimeout(() => { + inFlight -= 1; + resultCallback({ code: ExportResultCode.SUCCESS }); + }, exportHoldMs); + }, + shutdown: async () => { + // No-op: provider shutdown is handled explicitly in afterEach. + } + }; + + await recreateProvider(new PerRequestSpanProcessor(mockExporter)); + + + const tracer = provider.getTracer('test'); + + // Create multiple independent traces that will flush around the same time. + runWithExportToken('token-1', () => { + const span = tracer.startSpan('trace-1'); + span.end(); + }); + runWithExportToken('token-2', () => { + const span = tracer.startSpan('trace-2'); + span.end(); + }); + runWithExportToken('token-3', () => { + const span = tracer.startSpan('trace-3'); + span.end(); + }); + + // Wait long enough for 3 exports to be attempted and completed. 
+ await new Promise((resolve) => setTimeout(resolve, exportHoldMs * 6)); + + expect(maxInFlight).toBeLessThanOrEqual(2); + const exportedNames = exportedSpans.flatMap((s) => s.map((sp) => sp.name)); + expect(exportedNames).toContain('trace-1'); + expect(exportedNames).toContain('trace-2'); + expect(exportedNames).toContain('trace-3'); + }); + + it('should capture root span context and export under that context', async () => { + const tracer = provider.getTracer('test'); + + // Run within a context with a token set + await new Promise((resolve) => { + runWithExportToken('test-token-123', () => { + const rootSpan = tracer.startSpan('root-span'); + rootSpan.end(); + + // Wait a bit for async processing + setTimeout(() => { + resolve(); + }, 100); + }); + }); + + expect(exportedSpans.length).toBeGreaterThan(0); + expect(exportedSpans[0][0].name).toBe('root-span'); + }); + + it('should collect multiple spans from a single trace', async () => { + const tracer = provider.getTracer('test'); + + await new Promise((resolve) => { + runWithExportToken('test-token', () => { + const rootSpan = tracer.startSpan('root-span'); + const child1 = tracer.startSpan('child-1'); + const child2 = tracer.startSpan('child-2'); + + child1.end(); + child2.end(); + rootSpan.end(); + + setTimeout(() => { + resolve(); + }, 100); + }); + }); + + expect(exportedSpans.length).toEqual(3); + // Should have spans from the same trace exported together + const spanNames = exportedSpans.flatMap((s: ReadableSpan[]) => s.map((span) => span.name)); + expect(spanNames).toContain('root-span'); + expect(spanNames).toContain('child-1'); + expect(spanNames).toContain('child-2'); + }); + + it('should handle multiple independent traces', async () => { + const tracer = provider.getTracer('test'); + + await new Promise((resolve) => { + let completed = 0; + const checkDone = () => { + completed++; + if (completed === 3) { + setTimeout(() => { + resolve(); + }, 100); + } + }; + + runWithExportToken('token-1', () => { + 
const span1 = tracer.startSpan('trace-1-span'); + span1.end(); + checkDone(); + }); + + runWithExportToken('token-2', () => { + const span2 = tracer.startSpan('trace-2-span'); + span2.end(); + checkDone(); + }); + + runWithExportToken('token-3', () => { + const span3 = tracer.startSpan('trace-3-span'); + span3.end(); + checkDone(); + }); + }); + + // Should have collected 3 independent traces + expect(exportedSpans.length).toBeGreaterThanOrEqual(3); + // Verify we have different span names from different traces + const spanNames = exportedSpans.flatMap((spans) => spans.map((s) => s.name)); + expect(spanNames).toContain('trace-1-span'); + expect(spanNames).toContain('trace-2-span'); + expect(spanNames).toContain('trace-3-span'); + }); + + it('should correctly identify root spans', async () => { + const tracer = provider.getTracer('test'); + + await new Promise((resolve) => { + runWithExportToken('test-token', () => { + const rootSpan = tracer.startSpan('actual-root'); + const childSpan = tracer.startSpan('child-of-root'); + const grandchildSpan = tracer.startSpan('grandchild'); + + grandchildSpan.end(); + childSpan.end(); + rootSpan.end(); + + setTimeout(() => { + resolve(); + }, 100); + }); + }); + + expect(exportedSpans.length).toBe(3); + // Verify the order: grandchild, child-of-root, actual-root + expect(exportedSpans[0][0].name).toBe('grandchild'); + expect(exportedSpans[1][0].name).toBe('child-of-root'); + expect(exportedSpans[2][0].name).toBe('actual-root'); + }); + + it('should respect custom grace flush timeout', async () => { + exportedSpans = []; + const customGrace = 30; + await recreateProvider(new PerRequestSpanProcessor(mockExporter, customGrace)); + + const tracer = provider.getTracer('test'); + + await new Promise((resolve) => { + runWithExportToken('test-token', () => { + const rootSpan = tracer.startSpan('root'); + const childSpan = tracer.startSpan('child'); + + rootSpan.end(); // Root ends, child still pending + + setTimeout(() => { + 
childSpan.end(); // Child ends after grace period should flush + setTimeout(() => { + resolve(); + }, 50); + }, 50); + }); + }); + + expect(exportedSpans.length).toEqual(2); + }); + + it('should handle forceFlush correctly', async () => { + const tracer = provider.getTracer('test'); + + runWithExportToken('test-token', () => { + const rootSpan = tracer.startSpan('root'); + tracer.startSpan('child'); + + rootSpan.end(); // Root ends, child pending + // Don't end child - let forceFlush handle it + }); + + await processor.forceFlush(); + + expect(exportedSpans.length).toBe(1); + }); + + it('should not retain trace buffers after trace completion', async () => { + const tracer = provider.getTracer('test'); + + await new Promise((resolve) => { + runWithExportToken('test-token', () => { + const rootSpan = tracer.startSpan('root'); + const childSpan = tracer.startSpan('child'); + + childSpan.end(); + rootSpan.end(); + + setTimeout(() => resolve(), 100); + }); + }); + + expect(getActiveTraceCount()).toBe(0); + }); + + it('should drop trace buffers after grace flush if children never end', async () => { + exportedSpans = []; + const customGrace = 10; + const customMaxAge = 1000; + + await recreateProvider(new PerRequestSpanProcessor(mockExporter, customGrace, customMaxAge)); + + const tracer = provider.getTracer('test'); + + // Make the sweep deterministic by controlling time and invoking sweep directly. + let now = 1_000_000; + const nowSpy = jest.spyOn(Date, 'now').mockImplementation(() => now); + try { + runWithExportToken('test-token', () => { + const rootSpan = tracer.startSpan('root', { root: true }); + const ctxWithRoot = trace.setSpan(context.active(), rootSpan); + + // Start a child span in the same trace and never end it. + // Pass ctxWithRoot explicitly so we don't depend on a global context manager. + tracer.startSpan('child', undefined, ctxWithRoot); + + rootSpan.end(); + }); + + // Should have exactly one trace buffered (root + child share traceId). 
+ expect(getActiveTraceCount()).toBe(1); + + // Validate the trace is in the expected lifecycle state. + // eslint-disable-next-line @typescript-eslint/no-explicit-any + const traces: Map = (processor as any).traces; + const buf = [...traces.values()][0]; + expect(buf.rootEnded).toBe(true); + expect(buf.openCount).toBeGreaterThan(0); + expect(buf.rootEndedAtMs).toBeDefined(); + + // Avoid races with the background interval sweeper; drive sweep manually. + // eslint-disable-next-line @typescript-eslint/no-explicit-any + const sweepTimer: any = (processor as any).sweepTimer; + if (sweepTimer) { + clearInterval(sweepTimer); + // eslint-disable-next-line @typescript-eslint/no-explicit-any + (processor as any).sweepTimer = undefined; + } + // eslint-disable-next-line @typescript-eslint/no-explicit-any + (processor as any).isSweeping = false; + + now += customGrace + 1; + // eslint-disable-next-line @typescript-eslint/no-explicit-any + await (processor as any).sweep(); + + expect(getActiveTraceCount()).toBe(0); + } finally { + nowSpy.mockRestore(); + } + }); + + it('should drop trace buffers after max trace age even if no spans end (prevents unbounded growth)', async () => { + exportedSpans = []; + const customGrace = 250; + const customMaxAge = 10; + + await recreateProvider(new PerRequestSpanProcessor(mockExporter, customGrace, customMaxAge)); + + const tracer = provider.getTracer('test'); + + let now = 2_000_000; + const nowSpy = jest.spyOn(Date, 'now').mockImplementation(() => now); + try { + runWithExportToken('test-token', () => { + tracer.startSpan('root-never-ended'); + }); + + expect(getActiveTraceCount()).toBe(1); + + now += customMaxAge + 1; + // eslint-disable-next-line @typescript-eslint/no-explicit-any + await (processor as any).sweep(); + + expect(getActiveTraceCount()).toBe(0); + } finally { + nowSpy.mockRestore(); + } + }); + }); +}); diff --git a/tests/observability/core/agent365-exporter.test.ts b/tests/observability/core/agent365-exporter.test.ts 
index befcab5..858eb67 100644 --- a/tests/observability/core/agent365-exporter.test.ts +++ b/tests/observability/core/agent365-exporter.test.ts @@ -10,6 +10,9 @@ import { Agent365ExporterOptions } from '@microsoft/agents-a365-observability/sr import { ReadableSpan } from '@opentelemetry/sdk-trace-base'; import { ExportResultCode } from '@opentelemetry/core'; import { OpenTelemetryConstants } from '@microsoft/agents-a365-observability/src/tracing/constants'; +import { runWithExportToken } from '@microsoft/agents-a365-observability/src/tracing/context/token-context'; +import { context as otelContext } from '@opentelemetry/api'; +import { AsyncLocalStorageContextManager } from '@opentelemetry/context-async-hooks'; // Minimal mock span factory function makeSpan(attrs: Record, name = 'test'): ReadableSpan { @@ -38,6 +41,13 @@ const agentId = 'agent-22222222-2222-2222-2222-222222222222'; // Patch global fetch const originalFetch = global.fetch; +type FetchCallArgs = [string, { headers: Record; body?: unknown }]; + +function getFetchCalls(): FetchCallArgs[] { + const f = global.fetch as unknown as { mock?: { calls?: unknown[][] } }; + return (f.mock?.calls ?? 
[]) as unknown as FetchCallArgs[]; +} + function mockFetchSequence(statuses: number[]): void { let call = 0; global.fetch = jest.fn(async () => ({ @@ -56,6 +66,7 @@ describe('Agent365Exporter', () => { global.fetch = originalFetch; delete process.env.A365_OBSERVABILITY_USE_CUSTOM_DOMAIN; delete process.env.A365_OBSERVABILITY_DOMAIN_OVERRIDE; + delete process.env.ENABLE_A365_OBSERVABILITY_PER_REQUEST_EXPORT; }); it('returns success immediately with no spans', async () => { @@ -89,7 +100,7 @@ describe('Agent365Exporter', () => { await exporter.export(spans, callback); expect(callback).toHaveBeenCalledWith({ code: ExportResultCode.SUCCESS }); // Ensure fetch saw auth header - const fetchCalls = (global.fetch as unknown as { mock: { calls: any[] } }).mock.calls; + const fetchCalls = getFetchCalls(); expect(fetchCalls.length).toBe(1); const headersArg = fetchCalls[0][1].headers; expect(headersArg['authorization']).toBe(`Bearer ${token}`); @@ -123,7 +134,7 @@ describe('Agent365Exporter', () => { const callback = jest.fn(); await exporter.export(spans, callback); expect(callback).toHaveBeenCalledWith({ code: ExportResultCode.SUCCESS }); - const fetchCalls = (global.fetch as unknown as { mock: { calls: any[] } }).mock.calls; + const fetchCalls = getFetchCalls(); expect(fetchCalls.length).toBe(1); const urlArg = fetchCalls[0][0]; const headersArg = fetchCalls[0][1].headers; @@ -184,7 +195,7 @@ describe('Agent365Exporter', () => { await exporter.export(spans, callback); expect(callback).toHaveBeenCalledWith({ code: ExportResultCode.SUCCESS }); - const fetchCalls = (global.fetch as unknown as { mock: { calls: any[] } }).mock.calls; + const fetchCalls = getFetchCalls(); expect(fetchCalls.length).toBe(1); const urlArg = fetchCalls[0][0] as string; const headersArg = fetchCalls[0][1].headers as Record; @@ -215,7 +226,7 @@ describe('Agent365Exporter', () => { const callback = jest.fn(); await exporter.export(spans, callback); expect(callback).toHaveBeenCalledWith({ code: 
ExportResultCode.SUCCESS }); - const fetchCalls = (global.fetch as unknown as { mock: { calls: any[] } }).mock.calls; + const fetchCalls = getFetchCalls(); expect(fetchCalls.length).toBe(1); const urlArg = fetchCalls[0][0]; const headersArg = fetchCalls[0][1].headers; @@ -251,7 +262,7 @@ describe('Agent365Exporter', () => { await exporter.export(spans, callback); expect(callback).toHaveBeenCalledWith({ code: ExportResultCode.SUCCESS }); - const fetchCalls = (global.fetch as unknown as { mock: { calls: any[] } }).mock.calls; + const fetchCalls = getFetchCalls(); expect(fetchCalls.length).toBe(1); const urlArg = fetchCalls[0][0] as string; @@ -280,12 +291,60 @@ describe('Agent365Exporter', () => { const callback = jest.fn(); await exporter.export(spans, callback); - const fetchCalls = (global.fetch as unknown as { mock: { calls: any[] } }).mock.calls; + const fetchCalls = getFetchCalls(); expect(fetchCalls.length).toBe(1); const urlArg = fetchCalls[0][0] as string; expect(urlArg).toMatch(`/maven/agent365/service/agents/${agentId}/traces?api-version=1`); const headersArg = fetchCalls[0][1].headers as Record; expect(headersArg['authorization']).toBe(`Bearer ${token}`); expect(headersArg['x-ms-tenant-id']).toBe(tenantId); - }) + }); + + + describe('per-request export (token from OTel Context)', () => { + let contextManager: AsyncLocalStorageContextManager | undefined; + + beforeEach(() => { + // Ensure OpenTelemetry context propagates across async/await in this group. 
+ contextManager = new AsyncLocalStorageContextManager(); + contextManager.enable(); + otelContext.setGlobalContextManager(contextManager); + }); + + afterEach(() => { + contextManager?.disable(); + otelContext.disable(); + contextManager = undefined; + }); + + it('acquires export token from OTel Context when per-request export is enabled', async () => { + mockFetchSequence([200]); + process.env.ENABLE_A365_OBSERVABILITY_PER_REQUEST_EXPORT = 'true'; + + const opts = new Agent365ExporterOptions(); + opts.clusterCategory = 'local'; + + const exporter = new Agent365Exporter(opts); + const spans = [ + makeSpan({ + [OpenTelemetryConstants.TENANT_ID_KEY]: tenantId, + [OpenTelemetryConstants.GEN_AI_AGENT_ID_KEY]: agentId + }) + ]; + + const callback = jest.fn(); + const exportToken = 'tok-from-context'; + await runWithExportToken(exportToken, async () => exporter.export(spans, callback)); + + expect(callback).toHaveBeenCalledWith({ code: ExportResultCode.SUCCESS }); + + // Verify export was attempted (should be greater than 0 when enabled) + const fetchCalls = getFetchCalls(); + expect(fetchCalls.length).toBeGreaterThan(0); + + // Verify token came from OTel Context (per-request mode) + const headersArg = fetchCalls[0][1].headers as Record; + expect(headersArg['authorization']).toBe(`Bearer ${exportToken}`); + }); + }); });