ref(replays): Remove dead code and reorganize ingest-replay-recordings consumer #95030

Open · wants to merge 13 commits into base: master
src/sentry/replays/consumers/recording.py (105 changes: 97 additions & 8 deletions)
@@ -1,27 +1,41 @@
import logging
import zlib
from collections.abc import Mapping
from typing import cast

import sentry_sdk
import sentry_sdk.scope
from arroyo.backends.kafka.consumer import KafkaPayload
from arroyo.processing.strategies import RunTask, RunTaskInThreads
from arroyo.processing.strategies.abstract import ProcessingStrategy, ProcessingStrategyFactory
from arroyo.processing.strategies.commit import CommitOffsets
from arroyo.types import Commit, FilteredPayload, Message, Partition
from django.conf import settings
from sentry_kafka_schemas.codecs import Codec, ValidationError
from sentry_kafka_schemas.schema_types.ingest_replay_recordings_v1 import ReplayRecording
from sentry_sdk import set_tag

from sentry.conf.types.kafka_definition import Topic, get_topic_codec
from sentry.filestore.gcs import GCS_RETRYABLE_ERRORS
from sentry.replays.usecases.ingest import (
-    DropSilently,
-    ProcessedRecordingMessage,
    Event,
    HaltIngestion,
    ProcessedEvent,
    commit_recording_message,
-    parse_recording_message,
-    process_recording_message,
    process_recording_event,
    track_recording_metadata,
)
from sentry.utils import json

RECORDINGS_CODEC: Codec[ReplayRecording] = get_topic_codec(Topic.INGEST_REPLAYS_RECORDINGS)

logger = logging.getLogger(__name__)


class DropSilently(Exception):
    pass


class ProcessReplayRecordingStrategyFactory(ProcessingStrategyFactory[KafkaPayload]):
    def __init__(
        self,
@@ -61,7 +75,10 @@ def create_with_partitions(
        )


-def process_message(message: Message[KafkaPayload]) -> ProcessedRecordingMessage | FilteredPayload:
# Processing Task


def process_message(message: Message[KafkaPayload]) -> ProcessedEvent | FilteredPayload:
    with sentry_sdk.start_transaction(
        name="replays.consumer.recording_buffered.process_message",
        op="replays.consumer.recording_buffered.process_message",
@@ -70,15 +87,87 @@ def process_message(message: Message[KafkaPayload]) -> ProcessedRecordingMessage
        },
    ):
        try:
-            return process_recording_message(parse_recording_message(message.payload.value))
            recording_event = parse_recording_event(message.payload.value)
            set_tag("org_id", recording_event["context"]["org_id"])
            set_tag("project_id", recording_event["context"]["project_id"])
            return process_recording_event(recording_event)
        except DropSilently:
            return FilteredPayload()
        except Exception:
            logger.exception("Failed to process replay recording message.")
            return FilteredPayload()
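
For reference, a hedged harness (not part of this diff) exercising the contract above: a payload that fails schema validation raises DropSilently inside the parser and surfaces as a FilteredPayload, so the consumer keeps running instead of crashing. Topic name, partition, and offset are illustrative.

from datetime import datetime, timezone

from arroyo.types import BrokerValue, Topic as ArroyoTopic

# Wrap raw bytes the way arroyo delivers them to the strategy.
msg = Message(
    BrokerValue(
        KafkaPayload(None, b"not-a-valid-recording", []),
        Partition(ArroyoTopic("ingest-replay-recordings"), 0),  # illustrative
        1,  # illustrative offset
        datetime.now(timezone.utc),
    )
)

# Malformed payloads are filtered, never raised.
assert isinstance(process_message(msg), FilteredPayload)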


-def commit_message(message: Message[ProcessedRecordingMessage]) -> None:
@sentry_sdk.trace
def parse_recording_event(message: bytes) -> Event:
    recording = parse_request_message(message)
    segment_id, payload = parse_headers(cast(bytes, recording["payload"]), recording["replay_id"])
    compressed, decompressed = decompress_segment(payload)

    replay_event_json = recording.get("replay_event")
    if replay_event_json:
        replay_event = json.loads(cast(bytes, replay_event_json))
    else:
        replay_event = None

    replay_video_raw = recording.get("replay_video")
    if replay_video_raw is not None:
        replay_video = cast(bytes, replay_video_raw)
    else:
        replay_video = None

    return {
        "context": {
            "key_id": recording.get("key_id"),
            "org_id": recording["org_id"],
            "project_id": recording["project_id"],
            "received": recording["received"],
            "replay_id": recording["replay_id"],
            "retention_days": recording["retention_days"],
            "segment_id": segment_id,
        },
        "payload_compressed": compressed,
        "payload": decompressed,
        "replay_event": replay_event,
        "replay_video": replay_video,
    }
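
Event and EventContext are imported from sentry.replays.usecases.ingest; the shapes implied by the return value above can be sketched as TypedDicts (field types here are assumptions, not the module's actual definitions):

from typing import Any, TypedDict


class EventContext(TypedDict):
    key_id: int | None
    org_id: int
    project_id: int
    received: int
    replay_id: str
    retention_days: int
    segment_id: int


class Event(TypedDict):
    context: EventContext
    payload_compressed: bytes
    payload: bytes
    replay_event: dict[str, Any] | None
    replay_video: bytes | None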


@sentry_sdk.trace
def parse_request_message(message: bytes) -> ReplayRecording:
    try:
        return RECORDINGS_CODEC.decode(message)
    except ValidationError:
        logger.exception("Could not decode recording message.")
        raise DropSilently()
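
An illustrative, made-up example of a successfully decoded ReplayRecording, limited to the fields the parser reads above (the "type" discriminator is an assumption about the schema):

decoded = {
    "type": "replay_recording_not_chunked",  # assumed discriminator
    "org_id": 1,
    "project_id": 42,
    "replay_id": "515539018c9b4260a6f999572f1661ee",  # made-up id
    "received": 1715000000,
    "retention_days": 30,
    "key_id": None,
    "payload": b'{"segment_id": 0}\n[]',  # header line + segment bytes
    "replay_event": None,
    "replay_video": None,
}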


@sentry_sdk.trace
def decompress_segment(segment: bytes) -> tuple[bytes, bytes]:
    try:
        return (segment, zlib.decompress(segment))
    except zlib.error:
        if segment and segment[0] == ord("["):
            return (zlib.compress(segment), segment)
        else:
            logger.exception("Invalid recording body.")
            raise DropSilently()
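
Both accepted input shapes, as a usage sketch: zlib-compressed bytes are passed through zlib.decompress, while an uncompressed JSON array (first byte "[") is compressed on the fly, so callers always get both forms back.

raw = b'[{"type": 4}]'

# Compressed input: returned as-is alongside its decompressed form.
compressed, decompressed = decompress_segment(zlib.compress(raw))
assert decompressed == raw

# Plain JSON input: compressed on the fly, original kept as the decompressed form.
compressed, decompressed = decompress_segment(raw)
assert decompressed == raw and zlib.decompress(compressed) == raw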


@sentry_sdk.trace
def parse_headers(recording: bytes, replay_id: str) -> tuple[int, bytes]:
    try:
        recording_headers_json, recording_segment = recording.split(b"\n", 1)
        return int(json.loads(recording_headers_json)["segment_id"]), recording_segment
    except Exception:
        logger.exception("Recording headers could not be extracted %s", replay_id)
        raise DropSilently()
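
The recording payload on the wire is a one-line JSON header, a newline, then the raw segment bytes; splitting on the first newline only keeps newlines inside the segment safe. A usage sketch with made-up values:

payload = b'{"segment_id": 7}\n[{"type": 4}]'
segment_id, segment = parse_headers(payload, replay_id="515539018c9b4260a6f999572f1661ee")
assert segment_id == 7
assert segment == b'[{"type": 4}]'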


# I/O Task


def commit_message(message: Message[ProcessedEvent]) -> None:
    isolation_scope = sentry_sdk.get_isolation_scope().fork()
    with sentry_sdk.scope.use_isolation_scope(isolation_scope):
        with sentry_sdk.start_transaction(
@@ -96,7 +185,7 @@ def commit_message(message: Message[ProcessedRecordingMessage]) -> None:
                return None
            except GCS_RETRYABLE_ERRORS:
                raise
-            except DropSilently:
            except HaltIngestion:
                return None
            except Exception:
                logger.exception("Failed to commit replay recording message.")
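
Taken together, process_message (CPU-bound parsing, run inline) and commit_message (blocking GCS I/O, run in a thread pool) are chained by the factory's create_with_partitions. A hedged sketch of that wiring, with concurrency values assumed rather than read from the factory's actual arguments:

def create_with_partitions_sketch(
    commit: Commit, partitions: Mapping[Partition, int]
) -> ProcessingStrategy[KafkaPayload]:
    return RunTask(
        function=process_message,  # parse + process on the consumer thread
        next_step=RunTaskInThreads(
            processing_function=commit_message,  # blocking upload in a pool
            concurrency=4,  # assumed value
            max_queue_size=8,  # assumed backpressure bound
            next_step=CommitOffsets(commit),  # offsets advance only after I/O
        ),
    )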