73 changes: 73 additions & 0 deletions src/llama_stack/core/task.py
@@ -0,0 +1,73 @@
# Copyright (c) Meta Platforms, Inc. and affiliates.
# All rights reserved.
#
# This source code is licensed under the terms described in the LICENSE file in
# the root directory of this source tree.

import asyncio
from collections.abc import Coroutine
from contextlib import contextmanager
from dataclasses import dataclass
from typing import Any

from opentelemetry import context as otel_context

from llama_stack.core.request_headers import PROVIDER_DATA_VAR


@dataclass
class RequestContext:
"""Snapshot of request-scoped state for propagation through background queues.

Background workers are long-lived asyncio tasks whose contextvars are frozen
at creation time. Capturing both the OTel trace context and the provider /
auth data at *enqueue* time and re-activating them per work-item ensures:

* Each DB write is attributed to the correct request trace (OTel).
* Each DB write is stamped with the correct user identity (PROVIDER_DATA_VAR).
"""

otel_ctx: otel_context.Context
provider_data: Any


def capture_request_context() -> RequestContext:
"""Snapshot the current request-scoped context for later use in a worker."""
return RequestContext(
otel_ctx=otel_context.get_current(),
provider_data=PROVIDER_DATA_VAR.get(),
)


@contextmanager
def activate_request_context(ctx: RequestContext):
"""Temporarily restore a previously captured request context.

Use this in worker loops that run with a detached (empty) context to
attribute work back to the originating request.
"""
otel_token = otel_context.attach(ctx.otel_ctx)
provider_token = PROVIDER_DATA_VAR.set(ctx.provider_data)
try:
yield
finally:
PROVIDER_DATA_VAR.reset(provider_token)
otel_context.detach(otel_token)


def create_detached_background_task(coro: Coroutine[Any, Any, Any]) -> asyncio.Task[Any]:
"""Create an asyncio task that does not inherit request-scoped context.

asyncio.create_task copies all contextvars at creation time, which causes
long-lived background workers to permanently inherit the spawning request's
OTel trace and auth identity. This helper temporarily clears both before
creating the task, then immediately restores them so the caller is unaffected.
"""
otel_token = otel_context.attach(otel_context.Context())
provider_token = PROVIDER_DATA_VAR.set(None)
try:
task = asyncio.create_task(coro)
finally:
PROVIDER_DATA_VAR.reset(provider_token)
otel_context.detach(otel_token)
return task
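
The pitfall the last helper works around is easy to reproduce in isolation. Below is a minimal, self-contained sketch (not part of the diff) using a stand-in contextvar named request_user: a task created with plain asyncio.create_task freezes the spawning request's value for its entire lifetime, while clearing the variable around task creation, as create_detached_background_task does, yields a neutral worker.

import asyncio
import contextvars

# Stand-in for request-scoped state such as PROVIDER_DATA_VAR (illustrative only).
request_user: contextvars.ContextVar[str | None] = contextvars.ContextVar("request_user", default=None)


async def worker(label: str) -> None:
    # A long-lived worker sees whatever value was current when create_task ran.
    print(label, request_user.get())


async def handle_request() -> None:
    request_user.set("alice")

    # Copies the current context: this worker reports "alice" forever.
    inherited = asyncio.create_task(worker("inherited:"))

    # Clear the var around task creation, then restore it for the caller.
    token = request_user.set(None)
    try:
        detached = asyncio.create_task(worker("detached:"))
    finally:
        request_user.reset(token)

    await asyncio.gather(inherited, detached)  # -> "inherited: alice", "detached: None"


asyncio.run(handle_request())
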
src/llama_stack/providers/inline/agents/meta_reference/responses/openai_responses.py
@@ -12,6 +12,16 @@

from pydantic import BaseModel, TypeAdapter

from llama_stack.core.conversations.validation import CONVERSATION_ID_PATTERN
from llama_stack.core.task import (
    RequestContext,
    activate_request_context,
    capture_request_context,
    create_detached_background_task,
)
from llama_stack.log import get_logger
from llama_stack.providers.utils.responses.responses_store import (
ResponsesStore,
@@ -80,6 +90,17 @@
BACKGROUND_NUM_WORKERS = 10


@dataclass
class _BackgroundWorkItem:
    """Typed queue item that pairs business kwargs with the originating request context."""

    request_context: RequestContext
    kwargs: dict = field(default_factory=dict)


class OpenAIResponsePreviousResponseWithInputItems(BaseModel):
    input_items: ListOpenAIResponseInputItem
    response: OpenAIResponseObject
@@ -131,7 +152,11 @@
    async def _ensure_workers_started(self) -> None:
        """Start background workers in the current event loop if not already running."""
        for _ in range(BACKGROUND_NUM_WORKERS - len(self._background_worker_tasks)):
            task = create_detached_background_task(self._background_worker())
            self._background_worker_tasks.add(task)
            task.add_done_callback(self._background_worker_tasks.discard)

@@ -144,6 +169,7 @@
    async def _background_worker(self) -> None:
        """Worker coroutine that pulls items from the queue and processes them."""
        while True:
            item = await self._background_queue.get()
            with activate_request_context(item.request_context):
                try:
                    await asyncio.wait_for(
@@ -155,6 +181,10 @@
                    logger.exception(
                        f"Background response {response_id} timed out after {BACKGROUND_RESPONSE_TIMEOUT_SECONDS}s"
                    )
                    try:
                        existing = await self.responses_store.get_response_object(response_id)
                        existing.status = "failed"
@@ -812,6 +842,7 @@
        # Enqueue work item for background workers. Raises QueueFull if at capacity.
        try:
            self._background_queue.put_nowait(
                _BackgroundWorkItem(
                    request_context=capture_request_context(),
                    kwargs=dict(
                        response_id=response_id,
                        input=input,
                        model=model,
                        prompt=prompt,
                        instructions=instructions,
                        previous_response_id=previous_response_id,
                        conversation=conversation,
                        store=store,
                        temperature=temperature,
                        frequency_penalty=frequency_penalty,
                        text=text,
                        tool_choice=tool_choice,
                        tools=tools,
                        include=include,
                        max_infer_iters=max_infer_iters,
                        guardrail_ids=guardrail_ids,
                        parallel_tool_calls=parallel_tool_calls,
                        max_tool_calls=max_tool_calls,
                        reasoning=reasoning,
                        max_output_tokens=max_output_tokens,
                        safety_identifier=safety_identifier,
                        service_tier=service_tier,
                        metadata=metadata,
                        truncation=truncation,
                        presence_penalty=presence_penalty,
                        extra_body=extra_body,
                    ),
                )
            )
        except asyncio.QueueFull:
31 changes: 31 additions & 0 deletions src/llama_stack/providers/utils/inference/inference_store.py
@@ -12,6 +12,15 @@
from llama_stack.core.storage.datatypes import InferenceStoreReference, StorageBackendType
from llama_stack.core.storage.sqlstore.authorized_sqlstore import AuthorizedSqlStore
from llama_stack.core.storage.sqlstore.sqlstore import _SQLSTORE_BACKENDS, sqlstore_impl
from llama_stack.core.task import (
    RequestContext,
    activate_request_context,
    capture_request_context,
    create_detached_background_task,
)
from llama_stack.log import get_logger
from llama_stack_api import (
ListOpenAIChatCompletionResponse,
@@ -25,6 +34,15 @@
logger = get_logger(name=__name__, category="inference")


class _WriteItem(NamedTuple):
    completion: OpenAIChatCompletion
    messages: list[OpenAIMessageParam]
    request_context: RequestContext


class InferenceStore:
    def __init__(
        self,
@@ -100,7 +118,11 @@ async def _ensure_workers_started(self) -> None:
        if not self._worker_tasks:
            loop = asyncio.get_running_loop()
            for _ in range(self._num_writers):
                task = create_detached_background_task(self._worker_loop())
                self._worker_tasks.append(task)

    async def store_chat_completion(
@@ -110,6 +132,10 @@ async def store_chat_completion(
        await self._ensure_workers_started()
        if self._queue is None:
            raise ValueError("Inference store is not initialized")
        item = _WriteItem(chat_completion, input_messages, capture_request_context())
        try:
            self._queue.put_nowait(item)
        except asyncio.QueueFull:
@@ -129,7 +155,12 @@ async def _worker_loop(self) -> None:
                break
            try:
                with activate_request_context(item.request_context):
                    await self._write_chat_completion(item.completion, item.messages)
            except Exception as e:  # noqa: BLE001
                logger.error(f"Error writing chat completion: {e}")
            finally:
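
Read together with src/llama_stack/core/task.py, the inference-store changes follow one discipline: capture the request context when work is enqueued, run workers detached, and re-activate the captured context around each write. The sketch below condenses that pattern into a hypothetical stand-alone writer; AsyncWriter, _QueuedWrite, and write_fn are illustrative names, not the store's actual API, and only the imports from llama_stack.core.task come from this PR.

import asyncio
from collections.abc import Awaitable, Callable
from dataclasses import dataclass
from typing import Any

from llama_stack.core.task import (
    RequestContext,
    activate_request_context,
    capture_request_context,
    create_detached_background_task,
)


@dataclass
class _QueuedWrite:
    # Illustrative stand-in for _WriteItem / _BackgroundWorkItem.
    payload: Any
    request_context: RequestContext


class AsyncWriter:
    """Hypothetical queue-backed writer showing the capture/activate discipline."""

    def __init__(self, write_fn: Callable[[Any], Awaitable[None]], num_workers: int = 2) -> None:
        self._write_fn = write_fn
        self._queue: asyncio.Queue[_QueuedWrite] = asyncio.Queue(maxsize=1000)
        self._tasks: list[asyncio.Task[Any]] = []
        self._num_workers = num_workers

    async def enqueue(self, payload: Any) -> None:
        self._ensure_workers()
        # Capture the caller's OTel trace and provider data at enqueue time.
        self._queue.put_nowait(_QueuedWrite(payload, capture_request_context()))

    def _ensure_workers(self) -> None:
        if not self._tasks:
            for _ in range(self._num_workers):
                # Workers must not inherit the spawning request's contextvars.
                self._tasks.append(create_detached_background_task(self._worker()))

    async def _worker(self) -> None:
        while True:
            item = await self._queue.get()
            try:
                # Re-activate the originating request's context for this one write.
                with activate_request_context(item.request_context):
                    await self._write_fn(item.payload)
            finally:
                self._queue.task_done()

In the actual diff, InferenceStore.store_chat_completion and _worker_loop play the roles of enqueue and _worker, respectively.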