Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions src/winml/modelkit/telemetry/consent.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
# Licensed under the MIT License.
# --------------------------------------------------------------------------

r"""Consent decision for ModelKit telemetry.
r"""Consent decision for WinML CLI telemetry.

A first-run interactive prompt collects user consent (default: accept)
and persists it to ``%USERPROFILE%\.winml\config.json``. This module
Expand Down Expand Up @@ -48,12 +48,12 @@ def _default_config_path() -> Path | None:
_CONSENT_VERSION: int = 1

_PROMPT_TEXT = """\
ModelKit can collect anonymous usage data to help improve the product.
WinML CLI can collect anonymous usage data to help improve the product.
Comment thread
timenick marked this conversation as resolved.

What is collected:
- Command name, duration, success/failure
- Target device/EP (when the command specifies them)
- OS, architecture, ModelKit version
- OS, architecture, WinML CLI version
- Unhandled exception types, code locations, and scrubbed error
messages (paths trimmed, length capped, PII patterns scrubbed)

Expand Down
121 changes: 113 additions & 8 deletions src/winml/modelkit/telemetry/library/exporter.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,14 +8,15 @@
from __future__ import annotations

import logging
import time
from datetime import datetime, timezone
from typing import TYPE_CHECKING

import requests
from opentelemetry.sdk._logs.export import LogRecordExporter, LogRecordExportResult

from .._cache import _PersistentCache
from .serialization import _build_envelope, _serialize_batch
from .serialization import _build_envelope, _envelope_ikey, _serialize_batch


if TYPE_CHECKING:
Expand All @@ -26,6 +27,9 @@
_LOGGER = logging.getLogger(__name__)

_HTTP_TIMEOUT = 10.0
# Truncate response body excerpts in DEBUG logs so a backend that decides
# to dump a large diagnostic payload doesn't flood the log.
_RESPONSE_BODY_LOG_LIMIT = 200


class OneCollectorLogExporter(LogRecordExporter):
Expand All @@ -46,12 +50,26 @@ def __init__(
raise ValueError("ikey must be non-empty")
if not endpoint:
raise ValueError("endpoint must be non-empty")
self._ikey = ikey
# OneCollector requires the envelope's iKey field to be
# ``o:<tenant_token>`` (just the prefix portion of the full ikey),
# while the x-apikey HTTP header carries the full ikey. Compute
# the envelope form once and cache it; a malformed ikey raises
# ValueError, which Telemetry._try_init catches to disable
# telemetry rather than crash the CLI.
self._envelope_ikey = _envelope_ikey(ikey)
# Bare tenant_token (no "o:" prefix) is what the ``kill-tokens``
# response header lists, so we keep it around for membership checks.
self._tenant_token = self._envelope_ikey.removeprefix("o:")
self._endpoint = endpoint
self._cache = cache if cache is not None else _PersistentCache()
# First export() flushes the cache before sending the new batch;
# subsequent exports go straight through.
self._cache_flushed = False
# OneCollector's ``kill-tokens`` directive: when set, our tenant
# is on the backend's deny list and we must stop sending until
# this epoch second. In-memory only -- process restart re-hits
# the 401 once and re-records the kill before short-circuiting.
self._killed_until: float | None = None
# _shutdown is read on the BatchLogRecordProcessor export thread and
# written on the shutdown thread; bool assignment is atomic under the
# CPython GIL, so no lock is needed.
Expand Down Expand Up @@ -82,10 +100,22 @@ def export(self, batch: Sequence[ReadableLogRecord]) -> LogRecordExportResult:
call. Persists the current batch to the cache on POST failure so
the next process can retry — :class:`BatchLogRecordProcessor` only
re-queues in memory and loses the queue on process exit.

While the tenant is under ``kill-tokens``, this is a no-op that
returns ``SUCCESS`` to keep the BatchLogRecordProcessor from
re-queueing events the backend has explicitly told us to stop
sending; envelopes for that window are dropped, not cached.
"""
if self._shutdown or not batch:
return LogRecordExportResult.SUCCESS

if self._is_killed():
_LOGGER.debug(
"telemetry export skipped: tenant under kill-tokens for %.0fs more",
(self._killed_until or 0) - time.time(),
)
return LogRecordExportResult.SUCCESS

try:
envelopes = [self._to_envelope(ld) for ld in batch]
except Exception:
Expand All @@ -94,20 +124,37 @@ def export(self, batch: Sequence[ReadableLogRecord]) -> LogRecordExportResult:

# First-call cache flush: try to send anything left over from a
# previous run. Best-effort, single shot — don't loop if the
# backend is still down.
# backend is still down. If the failure is because we just got
# killed, drop the cached batch instead of looping it forever.
if not self._cache_flushed:
self._cache_flushed = True
cached = self._cache.drain()
if cached and not self._post_envelopes(cached):
if cached and not self._post_envelopes(cached) and not self._is_killed():
self._cache.append(cached)

# If the cache-flush POST just killed us, skip the new-batch POST
# too -- another network round-trip would just re-confirm the
# kill. Drop the envelopes (don't cache, same rationale as the
# top-of-export guard) and return SUCCESS so the
# BatchLogRecordProcessor doesn't re-queue them.
if self._is_killed():
return LogRecordExportResult.SUCCESS

if not self._post_envelopes(envelopes):
Comment thread
timenick marked this conversation as resolved.
self._cache.append(envelopes)
if not self._is_killed():
self._cache.append(envelopes)
return LogRecordExportResult.FAILURE
return LogRecordExportResult.SUCCESS

def _post_envelopes(self, envelopes: list[dict]) -> bool:
"""POST a list of envelopes; return True on 2xx, False otherwise."""
"""POST a list of envelopes; return True on 2xx, False otherwise.

On non-2xx, parses ``kill-tokens`` / ``kill-duration`` to honor the
backend's tenant-level backoff, and emits a DEBUG log line that
captures the ``Collector-Error`` header and a body excerpt — the
two pieces of info the OneCollector backend uses to communicate
the actual rejection reason.
"""
if not envelopes:
return True
try:
Expand All @@ -126,9 +173,43 @@ def _post_envelopes(self, envelopes: list[dict]) -> bool:
return False
if 200 <= response.status_code < 300:
return True
_LOGGER.debug("telemetry backend returned %s", response.status_code)
self._record_kill_if_present(response)
_LOGGER.debug(
"telemetry backend returned %s: error=%r body=%s",
response.status_code,
response.headers.get("Collector-Error"),
(response.text or "")[:_RESPONSE_BODY_LOG_LIMIT].replace("\n", " "),
)
return False

def _is_killed(self) -> bool:
"""True iff our tenant is currently under a ``kill-tokens`` window."""
return self._killed_until is not None and time.time() < self._killed_until

def _record_kill_if_present(self, response: requests.Response) -> None:
"""Honor an inbound ``kill-tokens`` directive that names our tenant.

No-op for any other 4xx/5xx response.
"""
kill_header = response.headers.get("kill-tokens")
duration_header = response.headers.get("kill-duration")
if not kill_header or not duration_header:
return
try:
duration_s = int(duration_header)
except ValueError:
return
if duration_s <= 0:
return
if self._tenant_token not in _parse_kill_tokens(kill_header):
return
self._killed_until = time.time() + duration_s
_LOGGER.debug(
"telemetry tenant under kill-tokens for %ss (until epoch %.0f)",
duration_s,
self._killed_until,
)

def force_flush(self, timeout_millis: int = 30_000) -> bool:
"""No-op: all exports are synchronous."""
return True
Expand All @@ -150,13 +231,37 @@ def _to_envelope(self, ld: ReadableLogRecord) -> dict:
ext = _resource_to_ext(ld.resource)
return _build_envelope(
name=str(record.body),
ikey=self._ikey,
ikey=self._envelope_ikey,
timestamp=timestamp,
data=data,
ext=ext,
)


def _parse_kill_tokens(header_value: str) -> set[str]:
"""Parse the OneCollector ``kill-tokens`` header into a set of tenant_token strings.

The header is a comma-separated list. Each entry is in the form
``o:<tenant_token>`` optionally followed by ``:<reason>`` (e.g.
``o:abc:all`` or ``o:abc:event_name``). We treat any entry naming
a tenant as a full kill for that tenant — per-event kills aren't
something we exploit today.
"""
if not header_value:
return set()
tokens: set[str] = set()
for raw in header_value.split(","):
entry = raw.strip()
if not entry.startswith("o:"):
continue
rest = entry[2:]
# Strip optional ":<reason>" suffix.
tenant = rest.split(":", 1)[0]
if tenant:
tokens.add(tenant)
return tokens


def _ns_to_datetime(ts_ns: int) -> datetime:
return datetime.fromtimestamp(ts_ns / 1_000_000_000, tz=timezone.utc)

Expand Down
27 changes: 26 additions & 1 deletion src/winml/modelkit/telemetry/library/serialization.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,29 @@
from datetime import datetime


def _envelope_ikey(full_ikey: str) -> str:
"""Derive the envelope ``iKey`` value from a OneCollector instrumentation key.

OneCollector expects two distinct iKey forms on the wire:

- ``x-apikey`` HTTP header: the full instrumentation key
(``<tenant_token>-<guid>-<ingestion_token>``).
- Envelope ``iKey`` field: ``o:<tenant_token>``, where ``tenant_token``
is the portion of the ikey before the first ``-``.

Embedding the full ikey in the envelope's ``iKey`` field is what causes
the backend to reject the request with
``Collector-Error: Invalid Tenant Token``.
"""
dash = full_ikey.find("-")
if dash <= 0:
raise ValueError(
"OneCollector instrumentation key must contain a non-empty "
"tenant_token portion before the first '-'"
)
return f"o:{full_ikey[:dash]}"


def _build_envelope(
name: str,
ikey: str,
Expand All @@ -28,7 +51,9 @@ def _build_envelope(
ver: schema version, always "4.0"
name: event name (e.g. "ModelKitAction")
time: ISO8601 UTC, millisecond precision, trailing Z
iKey: OneCollector InstrumentationKey (in "o:<tenant-token>" form)
iKey: envelope iKey value -- caller is responsible for supplying
it in the ``o:<tenant_token>`` form (use
:func:`_envelope_ikey` to derive it from the full ikey)
data: event-specific flat payload
ext: common context slots (os, app, device)
"""
Expand Down
4 changes: 3 additions & 1 deletion tests/unit/telemetry/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,9 @@ def enabled_telemetry(monkeypatch, isolated_config, clean_env):
ready instance should use :func:`running_telemetry`, or call
``Telemetry.get_or_init()`` from inside the test body.
"""
monkeypatch.setattr("winml.modelkit.telemetry.constants.INSTRUMENTATION_KEY", "o:test-key")
monkeypatch.setattr(
"winml.modelkit.telemetry.constants.INSTRUMENTATION_KEY", "test-tenant-1234"
)
consent_mod._write_stored_consent("enabled")
monkeypatch.setattr("sys.stdin.isatty", lambda: True)

Expand Down
Loading
Loading