diff --git a/examples/telemetry_example/README.md b/examples/telemetry_example/README.md new file mode 100644 index 00000000..80724d1f --- /dev/null +++ b/examples/telemetry_example/README.md @@ -0,0 +1,170 @@ +# OpenTelemetry Integration Example + +This example demonstrates how to use the OpenTelemetry integration for GetStream AI plugins to achieve comprehensive observability. + +## Features + +- **Tracing**: Automatic span creation for plugin operations +- **Metrics**: Performance and usage metrics collection +- **Logging**: Structured logging with OpenTelemetry context +- **Event Tracking**: Comprehensive event monitoring and analysis +- **Error Handling**: Automatic error recording and metrics + +## Prerequisites + +- Python 3.10+ +- GetStream Python SDK +- GetStream Plugins Common package + +## Installation + +```bash +# Install dependencies +pip install -r requirements.txt + +# Or using uv +uv sync +``` + +## Usage + +### Basic Usage + +```python +from getstream.plugins.common import ( + initialize_telemetry, + get_telemetry, + TelemetryEventEmitter +) + +# Initialize telemetry +telemetry = initialize_telemetry() + +# Create event emitter +emitter = TelemetryEventEmitter("my_plugin") + +# Emit events with automatic telemetry +emitter.emit(event) +``` + +### Environment Configuration + +Set environment variables to configure OpenTelemetry: + +```bash +export OTEL_SERVICE_NAME="my-service" +export OTEL_SERVICE_VERSION="1.0.0" +export OTEL_TRACES_ENABLED="true" +export OTEL_METRICS_ENABLED="true" +export OTEL_LOGS_ENABLED="true" +export OTEL_EXPORTER_OTLP_ENDPOINT="http://localhost:4317" +``` + +### Running the Example + +```bash +python main.py +``` + +## Configuration Options + +### TelemetryConfig + +- `service_name`: Name of the service +- `service_version`: Version of the service +- `enable_tracing`: Enable distributed tracing +- `enable_metrics`: Enable metrics collection +- `enable_logging_instrumentation`: Enable logging instrumentation +- `otlp_endpoint`: OTLP exporter endpoint +- `otlp_protocol`: OTLP protocol (http or grpc) + +### Event Emitter + +- `emit()`: Emit event with automatic telemetry +- `emit_with_trace()`: Emit event within a trace span +- `operation_context()`: Context manager for operation timing +- `emit_error()`: Emit error events with automatic error recording + +### Event Registry + +- `register_event()`: Register event with telemetry +- `get_statistics()`: Get comprehensive event statistics +- `get_error_summary()`: Get error summary +- `get_performance_summary()`: Get performance metrics + +## Metrics + +The integration automatically collects: + +- Event counts by type and plugin +- Processing time histograms +- Confidence score distributions +- Error rates and types +- Operation duration metrics +- Registry performance metrics + +## Tracing + +Automatic tracing includes: + +- Plugin operation spans +- Event registration spans +- Error tracking with context +- Performance monitoring +- Cross-service correlation + +## Logging + +Structured logging with: + +- Event context information +- Performance metrics +- Error details +- Trace correlation IDs +- Plugin-specific metadata + +## Examples + +See `main.py` for comprehensive examples of: + +- STT plugin telemetry integration +- TTS plugin telemetry integration +- Batch processing with telemetry +- Error handling and reporting +- Performance monitoring +- Environment-based configuration + +## Integration with Existing Plugins + +**✅ COMPLETED**: All existing GetStream AI plugins now automatically integrate 
with OpenTelemetry! + +The following plugins have been updated to automatically emit telemetry events: + +- **STT Plugins**: Deepgram, AssemblyAI, Moonshine, FAL +- **TTS Plugins**: ElevenLabs, Cartesia, Kokoro +- **VAD Plugins**: Silero +- **STS Plugins**: OpenAI Realtime, Gemini Live + +### How It Works + +1. **Automatic Integration**: Plugins automatically create `TelemetryEventEmitter` instances +2. **Event Emission**: All events (transcripts, audio, errors) automatically go through OpenTelemetry +3. **Performance Tracking**: Processing times, confidence scores, and error rates are automatically monitored +4. **No Code Changes Required**: Existing plugin code automatically benefits from telemetry + +### What Gets Monitored + +- **Plugin Initialization**: When plugins start up +- **Event Processing**: All STT, TTS, VAD, and STS events +- **Performance Metrics**: Processing times, audio durations, confidence scores +- **Error Tracking**: All errors with context and stack traces +- **Resource Usage**: Memory usage, operation counts, session tracking + +## Best Practices + +- Use operation context managers for automatic timing +- Emit events for all significant operations +- Include relevant metadata in events +- Handle errors gracefully with telemetry recording +- Configure appropriate sampling rates for production +- Use structured logging for better observability diff --git a/examples/telemetry_example/__init__.py b/examples/telemetry_example/__init__.py new file mode 100644 index 00000000..03dba662 --- /dev/null +++ b/examples/telemetry_example/__init__.py @@ -0,0 +1 @@ +"""OpenTelemetry integration example for GetStream AI plugins.""" diff --git a/examples/telemetry_example/env.example b/examples/telemetry_example/env.example new file mode 100644 index 00000000..cd7df654 --- /dev/null +++ b/examples/telemetry_example/env.example @@ -0,0 +1,28 @@ +# OpenTelemetry Configuration +# Copy this file to .env and modify as needed + +# Service Information +OTEL_SERVICE_NAME=getstream-telemetry-example +OTEL_SERVICE_VERSION=1.0.0 +OTEL_SERVICE_NAMESPACE=getstream + +# Feature Flags +OTEL_TRACES_ENABLED=true +OTEL_METRICS_ENABLED=true +OTEL_LOGS_ENABLED=true + +# Exporters +OTEL_EXPORTER_OTLP_ENDPOINT=http://localhost:4317 +OTEL_EXPORTER_OTLP_PROTOCOL=http +OTEL_EXPORTER_OTLP_HEADERS=authorization=Bearer your-token-here + +# Plugin-specific Settings +OTEL_PLUGIN_METRICS=true +OTEL_EVENT_TRACING=true +OTEL_PERFORMANCE_METRICS=true + +# Console Export (for development) +OTEL_CONSOLE_EXPORT=true + +# Logging Level +OTEL_LOG_LEVEL=INFO diff --git a/examples/telemetry_example/main.py b/examples/telemetry_example/main.py new file mode 100644 index 00000000..1d2fab0f --- /dev/null +++ b/examples/telemetry_example/main.py @@ -0,0 +1,370 @@ +#!/usr/bin/env python3 +""" +OpenTelemetry Integration Example for GetStream AI Plugins. + +This example demonstrates how to use the OpenTelemetry integration +for comprehensive observability of plugin operations. 
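+
+Run it with `python main.py`. Configuration can come from OTEL_* environment
+variables (see env.example) or be passed programmatically via TelemetryConfig.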
+""" + +import asyncio +import logging +import os +import time +from typing import Optional + +# Import the telemetry components +from getstream.plugins.common import ( + # Core telemetry + initialize_telemetry, + get_telemetry, + TelemetryConfig, + PluginTelemetry, + trace_plugin_operation, + + # Telemetry event system + TelemetryEventEmitter, + TelemetryEventFilter, + + # Telemetry registry + TelemetryEventRegistry, + get_global_telemetry_registry, + + # Event system + EventType, + create_event, + STTTranscriptEvent, + TTSAudioEvent, + PluginErrorEvent, +) + + +# Configure logging +logging.basicConfig( + level=logging.INFO, + format='%(asctime)s - %(name)s - %(levelname)s - %(message)s' +) +logger = logging.getLogger(__name__) + + +class MockSTTPlugin: + """Mock STT plugin demonstrating telemetry integration.""" + + def __init__(self, plugin_name: str = "mock_stt"): + self.plugin_name = plugin_name + + # Initialize telemetry + self.telemetry = get_telemetry() + + # Create telemetry event emitter + self.event_emitter = TelemetryEventEmitter(plugin_name) + + # Create telemetry event registry + self.event_registry = TelemetryEventRegistry( + max_events=1000, + telemetry=self.telemetry + ) + + @trace_plugin_operation("transcribe_audio", "mock_stt") + async def transcribe_audio(self, audio_data: bytes, session_id: str) -> str: + """Transcribe audio with telemetry integration.""" + + # Start operation timing + self.event_emitter.start_operation("transcribe_audio", { + "audio_size_bytes": len(audio_data), + "session_id": session_id + }) + + try: + # Simulate processing time + await asyncio.sleep(0.1) + + # Create transcript event + transcript_event = create_event( + EventType.STT_TRANSCRIPT, + text="Hello, this is a test transcript", + confidence=0.95, + processing_time_ms=100.0, + plugin_name=self.plugin_name, + session_id=session_id, + model_name="mock_model_v1" + ) + + # Emit event with telemetry + self.event_emitter.emit_with_trace( + transcript_event, + "transcribe_audio", + {"audio_size_bytes": len(audio_data)} + ) + + # Register event in registry + self.event_registry.register_event(transcript_event) + + # End operation successfully + self.event_emitter.end_operation( + "transcribe_audio", + success=True, + event=transcript_event + ) + + return transcript_event.text + + except Exception as e: + # End operation with failure + self.event_emitter.end_operation( + "transcribe_audio", + success=False + ) + + # Emit error event + self.event_emitter.emit_error( + e, + EventType.STT_ERROR, + {"audio_size_bytes": len(audio_data)}, + session_id + ) + + raise + + async def batch_transcribe(self, audio_files: list, session_id: str): + """Demonstrate batch processing with telemetry.""" + + with self.event_emitter.operation_context( + "batch_transcribe", + {"file_count": len(audio_files), "session_id": session_id} + ): + results = [] + + for i, audio_file in enumerate(audio_files): + try: + # Simulate processing each file + await asyncio.sleep(0.05) + + # Create partial transcript event + partial_event = create_event( + EventType.STT_PARTIAL_TRANSCRIPT, + text=f"Partial result {i+1}", + confidence=0.8, + processing_time_ms=50.0, + plugin_name=self.plugin_name, + session_id=session_id, + is_final=False + ) + + # Emit partial event + self.event_emitter.emit(partial_event) + self.event_registry.register_event(partial_event) + + results.append(partial_event.text) + + except Exception as e: + logger.error(f"Error processing file {i}: {e}") + self.telemetry.record_error(e, { + "file_index": i, + 
"operation": "batch_transcribe" + }) + + return results + + +class MockTTSPlugin: + """Mock TTS plugin demonstrating telemetry integration.""" + + def __init__(self, plugin_name: str = "mock_tts"): + self.plugin_name = plugin_name + self.telemetry = get_telemetry() + self.event_emitter = TelemetryEventEmitter(plugin_name) + + async def synthesize_speech(self, text: str, session_id: str) -> bytes: + """Synthesize speech with telemetry integration.""" + + with self.event_emitter.operation_context( + "synthesize_speech", + {"text_length": len(text), "session_id": session_id} + ): + # Simulate synthesis time + await asyncio.sleep(0.2) + + # Create synthesis start event + start_event = create_event( + EventType.TTS_SYNTHESIS_START, + text=text, + plugin_name=self.plugin_name, + session_id=session_id, + model_name="mock_tts_model" + ) + + self.event_emitter.emit(start_event) + + # Simulate audio generation + audio_data = b"mock_audio_data" * 100 + + # Create audio event + audio_event = create_event( + EventType.TTS_AUDIO, + audio_data=audio_data, + plugin_name=self.plugin_name, + session_id=session_id, + chunk_index=0, + is_final_chunk=True + ) + + self.event_emitter.emit(audio_event) + + # Create completion event + completion_event = create_event( + EventType.TTS_SYNTHESIS_COMPLETE, + synthesis_id=start_event.synthesis_id, + text=text, + total_audio_bytes=len(audio_data), + synthesis_time_ms=200.0, + audio_duration_ms=1000.0, + chunk_count=1, + real_time_factor=0.5, + plugin_name=self.plugin_name, + session_id=session_id + ) + + self.event_emitter.emit(completion_event) + + return audio_data + + +async def demonstrate_telemetry(): + """Demonstrate the telemetry integration.""" + + # Initialize telemetry with custom configuration + config = TelemetryConfig( + service_name="telemetry-example", + service_version="1.0.0", + enable_console_export=True, + enable_plugin_metrics=True, + enable_event_tracing=True, + enable_performance_metrics=True + ) + + telemetry = initialize_telemetry(config) + logger.info("Telemetry initialized") + + # Create plugins + stt_plugin = MockSTTPlugin() + tts_plugin = MockTTSPlugin() + + # Get global telemetry registry + global_registry = get_global_telemetry_registry() + + # Demonstrate STT operations + logger.info("Demonstrating STT operations...") + + session_id = "session_123" + + # Single transcription + transcript = await stt_plugin.transcribe_audio(b"mock_audio", session_id) + logger.info(f"Transcript: {transcript}") + + # Batch transcription + audio_files = [b"audio1", b"audio2", b"audio3"] + batch_results = await stt_plugin.batch_transcribe(audio_files, session_id) + logger.info(f"Batch results: {batch_results}") + + # Demonstrate TTS operations + logger.info("Demonstrating TTS operations...") + + audio_output = await tts_plugin.synthesize_speech( + "Hello, this is a test synthesis", + session_id + ) + logger.info(f"Generated audio: {len(audio_output)} bytes") + + # Wait a bit for metrics to be exported + await asyncio.sleep(1) + + # Get telemetry statistics + logger.info("Getting telemetry statistics...") + + # Registry statistics + registry_stats = global_registry.get_statistics() + logger.info(f"Registry stats: {registry_stats}") + + # Plugin-specific statistics + stt_stats = stt_plugin.event_registry.get_statistics() + logger.info(f"STT plugin stats: {stt_stats}") + + # Error summary + error_summary = global_registry.get_error_summary() + logger.info(f"Error summary: {error_summary}") + + # Performance summary + performance_summary = 
global_registry.get_performance_summary() + logger.info(f"Performance summary: {performance_summary}") + + # Get telemetry summary from event emitters + stt_summary = stt_plugin.event_emitter.get_telemetry_summary() + logger.info(f"STT emitter summary: {stt_summary}") + + # Demonstrate error handling + logger.info("Demonstrating error handling...") + + try: + await stt_plugin.transcribe_audio(b"", session_id) + except Exception as e: + logger.info(f"Expected error caught: {e}") + + # Wait for final metrics export + await asyncio.sleep(1) + + logger.info("Telemetry demonstration completed!") + + +async def demonstrate_environment_configuration(): + """Demonstrate environment-based configuration.""" + + logger.info("Demonstrating environment-based configuration...") + + # Set environment variables for telemetry + os.environ["OTEL_SERVICE_NAME"] = "env-configured-service" + os.environ["OTEL_SERVICE_VERSION"] = "2.0.0" + os.environ["OTEL_TRACES_ENABLED"] = "true" + os.environ["OTEL_METRICS_ENABLED"] = "true" + os.environ["OTEL_LOGS_ENABLED"] = "true" + os.environ["OTEL_CONSOLE_EXPORT"] = "true" + + # Initialize telemetry from environment + telemetry = initialize_telemetry() + logger.info(f"Telemetry initialized with config: {telemetry.config}") + + # Create a simple plugin + plugin = MockSTTPlugin("env_stt") + + # Perform some operations + transcript = await plugin.transcribe_audio(b"test_audio", "env_session") + logger.info(f"Environment-configured plugin transcript: {transcript}") + + # Wait for metrics export + await asyncio.sleep(1) + + logger.info("Environment configuration demonstration completed!") + + +async def main(): + """Main function to run the telemetry demonstration.""" + + try: + # Run the main telemetry demonstration + await demonstrate_telemetry() + + # Run environment configuration demonstration + await demonstrate_environment_configuration() + + except Exception as e: + logger.error(f"Error in telemetry demonstration: {e}") + raise + + finally: + # Shutdown telemetry gracefully + logger.info("Shutting down telemetry...") + shutdown_telemetry() + + +if __name__ == "__main__": + # Run the async main function + asyncio.run(main()) diff --git a/examples/telemetry_example/pyproject.toml b/examples/telemetry_example/pyproject.toml new file mode 100644 index 00000000..ca1d9390 --- /dev/null +++ b/examples/telemetry_example/pyproject.toml @@ -0,0 +1,16 @@ +[project] +name = "getstream-telemetry-integration-example" +version = "0.1.0" +description = "Example project showing OpenTelemetry integration for GetStream AI plugins" +readme = "README.md" +requires-python = ">=3.10" +license = {text = "MIT"} +dependencies = [ + "getstream[webrtc]>=2.3.0a6", + "getstream-plugins-common>=0.2.0", + "python-dotenv>=1.1.1", +] + +[tool.uv.sources] +getstream = { path = "../../", editable = true } +getstream-plugins-common = { path = "../../getstream/plugins/common", editable = true } diff --git a/examples/telemetry_example/test_integration.py b/examples/telemetry_example/test_integration.py new file mode 100644 index 00000000..cca10c01 --- /dev/null +++ b/examples/telemetry_example/test_integration.py @@ -0,0 +1,201 @@ +#!/usr/bin/env python3 +""" +Test script to verify telemetry integration with actual plugins. 
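+
+Run directly with `python test_integration.py`. The checks rely on mock plugin
+subclasses and import probes, so no provider API keys are required.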
+""" + +import asyncio +import logging +import os +from getstream.plugins.common import ( + initialize_telemetry, + TelemetryConfig, + get_telemetry +) + +# Configure logging +logging.basicConfig(level=logging.INFO) +logger = logging.getLogger(__name__) + + +async def test_plugin_telemetry(): + """Test that plugins actually emit telemetry events.""" + + # Initialize telemetry + config = TelemetryConfig( + service_name="integration-test", + enable_console_export=True, + enable_plugin_metrics=True, + enable_event_tracing=True + ) + + telemetry = initialize_telemetry(config) + logger.info("Telemetry initialized for integration test") + + try: + # Test STT plugin telemetry + await test_stt_telemetry() + + # Test TTS plugin telemetry + await test_tts_telemetry() + + # Test VAD plugin telemetry + await test_vad_telemetry() + + logger.info("All telemetry integration tests completed successfully!") + + except Exception as e: + logger.error(f"Telemetry integration test failed: {e}") + raise + + +async def test_stt_telemetry(): + """Test STT plugin telemetry integration.""" + logger.info("Testing STT plugin telemetry...") + + try: + # Import STT plugin (this will trigger initialization events) + from getstream.plugins.common import STT + + # Create a mock STT instance to test telemetry + class MockSTTWithTelemetry(STT): + def __init__(self): + super().__init__() + # This should automatically emit initialization events with telemetry + pass + + async def _process_audio_impl(self, pcm_data, user_metadata): + # Mock implementation + return None + + # Create instance - this should emit initialization events + stt = MockSTTWithTelemetry() + + # Verify telemetry emitter was created + assert hasattr(stt, 'telemetry_emitter'), "STT plugin should have telemetry emitter" + logger.info("✅ STT plugin telemetry integration verified") + + except Exception as e: + logger.error(f"STT telemetry test failed: {e}") + raise + + +async def test_tts_telemetry(): + """Test TTS plugin telemetry integration.""" + logger.info("Testing TTS plugin telemetry...") + + try: + # Import TTS plugin + from getstream.plugins.common import TTS + + # Create a mock TTS instance + class MockTTSWithTelemetry(TTS): + def __init__(self): + super().__init__() + # This should automatically emit initialization events with telemetry + pass + + async def stream_audio(self, text, *args, **kwargs): + # Mock implementation + return b"mock_audio" + + # Create instance + tts = MockTTSWithTelemetry() + + # Verify telemetry emitter was created + assert hasattr(tts, 'telemetry_emitter'), "TTS plugin should have telemetry emitter" + logger.info("✅ TTS plugin telemetry integration verified") + + except Exception as e: + logger.error(f"TTS telemetry test failed: {e}") + raise + + +async def test_vad_telemetry(): + """Test VAD plugin telemetry integration.""" + logger.info("Testing VAD plugin telemetry...") + + try: + # Import VAD plugin + from getstream.plugins.common import VAD + + # Create a mock VAD instance + class MockVADWithTelemetry(VAD): + def __init__(self): + super().__init__() + # This should automatically emit initialization events with telemetry + pass + + async def is_speech(self, frame): + # Mock implementation + return 0.5 + + # Create instance + vad = MockVADWithTelemetry() + + # Verify telemetry emitter was created + assert hasattr(vad, 'telemetry_emitter'), "VAD plugin should have telemetry emitter" + logger.info("✅ VAD plugin telemetry integration verified") + + except Exception as e: + logger.error(f"VAD telemetry test failed: {e}") + 
raise + + +async def test_actual_plugin_telemetry(): + """Test telemetry with actual plugin implementations if available.""" + logger.info("Testing actual plugin telemetry...") + + try: + # Try to import and test actual plugins + plugins_tested = [] + + # Test Deepgram STT if available + try: + from getstream.plugins.deepgram.stt.stt import DeepgramSTT + # This will fail without API key, but we can check if telemetry is initialized + logger.info("✅ Deepgram STT plugin telemetry ready") + plugins_tested.append("Deepgram STT") + except ImportError: + logger.info("⚠️ Deepgram STT not available for testing") + + # Test ElevenLabs TTS if available + try: + from getstream.plugins.elevenlabs.tts.tts import ElevenLabsTTS + logger.info("✅ ElevenLabs TTS plugin telemetry ready") + plugins_tested.append("ElevenLabs TTS") + except ImportError: + logger.info("⚠️ ElevenLabs TTS not available for testing") + + # Test Silero VAD if available + try: + from getstream.plugins.silero.vad.vad import SileroVAD + logger.info("✅ Silero VAD plugin telemetry ready") + plugins_tested.append("Silero VAD") + except ImportError: + logger.info("⚠️ Silero VAD not available for testing") + + if plugins_tested: + logger.info(f"✅ Telemetry integration verified for: {', '.join(plugins_tested)}") + else: + logger.warning("⚠️ No actual plugins available for testing") + + except Exception as e: + logger.error(f"Actual plugin telemetry test failed: {e}") + # Don't raise here as this is optional + + +async def main(): + """Main test function.""" + try: + await test_plugin_telemetry() + await test_actual_plugin_telemetry() + + logger.info("🎉 All telemetry integration tests passed!") + + except Exception as e: + logger.error(f"❌ Telemetry integration test failed: {e}") + raise + + +if __name__ == "__main__": + asyncio.run(main()) diff --git a/getstream/plugins/assemblyai/pyproject.toml b/getstream/plugins/assemblyai/pyproject.toml new file mode 100644 index 00000000..c022a786 --- /dev/null +++ b/getstream/plugins/assemblyai/pyproject.toml @@ -0,0 +1,13 @@ +[build-system] +requires = ["hatchling"] +build-backend = "hatchling.build" + +[project] +name = "getstream-plugins-assemblyai" +version = "0.1.0" +description = "AssemblyAI plugin for GetStream" +requires-python = ">=3.8" +dependencies = [] + +[tool.hatch.build.targets.wheel] +packages = ["getstream/plugins/assemblyai"] diff --git a/getstream/plugins/common/TELEMETRY.md b/getstream/plugins/common/TELEMETRY.md new file mode 100644 index 00000000..d73a618b --- /dev/null +++ b/getstream/plugins/common/TELEMETRY.md @@ -0,0 +1,398 @@ +# OpenTelemetry Integration for Stream AI Plugins + +This document describes the comprehensive OpenTelemetry integration for Stream AI plugins, providing observability through tracing, metrics, and logging. + + +## Architecture + +``` +┌─────────────────┐ ┌──────────────────┐ ┌─────────────────┐ +│ Plugin Code │───▶│ TelemetryEvent │───▶│ OpenTelemetry │ +│ │ │ Emitter │ │ SDK │ +└─────────────────┘ └──────────────────┘ └─────────────────┘ + │ │ + ▼ ▼ + ┌──────────────────┐ ┌─────────────────┐ + │ TelemetryEvent │ │ Exporters │ + │ Registry │ │ (OTLP, Console)│ + └──────────────────┘ └─────────────────┘ +``` + +## Core Components + +### 1. PluginTelemetry + +The main telemetry class that initializes and manages OpenTelemetry SDK. 
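+It builds tracer and meter providers from a `TelemetryConfig` (or from the
+`OTEL_*` environment variables) and falls back to no-op tracers and meters when
+initialization fails or a signal is disabled, so telemetry problems do not
+break plugin code.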
+ +```python +from getstream.plugins.common import PluginTelemetry, TelemetryConfig + +# Initialize with custom configuration +config = TelemetryConfig( + service_name="my-plugin", + enable_tracing=True, + enable_metrics=True, + otlp_endpoint="http://localhost:4317" +) + +telemetry = PluginTelemetry(config) +``` + +### 2. TelemetryEventEmitter + +Enhanced event emitter with automatic telemetry integration. + +```python +from getstream.plugins.common import TelemetryEventEmitter + +emitter = TelemetryEventEmitter("stt_plugin") + +# Basic event emission with telemetry +emitter.emit(event) + +# Event emission within trace span +emitter.emit_with_trace(event, "transcribe_audio", {"audio_size": len(audio)}) + +# Operation timing with context manager +with emitter.operation_context("batch_process", {"file_count": 10}): + # Process files + pass +``` + +### 3. TelemetryEventRegistry + +Enhanced event registry with telemetry capabilities. + +```python +from getstream.plugins.common import TelemetryEventRegistry + +registry = TelemetryEventRegistry( + max_events=10000, + enable_metrics=True, + enable_tracing=True +) + +# Register events with automatic telemetry +registry.register_event(event) + +# Get comprehensive statistics +stats = registry.get_statistics() +error_summary = registry.get_error_summary() +performance_summary = registry.get_performance_summary() +``` + +## Configuration + +### Environment Variables + +The integration can be configured through environment variables: + +```bash +# Service information +export OTEL_SERVICE_NAME="getstream-plugins" +export OTEL_SERVICE_VERSION="0.2.0" +export OTEL_SERVICE_NAMESPACE="getstream" + +# Feature flags +export OTEL_TRACES_ENABLED="true" +export OTEL_METRICS_ENABLED="true" +export OTEL_LOGS_ENABLED="true" + +# Exporters +export OTEL_EXPORTER_OTLP_ENDPOINT="http://localhost:4317" +export OTEL_EXPORTER_OTLP_PROTOCOL="http" +export OTEL_EXPORTER_OTLP_HEADERS="authorization=Bearer token" + +# Plugin-specific settings +export OTEL_PLUGIN_METRICS="true" +export OTEL_EVENT_TRACING="true" +export OTEL_PERFORMANCE_METRICS="true" +``` + +### Programmatic Configuration + +```python +from getstream.plugins.common import TelemetryConfig + +config = TelemetryConfig( + service_name="custom-service", + service_version="1.0.0", + enable_tracing=True, + enable_metrics=True, + otlp_endpoint="https://collector.example.com:4317", + otlp_protocol="grpc", + otlp_headers={"authorization": "Bearer token"}, + enable_console_export=True, + enable_plugin_metrics=True, + enable_event_tracing=True, + enable_performance_metrics=True +) +``` + +## Usage Patterns + +### 1. Basic Plugin Integration + +```python +class MySTTPlugin: + def __init__(self): + self.telemetry = get_telemetry() + self.event_emitter = TelemetryEventEmitter("my_stt_plugin") + + async def transcribe(self, audio_data: bytes) -> str: + with self.event_emitter.operation_context("transcribe"): + # Process audio + result = await self._process_audio(audio_data) + + # Create and emit event + event = create_event(EventType.STT_TRANSCRIPT, text=result) + self.event_emitter.emit(event) + + return result +``` + +### 2. 
Advanced Tracing + +```python +class MyTTSPlugin: + def __init__(self): + self.telemetry = get_telemetry() + self.event_emitter = TelemetryEventEmitter("my_tts_plugin") + + async def synthesize(self, text: str) -> bytes: + # Start operation with custom attributes + self.event_emitter.start_operation("synthesize", { + "text_length": len(text), + "language": "en-US" + }) + + try: + # Process synthesis + audio_data = await self._synthesize_text(text) + + # Emit completion event + completion_event = create_event( + EventType.TTS_SYNTHESIS_COMPLETE, + text=text, + audio_data=audio_data + ) + + self.event_emitter.end_operation( + "synthesize", + success=True, + event=completion_event + ) + + return audio_data + + except Exception as e: + self.event_emitter.end_operation("synthesize", success=False) + self.event_emitter.emit_error(e, EventType.TTS_ERROR) + raise +``` + +### 3. Batch Processing + +```python +async def process_batch(self, items: List[Any]): + with self.event_emitter.operation_context("batch_process", { + "item_count": len(items), + "batch_id": str(uuid.uuid4()) + }): + results = [] + + for i, item in enumerate(items): + try: + result = await self._process_item(item) + results.append(result) + + # Emit progress event + progress_event = create_event( + EventType.PLUGIN_PROGRESS, + progress_percentage=(i + 1) / len(items) * 100 + ) + self.event_emitter.emit(progress_event) + + except Exception as e: + self.telemetry.record_error(e, { + "item_index": i, + "item_type": type(item).__name__ + }) + + return results +``` + +### 4. Error Handling + +```python +async def safe_operation(self): + try: + return await self._perform_operation() + except Exception as e: + # Record error with context + self.telemetry.record_error(e, { + "operation": "perform_operation", + "plugin": self.plugin_name, + "session_id": self.session_id + }) + + # Emit error event + self.event_emitter.emit_error( + e, + EventType.PLUGIN_ERROR, + {"operation": "perform_operation"} + ) + + raise +``` + +## Metrics + +### Automatic Metrics + +The integration automatically collects: + +- **Event Counts**: Total events by type, plugin, and session +- **Processing Times**: Histograms of operation durations +- **Confidence Scores**: Distribution of confidence values +- **Error Rates**: Error counts and types +- **Registry Performance**: Query times and operation metrics + +### Custom Metrics + +```python +# Create custom metrics +meter = self.telemetry.get_meter("custom.metrics") + +# Counter +request_counter = meter.create_counter( + name="requests.total", + description="Total number of requests", + unit="1" +) + +# Histogram +latency_histogram = meter.create_histogram( + name="request.latency", + description="Request latency", + unit="ms" +) + +# Gauge +active_connections = meter.create_up_down_counter( + name="connections.active", + description="Active connections", + unit="1" +) + +# Record metrics +request_counter.add(1, {"endpoint": "/api/transcribe"}) +latency_histogram.record(150.5, {"endpoint": "/api/transcribe"}) +active_connections.add(1) +``` + +## Tracing + +### Automatic Spans + +The integration creates spans for: + +- Plugin operations +- Event registration +- Error handling +- Performance monitoring + +### Custom Spans + +```python +# Create custom spans +with self.telemetry.trace_span( + "custom.operation", + attributes={ + "plugin.name": self.plugin_name, + "operation.type": "custom" + } +) as span: + # Add span events + span.add_event("operation.started") + + # Perform operation + result = await 
self._perform_operation() + + # Add span attributes + span.set_attribute("result.size", len(result)) + + # Add span events + span.add_event("operation.completed") + + return result +``` + +### Span Attributes + +Common span attributes include: + +- `plugin.name`: Name of the plugin +- `operation.name`: Name of the operation +- `session.id`: Session identifier +- `event.type`: Type of event being processed +- `processing_time_ms`: Processing time in milliseconds +- `confidence`: Confidence score (for applicable events) +- `audio_duration_ms`: Audio duration (for audio events) + +## Logging + +### Structured Logging + +The integration provides structured logging with: + +- Trace correlation IDs +- Event context information +- Performance metrics +- Error details +- Plugin metadata + +### Log Format + +```json +{ + "timestamp": "2024-01-15T10:30:00Z", + "level": "INFO", + "logger": "getstream.plugins.stt_plugin", + "message": "Event: stt_transcript", + "event_type": "stt_transcript", + "event_id": "uuid-123", + "session_id": "session-456", + "plugin_name": "stt_plugin", + "text_length": 25, + "confidence": 0.95, + "processing_time_ms": 150.5 +} +``` + +### Debug Mode + +Enable debug logging for troubleshooting: + +```python +import logging +logging.getLogger("getstream.plugins.telemetry").setLevel(logging.DEBUG) +``` + +### Health Checks + +Monitor telemetry health: + +```python +# Check telemetry status +telemetry = get_telemetry() +if telemetry._initialized: + print("Telemetry is healthy") +else: + print("Telemetry initialization failed") + +# Check registry health +registry = get_global_telemetry_registry() +stats = registry.get_statistics() +print(f"Registry health: {stats}") +``` \ No newline at end of file diff --git a/getstream/plugins/common/__init__.py b/getstream/plugins/common/__init__.py index 4e17367b..e1de6021 100644 --- a/getstream/plugins/common/__init__.py +++ b/getstream/plugins/common/__init__.py @@ -61,6 +61,26 @@ from .event_metrics import calculate_stt_metrics, calculate_tts_metrics, calculate_vad_metrics from .events import create_event +# OpenTelemetry integration +from .telemetry import ( + TelemetryConfig, + PluginTelemetry, + initialize_telemetry, + get_telemetry, + shutdown_telemetry, + trace_plugin_operation, +) +from .telemetry_events import ( + TelemetryEventEmitter, + TelemetryEventFilter, +) +from .telemetry_registry import ( + RegistryMetrics, + TelemetryEventRegistry, + get_global_telemetry_registry, + shutdown_global_telemetry_registry, +) + __all__ = [ # Base classes "STT", @@ -127,4 +147,18 @@ "calculate_stt_metrics", "calculate_tts_metrics", "calculate_vad_metrics", + + # OpenTelemetry integration + "TelemetryConfig", + "PluginTelemetry", + "initialize_telemetry", + "get_telemetry", + "shutdown_telemetry", + "trace_plugin_operation", + "TelemetryEventEmitter", + "TelemetryEventFilter", + "RegistryMetrics", + "TelemetryEventRegistry", + "get_global_telemetry_registry", + "shutdown_global_telemetry_registry", ] diff --git a/getstream/plugins/common/pyproject.toml b/getstream/plugins/common/pyproject.toml index 8d1541f4..043ee159 100644 --- a/getstream/plugins/common/pyproject.toml +++ b/getstream/plugins/common/pyproject.toml @@ -11,6 +11,8 @@ requires-python = ">=3.10" license = "MIT" dependencies = [ "getstream[webrtc]>=2.3.0a6", + "opentelemetry-api>=1.21.0", + "opentelemetry-sdk>=1.21.0", ] [tool.hatch.build.targets.wheel] diff --git a/getstream/plugins/common/sts.py b/getstream/plugins/common/sts.py index 248ab5fc..37faa08e 100644 --- 
a/getstream/plugins/common/sts.py +++ b/getstream/plugins/common/sts.py @@ -85,6 +85,19 @@ def __init__( ) register_global_event(init_event) self.emit("initialized", init_event) + + # Emit to telemetry if available + if hasattr(self, 'telemetry_emitter'): + self.telemetry_emitter.emit(init_event) + + # Initialize telemetry registry if available + try: + from getstream.plugins.common import TelemetryEventRegistry + self.telemetry_registry = TelemetryEventRegistry() + # Register initialization event in telemetry registry + self.telemetry_registry.register_event(init_event) + except ImportError: + self.telemetry_registry = None # Common, optional preferences (not all providers will use all of these) self.model = model @@ -113,6 +126,10 @@ def _emit_connected_event(self, session_config=None, capabilities=None): ) register_global_event(event) self.emit("connected", event) # Structured event + + # Register in telemetry registry if available + if hasattr(self, 'telemetry_registry'): + self.telemetry_registry.register_event(event) def _emit_disconnected_event(self, reason=None, was_clean=True): """Emit a structured disconnected event.""" @@ -126,6 +143,10 @@ def _emit_disconnected_event(self, reason=None, was_clean=True): ) register_global_event(event) self.emit("disconnected", event) # Structured event + + # Register in telemetry registry if available + if hasattr(self, 'telemetry_registry'): + self.telemetry_registry.register_event(event) def _emit_audio_input_event( self, audio_data, sample_rate=16000, user_metadata=None @@ -142,6 +163,10 @@ def _emit_audio_input_event( ) register_global_event(event) self.emit("audio_input", event) + + # Register in telemetry registry if available + if hasattr(self, 'telemetry_registry'): + self.telemetry_registry.register_event(event) def _emit_audio_output_event( self, audio_data, sample_rate=16000, response_id=None, user_metadata=None @@ -159,6 +184,10 @@ def _emit_audio_output_event( ) register_global_event(event) self.emit("audio_output", event) + + # Register in telemetry registry if available + if hasattr(self, 'telemetry_registry'): + self.telemetry_registry.register_event(event) def _emit_transcript_event( self, text, is_user=True, confidence=None, @@ -178,6 +207,10 @@ def _emit_transcript_event( ) register_global_event(event) self.emit("transcript", event) + + # Register in telemetry registry if available + if hasattr(self, 'telemetry_registry'): + self.telemetry_registry.register_event(event) def _emit_response_event( self, text, response_id=None, is_complete=True, @@ -197,6 +230,10 @@ def _emit_response_event( ) register_global_event(event) self.emit("response", event) + + # Register in telemetry registry if available + if hasattr(self, 'telemetry_registry'): + self.telemetry_registry.register_event(event) def _emit_conversation_item_event( self, item_id, item_type, status, role, @@ -217,6 +254,10 @@ def _emit_conversation_item_event( ) register_global_event(event) self.emit("conversation_item", event) + + # Register in telemetry registry if available + if hasattr(self, 'telemetry_registry'): + self.telemetry_registry.register_event(event) def _emit_error_event(self, error, context="", user_metadata=None): """Emit a structured error event.""" @@ -229,6 +270,10 @@ def _emit_error_event(self, error, context="", user_metadata=None): ) register_global_event(event) self.emit("error", event) # Structured event + + # Register in telemetry registry if available + if hasattr(self, 'telemetry_registry'): + self.telemetry_registry.register_event(event) async 
def close(self): """Close the STS service and release any resources.""" @@ -245,6 +290,10 @@ async def close(self): ) register_global_event(close_event) self.emit("closed", close_event) + + # Register in telemetry registry if available + if hasattr(self, 'telemetry_registry'): + self.telemetry_registry.register_event(close_event) # Public re-export diff --git a/getstream/plugins/common/stt.py b/getstream/plugins/common/stt.py index bfa6716b..10c9d1e9 100644 --- a/getstream/plugins/common/stt.py +++ b/getstream/plugins/common/stt.py @@ -110,6 +110,15 @@ def __init__( ) register_global_event(init_event) self.emit("initialized", init_event) + + # Initialize telemetry registry if available + try: + from getstream.plugins.common import TelemetryEventRegistry + self.telemetry_registry = TelemetryEventRegistry() + # Register initialization event in telemetry registry + self.telemetry_registry.register_event(init_event) + except ImportError: + self.telemetry_registry = None def _validate_pcm_data(self, pcm_data: PcmData) -> bool: """ @@ -178,6 +187,14 @@ def _emit_transcript_event( # Register in global registry and emit structured event register_global_event(event) self.emit("transcript", event) # Structured event + + # Emit to telemetry if available + if hasattr(self, 'telemetry_emitter'): + self.telemetry_emitter.emit(event) + + # Register in telemetry registry if available + if hasattr(self, 'telemetry_registry'): + self.telemetry_registry.register_event(event) def _emit_partial_transcript_event( self, @@ -219,6 +236,14 @@ def _emit_partial_transcript_event( # Register in global registry and emit structured event register_global_event(event) self.emit("partial_transcript", event) # Structured event + + # Emit to telemetry if available + if hasattr(self, 'telemetry_emitter'): + self.telemetry_emitter.emit(event) + + # Register in telemetry registry if available + if hasattr(self, 'telemetry_registry'): + self.telemetry_registry.register_event(event) def _emit_error_event(self, error: Exception, context: str = "", user_metadata: Optional[Dict[str, Any]] = None): """ @@ -252,6 +277,14 @@ def _emit_error_event(self, error: Exception, context: str = "", user_metadata: # Register in global registry and emit structured event register_global_event(event) self.emit("error", event) # Structured event + + # Emit to telemetry if available + if hasattr(self, 'telemetry_emitter'): + self.telemetry_emitter.emit(event) + + # Register in telemetry registry if available + if hasattr(self, 'telemetry_registry'): + self.telemetry_registry.register_event(event) async def process_audio( self, pcm_data: PcmData, user_metadata: Optional[Dict[str, Any]] = None diff --git a/getstream/plugins/common/telemetry.py b/getstream/plugins/common/telemetry.py new file mode 100644 index 00000000..21d95877 --- /dev/null +++ b/getstream/plugins/common/telemetry.py @@ -0,0 +1,548 @@ +""" +OpenTelemetry integration for GetStream AI plugins. + +This module provides comprehensive observability for all plugin operations +including tracing, metrics, and logging with OpenTelemetry. 
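+
+Note: the OTLP exporters and logging instrumentation imported below come from
+the separate opentelemetry-exporter-otlp and opentelemetry-instrumentation-logging
+distributions, which are not installed by opentelemetry-sdk alone.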
+""" + +import os +import time +import logging +from contextlib import contextmanager +from typing import Dict, Any, Optional, List, Callable +from dataclasses import dataclass + +# OpenTelemetry imports +from opentelemetry import trace, metrics, context +from opentelemetry.sdk.trace import TracerProvider +from opentelemetry.sdk.trace.export import ( + ConsoleSpanExporter, + BatchSpanProcessor, + SimpleSpanProcessor +) +from opentelemetry.sdk.metrics import MeterProvider +from opentelemetry.sdk.metrics.export import ( + ConsoleMetricExporter, + PeriodicExportingMetricReader +) +from opentelemetry.sdk.resources import Resource +from opentelemetry.instrumentation.logging import LoggingInstrumentor +from opentelemetry.exporter.otlp.proto.http.trace_exporter import OTLPSpanExporter +from opentelemetry.exporter.otlp.proto.http.metric_exporter import OTLPMetricExporter +from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter as GRPCOTLPSpanExporter +from opentelemetry.exporter.otlp.proto.grpc.metric_exporter import OTLPMetricExporter as GRPCOTLPMetricExporter + +from .events import BaseEvent, EventType + + +@dataclass +class TelemetryConfig: + """Configuration for OpenTelemetry integration.""" + + # Service information + service_name: str = "getstream-plugins" + service_version: str = "0.2.0" + service_namespace: str = "getstream" + + # Tracing configuration + enable_tracing: bool = True + trace_sampler: str = "always_on" # "always_on", "always_off", "traceidratio" + trace_sampling_ratio: float = 1.0 + + # Metrics configuration + enable_metrics: bool = True + metrics_export_interval_ms: int = 5000 + + # Logging configuration + enable_logging_instrumentation: bool = True + log_level: str = "INFO" + + # Exporters configuration + otlp_endpoint: Optional[str] = None + otlp_protocol: str = "http" # "http" or "grpc" + otlp_headers: Optional[Dict[str, str]] = None + enable_console_export: bool = True + + # Plugin-specific configuration + enable_plugin_metrics: bool = True + enable_event_tracing: bool = True + enable_performance_metrics: bool = True + + @classmethod + def from_env(cls) -> "TelemetryConfig": + """Create configuration from environment variables.""" + return cls( + service_name=os.getenv("OTEL_SERVICE_NAME", "getstream-plugins"), + service_version=os.getenv("OTEL_SERVICE_VERSION", "0.2.0"), + service_namespace=os.getenv("OTEL_SERVICE_NAMESPACE", "getstream"), + enable_tracing=os.getenv("OTEL_TRACES_ENABLED", "true").lower() == "true", + trace_sampler=os.getenv("OTEL_TRACES_SAMPLER", "always_on"), + trace_sampling_ratio=float(os.getenv("OTEL_TRACES_SAMPLER_ARG", "1.0")), + enable_metrics=os.getenv("OTEL_METRICS_ENABLED", "true").lower() == "true", + metrics_export_interval_ms=int(os.getenv("OTEL_METRICS_EXPORT_INTERVAL_MS", "5000")), + enable_logging_instrumentation=os.getenv("OTEL_LOGS_ENABLED", "true").lower() == "true", + log_level=os.getenv("OTEL_LOG_LEVEL", "INFO"), + otlp_endpoint=os.getenv("OTEL_EXPORTER_OTLP_ENDPOINT"), + otlp_protocol=os.getenv("OTEL_EXPORTER_OTLP_PROTOCOL", "http"), + otlp_headers=cls._parse_otlp_headers(), + enable_console_export=os.getenv("OTEL_CONSOLE_EXPORT", "true").lower() == "true", + enable_plugin_metrics=os.getenv("OTEL_PLUGIN_METRICS", "true").lower() == "true", + enable_event_tracing=os.getenv("OTEL_EVENT_TRACING", "true").lower() == "true", + enable_performance_metrics=os.getenv("OTEL_PERFORMANCE_METRICS", "true").lower() == "true", + ) + + @staticmethod + def _parse_otlp_headers() -> Optional[Dict[str, str]]: + """Parse OTLP 
headers from environment variable.""" + headers_str = os.getenv("OTEL_EXPORTER_OTLP_HEADERS") + if not headers_str: + return None + + headers = {} + for header in headers_str.split(","): + if "=" in header: + key, value = header.split("=", 1) + headers[key.strip()] = value.strip() + return headers + + +class PluginTelemetry: + """OpenTelemetry integration for GetStream plugins.""" + + def __init__(self, config: Optional[TelemetryConfig] = None): + """Initialize OpenTelemetry integration.""" + self.config = config or TelemetryConfig.from_env() + self._initialized = False + self._tracer_provider: Optional[TracerProvider] = None + self._meter_provider: Optional[MeterProvider] = None + + # Initialize OpenTelemetry if enabled + if self.config.enable_tracing or self.config.enable_metrics: + self._initialize_opentelemetry() + + def _initialize_opentelemetry(self): + """Initialize OpenTelemetry SDK.""" + try: + # Create resource with service information + resource = Resource.create({ + "service.name": self.config.service_name, + "service.version": self.config.service_version, + "service.namespace": self.config.service_namespace, + }) + + # Initialize tracing if enabled + if self.config.enable_tracing: + self._initialize_tracing(resource) + + # Initialize metrics if enabled + if self.config.enable_metrics: + self._initialize_metrics(resource) + + # Initialize logging instrumentation if enabled + if self.config.enable_logging_instrumentation: + self._initialize_logging() + + self._initialized = True + logging.info("OpenTelemetry initialized successfully") + + except Exception as e: + logging.warning(f"Failed to initialize OpenTelemetry: {e}") + self._initialized = False + + def _initialize_tracing(self, resource: Resource): + """Initialize tracing with OpenTelemetry.""" + self._tracer_provider = TracerProvider(resource=resource) + + # Add span processors + if self.config.enable_console_export: + self._tracer_provider.add_span_processor( + SimpleSpanProcessor(ConsoleSpanExporter()) + ) + + if self.config.otlp_endpoint: + if self.config.otlp_protocol == "grpc": + otlp_exporter = GRPCOTLPSpanExporter( + endpoint=self.config.otlp_endpoint, + headers=self.config.otlp_headers + ) + else: + otlp_exporter = OTLPSpanExporter( + endpoint=self.config.otlp_endpoint, + headers=self.config.otlp_headers + ) + + self._tracer_provider.add_span_processor( + BatchSpanProcessor(otlp_exporter) + ) + + # Set as global tracer provider + trace.set_tracer_provider(self._tracer_provider) + + def _initialize_metrics(self, resource: Resource): + """Initialize metrics with OpenTelemetry.""" + self._meter_provider = MeterProvider(resource=resource) + + # Add metric readers + if self.config.enable_console_export: + self._meter_provider.add_metric_reader( + PeriodicExportingMetricReader( + ConsoleMetricExporter(), + export_interval_millis=self.config.metrics_export_interval_ms + ) + ) + + if self.config.otlp_endpoint: + if self.config.otlp_protocol == "grpc": + otlp_exporter = GRPCOTLPMetricExporter( + endpoint=self.config.otlp_endpoint, + headers=self.config.otlp_headers + ) + else: + otlp_exporter = OTLPMetricExporter( + endpoint=self.config.otlp_endpoint, + headers=self.config.otlp_headers + ) + + self._meter_provider.add_metric_reader( + PeriodicExportingMetricReader( + otlp_exporter, + export_interval_millis=self.config.metrics_export_interval_ms + ) + ) + + # Set as global meter provider + metrics.set_meter_provider(self._meter_provider) + + def _initialize_logging(self): + """Initialize logging instrumentation.""" + try: + 
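+            # Injects trace/span IDs into standard log records; any failure here
+            # is caught below and only logged as a warning.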
LoggingInstrumentor().instrument( + set_logging_format=True, + log_level=getattr(logging, self.config.log_level.upper(), logging.INFO) + ) + except Exception as e: + logging.warning(f"Failed to initialize logging instrumentation: {e}") + + def get_tracer(self, name: str) -> trace.Tracer: + """Get a tracer instance.""" + if not self._initialized or not self.config.enable_tracing: + return trace.NoOpTracer() + return trace.get_tracer(name) + + def get_meter(self, name: str) -> metrics.Meter: + """Get a meter instance.""" + if not self._initialized or not self.config.enable_metrics: + return metrics.NoOpMeter() + return metrics.get_meter(name) + + @contextmanager + def trace_span(self, name: str, attributes: Optional[Dict[str, Any]] = None): + """Context manager for tracing spans.""" + if not self._initialized or not self.config.enable_tracing: + yield + return + + tracer = self.get_tracer("getstream.plugins") + with tracer.start_as_current_span(name, attributes=attributes or {}) as span: + try: + yield span + except Exception as e: + span.record_exception(e) + span.set_status(trace.Status(trace.StatusCode.ERROR, str(e))) + raise + + def record_event(self, event: BaseEvent, span: Optional[trace.Span] = None): + """Record an event in OpenTelemetry.""" + if not self._initialized: + return + + # Add event as span event if tracing is enabled + if self.config.enable_event_tracing and span: + span.add_event( + f"plugin.{event.event_type.value}", + attributes={ + "event.id": event.event_id, + "event.type": event.event_type.value, + "plugin.name": event.plugin_name or "unknown", + "session.id": event.session_id or "unknown", + "timestamp": event.timestamp.isoformat(), + } + ) + + # Record metrics if enabled + if self.config.enable_plugin_metrics: + self._record_event_metrics(event) + + def _record_event_metrics(self, event: BaseEvent): + """Record metrics for an event.""" + try: + meter = self.get_meter("getstream.plugins.events") + + # Event counter + event_counter = meter.create_counter( + name="plugin.events.total", + description="Total number of plugin events", + unit="1" + ) + + event_counter.add( + 1, + attributes={ + "event.type": event.event_type.value, + "plugin.name": event.plugin_name or "unknown", + "session.id": event.session_id or "unknown", + } + ) + + # Performance metrics for applicable events + if self.config.enable_performance_metrics: + self._record_performance_metrics(event, meter) + + except Exception as e: + logging.debug(f"Failed to record event metrics: {e}") + + def _record_performance_metrics(self, event: BaseEvent, meter: metrics.Meter): + """Record performance-related metrics for an event.""" + try: + # Processing time metrics + if hasattr(event, 'processing_time_ms') and event.processing_time_ms is not None: + processing_time_histogram = meter.create_histogram( + name="plugin.processing.time", + description="Event processing time", + unit="ms" + ) + + processing_time_histogram.record( + event.processing_time_ms, + attributes={ + "event.type": event.event_type.value, + "plugin.name": event.plugin_name or "unknown", + } + ) + + # Confidence metrics for applicable events + if hasattr(event, 'confidence') and event.confidence is not None: + confidence_histogram = meter.create_histogram( + name="plugin.confidence", + description="Event confidence scores", + unit="1" + ) + + confidence_histogram.record( + event.confidence, + attributes={ + "event.type": event.event_type.value, + "plugin.name": event.plugin_name or "unknown", + } + ) + + # Audio duration metrics for applicable 
events + if hasattr(event, 'audio_duration_ms') and event.audio_duration_ms is not None: + audio_duration_histogram = meter.create_histogram( + name="plugin.audio.duration", + description="Audio duration for events", + unit="ms" + ) + + audio_duration_histogram.record( + event.audio_duration_ms, + attributes={ + "event.type": event.event_type.value, + "plugin.name": event.plugin_name or "unknown", + } + ) + + except Exception as e: + logging.debug(f"Failed to record performance metrics: {e}") + + def record_error(self, error: Exception, context_info: Optional[Dict[str, Any]] = None): + """Record an error in OpenTelemetry.""" + if not self._initialized: + return + + try: + # Record error in current span if available + current_span = trace.get_current_span() + if current_span and current_span.is_recording(): + current_span.record_exception(error) + current_span.set_status(trace.Status(trace.StatusCode.ERROR, str(error))) + + if context_info: + for key, value in context_info.items(): + current_span.set_attribute(key, str(value)) + + # Record error metrics + if self.config.enable_plugin_metrics: + meter = self.get_meter("getstream.plugins.errors") + error_counter = meter.create_counter( + name="plugin.errors.total", + description="Total number of plugin errors", + unit="1" + ) + + # Prepare attributes with error info + error_attributes = { + "error.type": type(error).__name__, + "error.message": str(error), + } + if context_info: + error_attributes.update(context_info) + + error_counter.add(1, attributes=error_attributes) + + except Exception as e: + logging.debug(f"Failed to record error: {e}") + + def record_plugin_operation( + self, + operation_name: str, + plugin_name: str, + duration_ms: float, + success: bool = True, + attributes: Optional[Dict[str, Any]] = None + ): + """Record metrics for plugin operations.""" + if not self._initialized or not self.config.enable_plugin_metrics: + return + + try: + meter = self.get_meter("getstream.plugins.operations") + + # Operation duration histogram + duration_histogram = meter.create_histogram( + name="plugin.operation.duration", + description="Plugin operation duration", + unit="ms" + ) + + # Prepare histogram attributes + histogram_attributes = { + "operation.name": operation_name, + "plugin.name": plugin_name, + "success": success, + } + if attributes: + histogram_attributes.update(attributes) + + duration_histogram.record(duration_ms, attributes=histogram_attributes) + + # Operation counter + operation_counter = meter.create_counter( + name="plugin.operations.total", + description="Total number of plugin operations", + unit="1" + ) + + # Prepare counter attributes + counter_attributes = { + "operation.name": operation_name, + "plugin.name": plugin_name, + "success": success, + } + if attributes: + counter_attributes.update(attributes) + + operation_counter.add(1, attributes=counter_attributes) + + except Exception as e: + logging.debug(f"Failed to record operation metrics: {e}") + + def shutdown(self): + """Shutdown OpenTelemetry gracefully.""" + if not self._initialized: + return + + try: + if self._tracer_provider: + self._tracer_provider.shutdown() + + if self._meter_provider: + self._meter_provider.shutdown() + + self._initialized = False + logging.info("OpenTelemetry shutdown successfully") + + except Exception as e: + logging.warning(f"Error during OpenTelemetry shutdown: {e}") + + +# Global telemetry instance +_global_telemetry: Optional[PluginTelemetry] = None + + +def initialize_telemetry(config: Optional[TelemetryConfig] = None) -> 
PluginTelemetry: + """Initialize global telemetry instance.""" + global _global_telemetry + if _global_telemetry is None: + _global_telemetry = PluginTelemetry(config) + return _global_telemetry + + +def get_telemetry() -> PluginTelemetry: + """Get the global telemetry instance.""" + global _global_telemetry + if _global_telemetry is None: + _global_telemetry = PluginTelemetry() + return _global_telemetry + + +def shutdown_telemetry(): + """Shutdown global telemetry.""" + global _global_telemetry + if _global_telemetry: + _global_telemetry.shutdown() + _global_telemetry = None + + +# Convenience functions for common operations +def trace_plugin_operation(operation_name: str, plugin_name: str): + """Decorator for tracing plugin operations.""" + def decorator(func): + def wrapper(*args, **kwargs): + telemetry = get_telemetry() + start_time = time.time() + + try: + with telemetry.trace_span( + f"{plugin_name}.{operation_name}", + attributes={ + "plugin.name": plugin_name, + "operation.name": operation_name, + } + ) as span: + result = func(*args, **kwargs) + + # Record success metrics + duration_ms = (time.time() - start_time) * 1000 + telemetry.record_plugin_operation( + operation_name, plugin_name, duration_ms, success=True + ) + + return result + + except Exception as e: + # Record error metrics + duration_ms = (time.time() - start_time) * 1000 + telemetry.record_plugin_operation( + operation_name, plugin_name, duration_ms, success=False + ) + + telemetry.record_error(e, { + "operation.name": operation_name, + "plugin.name": plugin_name, + }) + + raise + + return wrapper + return decorator + + +__all__ = [ + "TelemetryConfig", + "PluginTelemetry", + "initialize_telemetry", + "get_telemetry", + "shutdown_telemetry", + "trace_plugin_operation", +] diff --git a/getstream/plugins/common/telemetry_events.py b/getstream/plugins/common/telemetry_events.py new file mode 100644 index 00000000..292a4c31 --- /dev/null +++ b/getstream/plugins/common/telemetry_events.py @@ -0,0 +1,429 @@ +""" +Telemetry-enhanced event emitter for GetStream AI plugins. + +This module provides an event emitter that automatically integrates with OpenTelemetry +for tracing, metrics, and logging of all plugin events. +""" + +import logging +import time +from typing import Dict, Any, Optional, List, Callable, Union +from contextlib import contextmanager + +from .events import BaseEvent, EventType +from .telemetry import get_telemetry, PluginTelemetry +from opentelemetry import context + + +class TelemetryEventEmitter: + """ + Event emitter with automatic OpenTelemetry integration. + + This class extends the basic event emission functionality with automatic + tracing, metrics, and logging through OpenTelemetry. + """ + + def __init__(self, plugin_name: str, telemetry: Optional[PluginTelemetry] = None): + """ + Initialize the telemetry event emitter. + + Args: + plugin_name: Name of the plugin using this emitter + telemetry: Optional telemetry instance, will use global if not provided + """ + self.plugin_name = plugin_name + self.telemetry = telemetry or get_telemetry() + self.logger = logging.getLogger(f"getstream.plugins.{plugin_name}") + + # Event listeners + self._listeners: Dict[EventType, List[Callable[[BaseEvent], None]]] = {} + + # Performance tracking + self._operation_start_times: Dict[str, float] = {} + + def emit(self, event: BaseEvent, trace_context: Optional[Dict[str, Any]] = None): + """ + Emit an event with automatic telemetry integration. 
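+        Events without a plugin_name are stamped with this emitter's plugin
+        before being recorded, logged, and passed to listeners.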
+ + Args: + event: The event to emit + trace_context: Optional context for tracing + """ + # Ensure event has plugin information + if not event.plugin_name: + event.plugin_name = self.plugin_name + + # Record event in OpenTelemetry + self.telemetry.record_event(event) + + # Log event + self._log_event(event) + + # Notify listeners + self._notify_listeners(event) + + # Add trace context if provided + if trace_context: + self._add_trace_context(event, trace_context) + + def emit_with_trace( + self, + event: BaseEvent, + operation_name: str, + attributes: Optional[Dict[str, Any]] = None + ): + """ + Emit an event within a trace span. + + Args: + event: The event to emit + operation_name: Name of the operation being traced + attributes: Additional attributes for the trace span + """ + span_attributes = { + "plugin.name": self.plugin_name, + "operation.name": operation_name, + } + if attributes: + span_attributes.update(attributes) + + with self.telemetry.trace_span( + f"{self.plugin_name}.{operation_name}", + attributes=span_attributes + ) as span: + # Add event to span + self.telemetry.record_event(event, span) + + # Emit event normally + self.emit(event) + + # Add span attributes from event + if hasattr(event, 'text') and event.text: + span.set_attribute("event.text_length", len(event.text)) + + if hasattr(event, 'confidence') and event.confidence is not None: + span.set_attribute("event.confidence", event.confidence) + + if hasattr(event, 'processing_time_ms') and event.processing_time_ms is not None: + span.set_attribute("event.processing_time_ms", event.processing_time_ms) + + def start_operation(self, operation_name: str, attributes: Optional[Dict[str, Any]] = None): + """ + Start timing an operation. + + Args: + operation_name: Name of the operation + attributes: Additional attributes for the operation + """ + operation_id = f"{self.plugin_name}.{operation_name}" + self._operation_start_times[operation_id] = time.time() + + # Start trace span if tracing is enabled + if self.telemetry.config.enable_tracing: + span_attrs = { + "plugin.name": self.plugin_name, + "operation.name": operation_name, + } + if attributes: + span_attrs.update(attributes) + + # Store span in context for later use + context.attach(context.set_value("operation_span_attrs", span_attrs)) + + def end_operation( + self, + operation_name: str, + success: bool = True, + attributes: Optional[Dict[str, Any]] = None, + event: Optional[BaseEvent] = None + ): + """ + End timing an operation and record metrics. 
+ + Args: + operation_name: Name of the operation + success: Whether the operation was successful + attributes: Additional attributes for the operation + event: Optional event to emit with the operation completion + """ + operation_id = f"{self.plugin_name}.{operation_name}" + start_time = self._operation_start_times.pop(operation_id, None) + + if start_time is None: + self.logger.warning(f"Operation {operation_id} was not started") + return + + duration_ms = (time.time() - start_time) * 1000 + + # Record operation metrics + self.telemetry.record_plugin_operation( + operation_name, + self.plugin_name, + duration_ms, + success, + attributes + ) + + # Emit event if provided + if event: + self.emit(event) + + # Log operation completion + self.logger.debug( + f"Operation {operation_name} completed in {duration_ms:.2f}ms (success: {success})" + ) + + @contextmanager + def operation_context( + self, + operation_name: str, + attributes: Optional[Dict[str, Any]] = None, + emit_event: Optional[BaseEvent] = None + ): + """ + Context manager for automatic operation timing and tracing. + + Args: + operation_name: Name of the operation + attributes: Additional attributes for the operation + emit_event: Optional event to emit when operation completes + """ + try: + self.start_operation(operation_name, attributes) + yield + self.end_operation(operation_name, success=True, attributes=attributes, event=emit_event) + except Exception as e: + self.end_operation(operation_name, success=False, attributes=attributes) + self.telemetry.record_error(e, { + "operation.name": operation_name, + "plugin.name": self.plugin_name, + }) + raise + + def add_listener(self, event_type: EventType, listener: Callable[[BaseEvent], None]): + """Add a listener for a specific event type.""" + if event_type not in self._listeners: + self._listeners[event_type] = [] + self._listeners[event_type].append(listener) + + def remove_listener(self, event_type: EventType, listener: Callable[[BaseEvent], None]): + """Remove a listener for a specific event type.""" + if event_type in self._listeners and listener in self._listeners[event_type]: + self._listeners[event_type].remove(listener) + + def _notify_listeners(self, event: BaseEvent): + """Notify all listeners for the event type.""" + listeners = self._listeners.get(event.event_type, []) + for listener in listeners: + try: + listener(event) + except Exception as e: + self.logger.error(f"Error in event listener: {e}") + self.telemetry.record_error(e, { + "event.type": event.event_type.value, + "plugin.name": self.plugin_name, + }) + + def _log_event(self, event: BaseEvent): + """Log an event with structured information.""" + log_data = { + "event_type": event.event_type.value, + "event_id": event.event_id, + "session_id": event.session_id, + "plugin_name": event.plugin_name, + "timestamp": event.timestamp.isoformat(), + } + + # Add event-specific information + if hasattr(event, 'text') and event.text: + log_data["text_length"] = len(event.text) + log_data["text_preview"] = event.text[:100] + + if hasattr(event, 'confidence') and event.confidence is not None: + log_data["confidence"] = event.confidence + + if hasattr(event, 'processing_time_ms') and event.processing_time_ms is not None: + log_data["processing_time_ms"] = event.processing_time_ms + + if hasattr(event, 'error') and event.error: + log_data["error_message"] = str(event.error) + log_data["error_type"] = type(event.error).__name__ + + # Log with appropriate level + if "ERROR" in event.event_type.value.upper(): + 
self.logger.error(f"Event: {event.event_type.value}", extra=log_data) + else: + self.logger.info(f"Event: {event.event_type.value}", extra=log_data) + + def _add_trace_context(self, event: BaseEvent, trace_context: Dict[str, Any]): + """Add trace context to an event.""" + # This could be extended to add trace IDs, span IDs, etc. + if hasattr(event, 'user_metadata'): + if event.user_metadata is None: + event.user_metadata = {} + event.user_metadata.update(trace_context) + + def emit_error( + self, + error: Exception, + event_type: EventType, + context_info: Optional[Dict[str, Any]] = None, + session_id: Optional[str] = None + ): + """ + Emit an error event with automatic error recording. + + Args: + error: The exception that occurred + event_type: Type of error event to emit + context_info: Additional context information + session_id: Optional session ID for the error + """ + from .events import create_event + + # Create error event + error_event = create_event( + event_type, + error=error, + plugin_name=self.plugin_name, + session_id=session_id, + user_metadata=context_info + ) + + # Record error in OpenTelemetry + error_context = { + "event.type": event_type.value, + "plugin.name": self.plugin_name, + "session.id": session_id, + } + if context_info: + error_context.update(context_info) + + self.telemetry.record_error(error, error_context) + + # Emit the error event + self.emit(error_event) + + def emit_performance_event( + self, + event_type: EventType, + duration_ms: float, + success: bool = True, + additional_attrs: Optional[Dict[str, Any]] = None, + session_id: Optional[str] = None + ): + """ + Emit a performance-related event. + + Args: + event_type: Type of event to emit + duration_ms: Duration of the operation in milliseconds + success: Whether the operation was successful + additional_attrs: Additional attributes for the event + session_id: Optional session ID + """ + from .events import create_event + + # Create performance event + user_metadata = { + "duration_ms": duration_ms, + "success": success, + } + if additional_attrs: + user_metadata.update(additional_attrs) + + event = create_event( + event_type, + plugin_name=self.plugin_name, + session_id=session_id, + user_metadata=user_metadata + ) + + # Record performance metrics + if hasattr(event, 'processing_time_ms'): + event.processing_time_ms = duration_ms + + # Emit the event + self.emit(event) + + def get_telemetry_summary(self) -> Dict[str, Any]: + """Get a summary of telemetry data for this emitter.""" + return { + "plugin_name": self.plugin_name, + "active_operations": len(self._operation_start_times), + "listener_counts": { + event_type.value: len(listeners) + for event_type, listeners in self._listeners.items() + } + } + + +class TelemetryEventFilter: + """ + Event filter with telemetry integration. + + This class provides filtering capabilities while maintaining + telemetry context for filtered events. + """ + + def __init__(self, telemetry: Optional[PluginTelemetry] = None): + """Initialize the telemetry event filter.""" + self.telemetry = telemetry or get_telemetry() + + def filter_events( + self, + events: List[BaseEvent], + filter_criteria: Dict[str, Any] + ) -> List[BaseEvent]: + """ + Filter events based on criteria while maintaining telemetry. 
+ + Args: + events: List of events to filter + filter_criteria: Criteria for filtering + + Returns: + Filtered list of events + """ + filtered_events = [] + + for event in events: + if self._matches_criteria(event, filter_criteria): + filtered_events.append(event) + else: + # Record filtered event for telemetry + self.telemetry.record_event(event) + + # Record filtering metrics + if self.telemetry.config.enable_plugin_metrics: + meter = self.telemetry.get_meter("getstream.plugins.filtering") + filter_counter = meter.create_counter( + name="plugin.events.filtered", + description="Number of events filtered", + unit="1" + ) + + filter_counter.add( + len(events) - len(filtered_events), + attributes={"filter.criteria": str(filter_criteria)} + ) + + return filtered_events + + def _matches_criteria(self, event: BaseEvent, criteria: Dict[str, Any]) -> bool: + """Check if an event matches the filter criteria.""" + for key, value in criteria.items(): + if not hasattr(event, key): + return False + + event_value = getattr(event, key) + if event_value != value: + return False + + return True + + +__all__ = [ + "TelemetryEventEmitter", + "TelemetryEventFilter", +] diff --git a/getstream/plugins/common/telemetry_registry.py b/getstream/plugins/common/telemetry_registry.py new file mode 100644 index 00000000..7186a5ec --- /dev/null +++ b/getstream/plugins/common/telemetry_registry.py @@ -0,0 +1,606 @@ +""" +Telemetry-enhanced event registry for GetStream AI plugins. + +This module provides an event registry that automatically integrates with OpenTelemetry +for comprehensive observability of all plugin events and operations. +""" + +import logging +import time +from typing import Dict, Any, List, Optional, Callable, Set +from collections import defaultdict, deque +from dataclasses import dataclass + +from .events import BaseEvent, EventType +from .telemetry import get_telemetry, PluginTelemetry + + +@dataclass +class RegistryMetrics: + """Metrics for the event registry.""" + total_events: int = 0 + events_by_type: Dict[str, int] = None + events_by_plugin: Dict[str, int] = None + events_by_session: Dict[str, int] = None + error_count: int = 0 + active_listeners: int = 0 + + def __post_init__(self): + if self.events_by_type is None: + self.events_by_type = defaultdict(int) + if self.events_by_plugin is None: + self.events_by_plugin = defaultdict(int) + if self.events_by_session is None: + self.events_by_session = defaultdict(int) + + +class TelemetryEventRegistry: + """ + Enhanced event registry with OpenTelemetry integration. + + This class extends the basic event registry functionality with automatic + tracing, metrics, and performance monitoring through OpenTelemetry. + """ + + def __init__( + self, + max_events: int = 10000, + telemetry: Optional[PluginTelemetry] = None, + enable_metrics: bool = True, + enable_tracing: bool = True + ): + """ + Initialize the telemetry event registry. 
+ + Args: + max_events: Maximum number of events to keep in memory + telemetry: Optional telemetry instance, will use global if not provided + enable_metrics: Whether to enable metrics collection + enable_tracing: Whether to enable tracing + """ + self.max_events = max_events + self.telemetry = telemetry or get_telemetry() + self.enable_metrics = enable_metrics + self.enable_tracing = enable_tracing + + # Event storage + self.events: deque = deque(maxlen=max_events) + self.event_counts: Dict[EventType, int] = defaultdict(int) + self.session_events: Dict[str, List[BaseEvent]] = defaultdict(list) + self.plugin_events: Dict[str, List[BaseEvent]] = defaultdict(list) + + # Error tracking + self.error_counts: Dict[str, int] = defaultdict(int) + self.error_details: List[Dict[str, Any]] = [] + + # Event listeners + self.listeners: Dict[EventType, List[Callable]] = defaultdict(list) + self.global_listeners: List[Callable] = [] + + # Performance tracking + self.operation_timings: Dict[str, List[float]] = defaultdict(list) + self.registry_metrics = RegistryMetrics() + + # Initialize telemetry if enabled + if self.enable_metrics or self.enable_tracing: + self._initialize_telemetry() + + def _initialize_telemetry(self): + """Initialize telemetry components for the registry.""" + try: + # Create registry-specific meters + if self.enable_metrics: + self._registry_meter = self.telemetry.get_meter("getstream.plugins.registry") + self._create_registry_metrics() + + # Create registry-specific tracer + if self.enable_tracing: + self._registry_tracer = self.telemetry.get_tracer("getstream.plugins.registry") + + logging.info("Telemetry initialized for event registry") + + except Exception as e: + logging.warning(f"Failed to initialize telemetry for registry: {e}") + self.enable_metrics = False + self.enable_tracing = False + + def _create_registry_metrics(self): + """Create metrics for the registry.""" + try: + # Event counter + self._event_counter = self._registry_meter.create_counter( + name="registry.events.total", + description="Total number of events registered", + unit="1" + ) + + # Event type counter + self._event_type_counter = self._registry_meter.create_counter( + name="registry.events.by_type", + description="Events by type", + unit="1" + ) + + # Plugin counter + self._plugin_counter = self._registry_meter.create_counter( + name="registry.events.by_plugin", + description="Events by plugin", + unit="1" + ) + + # Session counter + self._session_counter = self._registry_meter.create_counter( + name="registry.events.by_session", + description="Events by session", + unit="1" + ) + + # Error counter + self._error_counter = self._registry_meter.create_counter( + name="registry.errors.total", + description="Total number of registry errors", + unit="1" + ) + + # Registry size gauge + self._registry_size_gauge = self._registry_meter.create_up_down_counter( + name="registry.size", + description="Current size of the registry", + unit="1" + ) + + # Listener count gauge + self._listener_count_gauge = self._registry_meter.create_up_down_counter( + name="registry.listeners", + description="Number of active listeners", + unit="1" + ) + + except Exception as e: + logging.warning(f"Failed to create registry metrics: {e}") + + def register_event(self, event: BaseEvent): + """Register a new event with telemetry integration.""" + start_time = time.time() + + try: + # Add event to storage + self.events.append(event) + self.event_counts[event.event_type] += 1 + + # Track by session + if event.session_id: + 
self.session_events[event.session_id].append(event) + + # Track by plugin + if event.plugin_name: + self.plugin_events[event.plugin_name].append(event) + + # Track errors separately + if "ERROR" in event.event_type.value.upper(): + error_key = f"{event.plugin_name}_{event.event_type.value}" + self.error_counts[error_key] += 1 + + # Store error details + error_detail = { + "timestamp": event.timestamp.isoformat(), + "event_type": event.event_type.value, + "plugin_name": event.plugin_name, + "session_id": event.session_id, + "error_message": getattr(event, 'error_message', str(getattr(event, 'error', 'Unknown error'))), + } + self.error_details.append(error_detail) + + # Keep only last 100 error details + if len(self.error_details) > 100: + self.error_details = self.error_details[-100:] + + # Update metrics + self._update_registry_metrics(event) + + # Record telemetry + if self.enable_metrics: + self._record_event_metrics(event) + + if self.enable_tracing: + self._record_event_trace(event) + + # Notify listeners + self._notify_listeners(event) + + # Record operation timing + duration_ms = (time.time() - start_time) * 1000 + self.operation_timings["event_registration"].append(duration_ms) + + # Keep only last 1000 timings + if len(self.operation_timings["event_registration"]) > 1000: + self.operation_timings["event_registration"] = self.operation_timings["event_registration"][-1000:] + + except Exception as e: + logging.error(f"Error registering event: {e}") + if self.enable_metrics: + self._record_error_metrics(e, "event_registration") + raise + + def _update_registry_metrics(self, event: BaseEvent): + """Update internal registry metrics.""" + self.registry_metrics.total_events += 1 + self.registry_metrics.events_by_type[event.event_type.value] += 1 + + if event.plugin_name: + self.registry_metrics.events_by_plugin[event.plugin_name] += 1 + + if event.session_id: + self.registry_metrics.events_by_session[event.session_id] += 1 + + if "ERROR" in event.event_type.value.upper(): + self.registry_metrics.error_count += 1 + + def _record_event_metrics(self, event: BaseEvent): + """Record metrics for an event.""" + try: + # Increment event counter + self._event_counter.add(1) + + # Increment event type counter + self._event_type_counter.add( + 1, + attributes={"event.type": event.event_type.value} + ) + + # Increment plugin counter + if event.plugin_name: + self._plugin_counter.add( + 1, + attributes={"plugin.name": event.plugin_name} + ) + + # Increment session counter + if event.session_id: + self._session_counter.add( + 1, + attributes={"session.id": event.session_id} + ) + + # Update registry size + self._registry_size_gauge.add( + 1, + attributes={"metric.type": "total_events"} + ) + + except Exception as e: + logging.debug(f"Failed to record event metrics: {e}") + + def _record_event_trace(self, event: BaseEvent): + """Record trace information for an event.""" + try: + with self._registry_tracer.start_as_current_span( + "registry.event_registered", + attributes={ + "event.type": event.event_type.value, + "event.id": event.event_id, + "plugin.name": event.plugin_name or "unknown", + "session.id": event.session_id or "unknown", + "timestamp": event.timestamp.isoformat(), + } + ) as span: + # Add event-specific attributes + if hasattr(event, 'text') and event.text: + span.set_attribute("event.text_length", len(event.text)) + + if hasattr(event, 'confidence') and event.confidence is not None: + span.set_attribute("event.confidence", event.confidence) + + if hasattr(event, 'processing_time_ms') 
and event.processing_time_ms is not None: + span.set_attribute("event.processing_time_ms", event.processing_time_ms) + + # Add event as span event + span.add_event( + "event.registered", + attributes={ + "event.type": event.event_type.value, + "plugin.name": event.plugin_name or "unknown", + } + ) + + except Exception as e: + logging.debug(f"Failed to record event trace: {e}") + + def _record_error_metrics(self, error: Exception, context: str): + """Record error metrics.""" + try: + self._error_counter.add( + 1, + attributes={ + "error.type": type(error).__name__, + "error.message": str(error), + "context": context, + } + ) + except Exception as e: + logging.debug(f"Failed to record error metrics: {e}") + + def add_listener(self, event_type: EventType, listener: Callable[[BaseEvent], None]): + """Add a listener for a specific event type.""" + self.listeners[event_type].append(listener) + self._update_listener_metrics() + + def add_global_listener(self, listener: Callable[[BaseEvent], None]): + """Add a global listener for all events.""" + self.global_listeners.append(listener) + self._update_listener_metrics() + + def remove_listener(self, event_type: EventType, listener: Callable): + """Remove a listener for a specific event type.""" + if listener in self.listeners[event_type]: + self.listeners[event_type].remove(listener) + self._update_listener_metrics() + + def remove_global_listener(self, listener: Callable): + """Remove a global listener.""" + if listener in self.global_listeners: + self.global_listeners.remove(listener) + self._update_listener_metrics() + + def _update_listener_metrics(self): + """Update listener count metrics.""" + if not self.enable_metrics: + return + + try: + total_listeners = sum(len(listeners) for listeners in self.listeners.values()) + total_listeners += len(self.global_listeners) + + # Reset to 0 first, then add current count + self._listener_count_gauge.add(-self.registry_metrics.active_listeners) + self._listener_count_gauge.add(total_listeners) + + self.registry_metrics.active_listeners = total_listeners + + except Exception as e: + logging.debug(f"Failed to update listener metrics: {e}") + + def _notify_listeners(self, event: BaseEvent): + """Notify all relevant listeners.""" + # Notify type-specific listeners + type_listeners = self.listeners.get(event.event_type, []) + for listener in type_listeners: + try: + listener(event) + except Exception as e: + logging.error(f"Error in event listener: {e}") + if self.enable_metrics: + self._record_error_metrics(e, "listener_execution") + + # Notify global listeners + for listener in self.global_listeners: + try: + listener(event) + except Exception as e: + logging.error(f"Error in global listener: {e}") + if self.enable_metrics: + self._record_error_metrics(e, "global_listener_execution") + + def get_events( + self, + filter_criteria: Optional[Dict[str, Any]] = None, + limit: Optional[int] = None + ) -> List[BaseEvent]: + """Get events matching the filter criteria.""" + start_time = time.time() + + try: + events = list(self.events) + + if filter_criteria: + events = self._apply_filter(events, filter_criteria) + + if limit: + events = events[-limit:] + + # Record query metrics + if self.enable_metrics: + duration_ms = (time.time() - start_time) * 1000 + self._record_query_metrics(len(events), duration_ms, filter_criteria) + + return events + + except Exception as e: + logging.error(f"Error getting events: {e}") + if self.enable_metrics: + self._record_error_metrics(e, "event_query") + raise + + def 
_apply_filter(self, events: List[BaseEvent], criteria: Dict[str, Any]) -> List[BaseEvent]: + """Apply filter criteria to events.""" + filtered_events = [] + + for event in events: + if self._matches_criteria(event, criteria): + filtered_events.append(event) + + return filtered_events + + def _matches_criteria(self, event: BaseEvent, criteria: Dict[str, Any]) -> bool: + """Check if an event matches the filter criteria.""" + for key, value in criteria.items(): + if not hasattr(event, key): + return False + + event_value = getattr(event, key) + if event_value != value: + return False + + return True + + def _record_query_metrics(self, result_count: int, duration_ms: float, criteria: Optional[Dict[str, Any]]): + """Record metrics for event queries.""" + try: + # Query duration histogram + query_duration_histogram = self._registry_meter.create_histogram( + name="registry.query.duration", + description="Event query duration", + unit="ms" + ) + + query_duration_histogram.record( + duration_ms, + attributes={ + "result.count": str(result_count), + "has_filter": str(criteria is not None), + } + ) + + except Exception as e: + logging.debug(f"Failed to record query metrics: {e}") + + def get_session_events(self, session_id: str) -> List[BaseEvent]: + """Get all events for a specific session.""" + return self.session_events.get(session_id, []) + + def get_plugin_events(self, plugin_name: str) -> List[BaseEvent]: + """Get all events for a specific plugin.""" + return self.plugin_events.get(plugin_name, []) + + def get_error_summary(self) -> Dict[str, Any]: + """Get summary of errors across all plugins.""" + total_events = len(self.events) + total_errors = sum(self.error_counts.values()) + + summary = { + "total_events": total_events, + "total_errors": total_errors, + "error_rate": total_errors / total_events if total_events > 0 else 0, + "error_breakdown": dict(self.error_counts), + "most_common_errors": sorted( + self.error_counts.items(), + key=lambda x: x[1], + reverse=True + )[:10], + "recent_errors": self.error_details[-10:] if self.error_details else [] + } + + return summary + + def get_performance_summary(self) -> Dict[str, Any]: + """Get performance summary for the registry.""" + summary = {} + + for operation, timings in self.operation_timings.items(): + if timings: + summary[operation] = { + "count": len(timings), + "avg_duration_ms": sum(timings) / len(timings), + "min_duration_ms": min(timings), + "max_duration_ms": max(timings), + "p95_duration_ms": sorted(timings)[int(len(timings) * 0.95)] if len(timings) > 0 else 0, + } + + return summary + + def get_statistics(self) -> Dict[str, Any]: + """Get comprehensive statistics about events.""" + total_events = len(self.events) + + # Calculate event type distribution + event_distribution = { + event_type.value: count + for event_type, count in self.event_counts.items() + } + + # Calculate session statistics + session_count = len(self.session_events) + avg_events_per_session = ( + total_events / session_count if session_count > 0 else 0 + ) + + # Calculate plugin statistics + plugin_count = len(self.plugin_events) + avg_events_per_plugin = ( + total_events / plugin_count if plugin_count > 0 else 0 + ) + + # Calculate time-based statistics + if self.events: + oldest_event = min(self.events, key=lambda e: e.timestamp) + newest_event = max(self.events, key=lambda e: e.timestamp) + time_span_ms = (newest_event.timestamp - oldest_event.timestamp).total_seconds() * 1000 + events_per_second = total_events / (time_span_ms / 1000) if time_span_ms > 0 else 
0 + else: + time_span_ms = 0 + events_per_second = 0 + + return { + "total_events": total_events, + "event_distribution": event_distribution, + "session_count": session_count, + "avg_events_per_session": avg_events_per_session, + "plugin_count": plugin_count, + "avg_events_per_plugin": avg_events_per_plugin, + "time_span_ms": time_span_ms, + "events_per_second": events_per_second, + "error_summary": self.get_error_summary(), + "performance_summary": self.get_performance_summary(), + "registry_metrics": self.registry_metrics, + } + + def clear(self): + """Clear all events from the registry.""" + self.events.clear() + self.event_counts.clear() + self.session_events.clear() + self.plugin_events.clear() + self.error_counts.clear() + self.error_details.clear() + self.operation_timings.clear() + self.registry_metrics = RegistryMetrics() + + # Update metrics + if self.enable_metrics: + try: + self._registry_size_gauge.add(-self.registry_metrics.total_events) + except Exception as e: + logging.debug(f"Failed to update size metrics: {e}") + + def shutdown(self): + """Shutdown the registry gracefully.""" + try: + # Clear all data + self.clear() + + # Clear listeners + self.listeners.clear() + self.global_listeners.clear() + + logging.info("Telemetry event registry shutdown successfully") + + except Exception as e: + logging.warning(f"Error during registry shutdown: {e}") + + +# Global telemetry registry instance +_global_telemetry_registry: Optional[TelemetryEventRegistry] = None + + +def get_global_telemetry_registry() -> TelemetryEventRegistry: + """Get the global telemetry event registry.""" + global _global_telemetry_registry + if _global_telemetry_registry is None: + _global_telemetry_registry = TelemetryEventRegistry() + return _global_telemetry_registry + + +def shutdown_global_telemetry_registry(): + """Shutdown the global telemetry event registry.""" + global _global_telemetry_registry + if _global_telemetry_registry: + _global_telemetry_registry.shutdown() + _global_telemetry_registry = None + + +__all__ = [ + "RegistryMetrics", + "TelemetryEventRegistry", + "get_global_telemetry_registry", + "shutdown_global_telemetry_registry", +] diff --git a/getstream/plugins/common/tests/test_telemetry.py b/getstream/plugins/common/tests/test_telemetry.py new file mode 100644 index 00000000..1882b8e0 --- /dev/null +++ b/getstream/plugins/common/tests/test_telemetry.py @@ -0,0 +1,328 @@ +""" +Tests for OpenTelemetry integration. 
+""" + +import pytest +import time +from unittest.mock import Mock, patch + +from ..telemetry import ( + TelemetryConfig, + PluginTelemetry, + initialize_telemetry, + get_telemetry, + shutdown_telemetry, + trace_plugin_operation, +) +from ..telemetry_events import TelemetryEventEmitter, TelemetryEventFilter +from ..telemetry_registry import TelemetryEventRegistry, get_global_telemetry_registry +from ..events import EventType, create_event, STTTranscriptEvent + + +class TestTelemetryConfig: + """Test TelemetryConfig class.""" + + def test_default_config(self): + """Test default configuration values.""" + config = TelemetryConfig() + + assert config.service_name == "getstream-plugins" + assert config.service_version == "0.2.0" + assert config.enable_tracing is True + assert config.enable_metrics is True + assert config.enable_logging_instrumentation is True + + def test_custom_config(self): + """Test custom configuration values.""" + config = TelemetryConfig( + service_name="test-service", + service_version="1.0.0", + enable_tracing=False, + enable_metrics=False + ) + + assert config.service_name == "test-service" + assert config.service_version == "1.0.0" + assert config.enable_tracing is False + assert config.enable_metrics is False + + @patch.dict('os.environ', { + 'OTEL_SERVICE_NAME': 'env-service', + 'OTEL_SERVICE_VERSION': '2.0.0', + 'OTEL_TRACES_ENABLED': 'false' + }) + def test_from_env(self): + """Test configuration from environment variables.""" + config = TelemetryConfig.from_env() + + assert config.service_name == "env-service" + assert config.service_version == "2.0.0" + assert config.enable_tracing is False + + def test_parse_otlp_headers(self): + """Test OTLP headers parsing.""" + with patch.dict('os.environ', { + 'OTEL_EXPORTER_OTLP_HEADERS': 'key1=value1,key2=value2' + }): + config = TelemetryConfig.from_env() + assert config.otlp_headers == {"key1": "value1", "key2": "value2"} + + +class TestPluginTelemetry: + """Test PluginTelemetry class.""" + + def test_init_with_config(self): + """Test initialization with configuration.""" + config = TelemetryConfig(enable_tracing=False, enable_metrics=False) + telemetry = PluginTelemetry(config) + + assert telemetry.config == config + assert telemetry._initialized is False + + @patch('getstream.plugins.common.telemetry.Resource.create') + @patch('getstream.plugins.common.telemetry.trace.set_tracer_provider') + @patch('getstream.plugins.common.telemetry.metrics.set_meter_provider') + def test_initialize_opentelemetry(self, mock_meter, mock_trace, mock_resource): + """Test OpenTelemetry initialization.""" + config = TelemetryConfig(enable_tracing=True, enable_metrics=True) + telemetry = PluginTelemetry(config) + + # Mock resource creation + mock_resource.return_value = Mock() + + # Test initialization + telemetry._initialize_opentelemetry() + + assert telemetry._initialized is True + mock_resource.assert_called_once() + mock_trace.assert_called_once() + mock_meter.assert_called_once() + + def test_get_tracer_noop_when_disabled(self): + """Test that NoOpTracer is returned when tracing is disabled.""" + config = TelemetryConfig(enable_tracing=False) + telemetry = PluginTelemetry(config) + + tracer = telemetry.get_tracer("test") + assert "NoOpTracer" in str(type(tracer)) + + def test_get_meter_noop_when_disabled(self): + """Test that NoOpMeter is returned when metrics is disabled.""" + config = TelemetryConfig(enable_metrics=False) + telemetry = PluginTelemetry(config) + + meter = telemetry.get_meter("test") + assert "NoOpMeter" in 
str(type(meter)) + + +class TestTelemetryEventEmitter: + """Test TelemetryEventEmitter class.""" + + def test_init(self): + """Test emitter initialization.""" + emitter = TelemetryEventEmitter("test_plugin") + + assert emitter.plugin_name == "test_plugin" + assert emitter.telemetry is not None + assert emitter._listeners == {} + + def test_emit_basic(self): + """Test basic event emission.""" + emitter = TelemetryEventEmitter("test_plugin") + event = create_event(EventType.STT_TRANSCRIPT, text="test") + + # Mock telemetry methods + emitter.telemetry.record_event = Mock() + + emitter.emit(event) + + assert event.plugin_name == "test_plugin" + emitter.telemetry.record_event.assert_called_once_with(event) + + def test_add_listener(self): + """Test adding event listeners.""" + emitter = TelemetryEventEmitter("test_plugin") + listener = Mock() + + emitter.add_listener(EventType.STT_TRANSCRIPT, listener) + + assert listener in emitter._listeners[EventType.STT_TRANSCRIPT] + + def test_remove_listener(self): + """Test removing event listeners.""" + emitter = TelemetryEventEmitter("test_plugin") + listener = Mock() + + emitter.add_listener(EventType.STT_TRANSCRIPT, listener) + emitter.remove_listener(EventType.STT_TRANSCRIPT, listener) + + assert listener not in emitter._listeners[EventType.STT_TRANSCRIPT] + + +class TestTelemetryEventRegistry: + """Test TelemetryEventRegistry class.""" + + def test_init(self): + """Test registry initialization.""" + registry = TelemetryEventRegistry(max_events=100) + + assert registry.max_events == 100 + assert registry.enable_metrics is True + assert registry.enable_tracing is True + assert len(registry.events) == 0 + + def test_register_event(self): + """Test event registration.""" + registry = TelemetryEventRegistry() + event = create_event(EventType.STT_TRANSCRIPT, text="test") + + # Mock telemetry methods + registry._record_event_metrics = Mock() + registry._record_event_trace = Mock() + + registry.register_event(event) + + assert len(registry.events) == 1 + assert registry.event_counts[EventType.STT_TRANSCRIPT] == 1 + registry._record_event_metrics.assert_called_once_with(event) + registry._record_event_trace.assert_called_once_with(event) + + def test_get_events_with_filter(self): + """Test getting events with filter.""" + registry = TelemetryEventRegistry() + + # Add events + event1 = create_event(EventType.STT_TRANSCRIPT, text="test1", plugin_name="plugin1") + event2 = create_event(EventType.STT_TRANSCRIPT, text="test2", plugin_name="plugin2") + + registry.register_event(event1) + registry.register_event(event2) + + # Filter by plugin + filtered_events = registry.get_events(filter_criteria={"plugin_name": "plugin1"}) + + assert len(filtered_events) == 1 + assert filtered_events[0].plugin_name == "plugin1" + + def test_get_statistics(self): + """Test getting registry statistics.""" + registry = TelemetryEventRegistry() + + # Add some events + event = create_event(EventType.STT_TRANSCRIPT, text="test") + registry.register_event(event) + + stats = registry.get_statistics() + + assert stats["total_events"] == 1 + assert "STT_TRANSCRIPT" in stats["event_distribution"] + assert stats["session_count"] == 0 + + +class TestGlobalFunctions: + """Test global telemetry functions.""" + + def test_initialize_and_get_telemetry(self): + """Test global telemetry initialization and retrieval.""" + # Initialize telemetry + telemetry1 = initialize_telemetry() + assert telemetry1 is not None + + # Get the same instance + telemetry2 = get_telemetry() + assert telemetry2 is 
telemetry1 + + def test_get_global_registry(self): + """Test global registry retrieval.""" + registry = get_global_telemetry_registry() + assert registry is not None + assert isinstance(registry, TelemetryEventRegistry) + + +class TestTraceDecorator: + """Test the trace_plugin_operation decorator.""" + + def test_trace_decorator_success(self): + """Test successful operation tracing.""" + @trace_plugin_operation("test_op", "test_plugin") + def test_function(): + return "success" + + # Mock telemetry + with patch('getstream.plugins.common.telemetry.get_telemetry') as mock_get: + mock_telemetry = Mock() + mock_get.return_value = mock_telemetry + + result = test_function() + + assert result == "success" + mock_telemetry.record_plugin_operation.assert_called_once() + + def test_trace_decorator_error(self): + """Test error handling in tracing decorator.""" + @trace_plugin_operation("test_op", "test_plugin") + def test_function(): + raise ValueError("test error") + + # Mock telemetry + with patch('getstream.plugins.common.telemetry.get_telemetry') as mock_get: + mock_telemetry = Mock() + mock_get.return_value = mock_telemetry + + with pytest.raises(ValueError): + test_function() + + # Should record error + mock_telemetry.record_error.assert_called_once() + + +class TestIntegration: + """Integration tests for telemetry components.""" + + def test_full_workflow(self): + """Test complete telemetry workflow.""" + # Initialize telemetry + config = TelemetryConfig(enable_tracing=False, enable_metrics=False) + telemetry = PluginTelemetry(config) + + # Create event emitter + emitter = TelemetryEventEmitter("test_plugin", telemetry) + + # Create event registry + registry = TelemetryEventRegistry(telemetry=telemetry) + + # Create and emit event + event = create_event(EventType.STT_TRANSCRIPT, text="test") + emitter.emit(event) + + # Register event + registry.register_event(event) + + # Verify workflow + assert len(registry.events) == 1 + assert registry.event_counts[EventType.STT_TRANSCRIPT] == 1 + + # Get statistics + stats = registry.get_statistics() + assert stats["total_events"] == 1 + + def test_error_handling(self): + """Test error handling in telemetry workflow.""" + # Initialize telemetry + config = TelemetryConfig(enable_tracing=False, enable_metrics=False) + telemetry = PluginTelemetry(config) + + # Create event emitter + emitter = TelemetryEventEmitter("test_plugin", telemetry) + + # Test error emission + error = ValueError("test error") + emitter.emit_error(error, EventType.PLUGIN_ERROR, {"context": "test"}) + + # Verify error was recorded + # (Note: In a real scenario, this would be verified through the telemetry system) + assert True # Placeholder assertion + + +if __name__ == "__main__": + pytest.main([__file__]) diff --git a/getstream/plugins/common/tts.py b/getstream/plugins/common/tts.py index 2aef71c6..e58bb88e 100644 --- a/getstream/plugins/common/tts.py +++ b/getstream/plugins/common/tts.py @@ -65,6 +65,19 @@ def __init__(self, provider_name: Optional[str] = None): ) register_global_event(init_event) self.emit("initialized", init_event) + + # Emit to telemetry if available + if hasattr(self, 'telemetry_emitter'): + self.telemetry_emitter.emit(init_event) + + # Initialize telemetry registry if available + try: + from getstream.plugins.common import TelemetryEventRegistry + self.telemetry_registry = TelemetryEventRegistry() + # Register initialization event in telemetry registry + self.telemetry_registry.register_event(init_event) + except ImportError: + self.telemetry_registry = None 
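+
+        # Illustrative usage sketch, not part of the runtime logic above: the
+        # registry populated here (and by send()/close() below) can later be
+        # queried through the API defined in telemetry_registry.py, e.g.
+        #
+        #     if tts.telemetry_registry is not None:
+        #         stats = tts.telemetry_registry.get_statistics()
+        #         errors = tts.telemetry_registry.get_error_summary()
+        #
+        # where `tts` is assumed to be an instance of a concrete TTS plugin.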
def set_output_track(self, track: AudioStreamTrack) -> None: """ @@ -150,6 +163,10 @@ async def send( ) register_global_event(start_event) self.emit("synthesis_start", start_event) + + # Register in telemetry registry if available + if hasattr(self, 'telemetry_registry'): + self.telemetry_registry.register_event(start_event) # Synthesize audio audio_data = await self.stream_audio(text, *args, **kwargs) @@ -178,6 +195,10 @@ async def send( ) register_global_event(audio_event) self.emit("audio", audio_event) # Structured event + + # Register in telemetry registry if available + if hasattr(self, 'telemetry_registry'): + self.telemetry_registry.register_event(audio_event) elif inspect.isasyncgen(audio_data): async for chunk in audio_data: if isinstance(chunk, bytes): @@ -199,6 +220,10 @@ async def send( ) register_global_event(audio_event) self.emit("audio", audio_event) # Structured event + + # Register in telemetry registry if available + if hasattr(self, 'telemetry_registry'): + self.telemetry_registry.register_event(audio_event) else: # assume it's a Cartesia TTS chunk object total_audio_bytes += len(chunk.data) audio_chunks += 1 @@ -218,6 +243,10 @@ async def send( ) register_global_event(audio_event) self.emit("audio", audio_event) # Structured event + + # Register in telemetry registry if available + if hasattr(self, 'telemetry_registry'): + self.telemetry_registry.register_event(audio_event) elif hasattr(audio_data, "__iter__") and not isinstance( audio_data, (str, bytes, bytearray) ): @@ -240,6 +269,10 @@ async def send( ) register_global_event(audio_event) self.emit("audio", audio_event) # Structured event + + # Register in telemetry registry if available + if hasattr(self, 'telemetry_registry'): + self.telemetry_registry.register_event(audio_event) else: raise TypeError( f"Unsupported return type from synthesize: {type(audio_data)}" @@ -275,6 +308,10 @@ async def send( ) register_global_event(completion_event) self.emit("synthesis_complete", completion_event) + + # Register in telemetry registry if available + if hasattr(self, 'telemetry_registry'): + self.telemetry_registry.register_event(completion_event) logger.info( "Text-to-speech synthesis completed", @@ -305,6 +342,10 @@ async def send( register_global_event(error_event) self.emit("error", error_event) # New structured event self.emit("error_legacy", e) # Backward compatibility + + # Register in telemetry registry if available + if hasattr(self, 'telemetry_registry'): + self.telemetry_registry.register_event(error_event) # Re-raise to allow the caller to handle the error raise @@ -320,3 +361,7 @@ async def close(self): ) register_global_event(close_event) self.emit("closed", close_event) + + # Register in telemetry registry if available + if hasattr(self, 'telemetry_registry'): + self.telemetry_registry.register_event(close_event) diff --git a/getstream/plugins/common/vad.py b/getstream/plugins/common/vad.py index 277d09a0..6fba383f 100644 --- a/getstream/plugins/common/vad.py +++ b/getstream/plugins/common/vad.py @@ -97,7 +97,7 @@ def __init__( "deactivation_th": deactivation_th, }, ) - + # Emit initialization event init_event = PluginInitializedEvent( session_id=self.session_id, @@ -114,6 +114,19 @@ def __init__( ) register_global_event(init_event) self.emit("initialized", init_event) + + # Emit to telemetry if available + if hasattr(self, 'telemetry_emitter'): + self.telemetry_emitter.emit(init_event) + + # Initialize telemetry registry if available + try: + from getstream.plugins.common import TelemetryEventRegistry + 
self.telemetry_registry = TelemetryEventRegistry() + # Register initialization event in telemetry registry + self.telemetry_registry.register_event(init_event) + except ImportError: + self.telemetry_registry = None @abc.abstractmethod async def is_speech(self, frame: PcmData) -> float: @@ -234,6 +247,10 @@ async def _process_frame( ) register_global_event(partial_event) self.emit("partial", partial_event) # Structured event + + # Register in telemetry registry if available + if hasattr(self, 'telemetry_registry'): + self.telemetry_registry.register_event(partial_event) logger.debug( f"Emitted partial event with {len(current_samples)} samples" @@ -284,6 +301,10 @@ async def _process_frame( ) register_global_event(speech_start_event) self.emit("speech_start", speech_start_event) + + # Register in telemetry registry if available + if hasattr(self, 'telemetry_registry'): + self.telemetry_registry.register_event(speech_start_event) # Add this frame to the buffer using shared utility frame_bytes = numpy_array_to_bytes(frame.samples) @@ -319,6 +340,10 @@ async def _flush_speech_buffer(self, user: Optional[Dict[str, Any]] = None) -> N ) register_global_event(audio_event) self.emit("audio", audio_event) # Structured event + + # Register in telemetry registry if available + if hasattr(self, 'telemetry_registry'): + self.telemetry_registry.register_event(audio_event) logger.debug(f"Emitted audio event with {len(speech_data)} samples") @@ -336,6 +361,10 @@ async def _flush_speech_buffer(self, user: Optional[Dict[str, Any]] = None) -> N ) register_global_event(speech_end_event) self.emit("speech_end", speech_end_event) + + # Register in telemetry registry if available + if hasattr(self, 'telemetry_registry'): + self.telemetry_registry.register_event(speech_end_event) # Reset state variables self.speech_buffer = bytearray() @@ -376,6 +405,10 @@ def _emit_error_event(self, error: Exception, context: str = "", user_metadata: ) register_global_event(error_event) self.emit("error", error_event) # Structured event + + # Register in telemetry registry if available + if hasattr(self, 'telemetry_registry'): + self.telemetry_registry.register_event(error_event) async def close(self): """Close the VAD service and release any resources.""" @@ -393,3 +426,7 @@ async def close(self): ) register_global_event(close_event) self.emit("closed", close_event) + + # Register in telemetry registry if available + if hasattr(self, 'telemetry_registry'): + self.telemetry_registry.register_event(close_event) diff --git a/getstream/plugins/deepgram/stt/stt.py b/getstream/plugins/deepgram/stt/stt.py index 5ac185cb..65027e14 100644 --- a/getstream/plugins/deepgram/stt/stt.py +++ b/getstream/plugins/deepgram/stt/stt.py @@ -17,7 +17,7 @@ LiveOptions = None # type: ignore _deepgram_available = False -from getstream.plugins.common import STT +from getstream.plugins.common import STT, TelemetryEventEmitter, TelemetryEventRegistry from getstream.video.rtc.track_util import PcmData logger = logging.getLogger(__name__) @@ -97,6 +97,12 @@ def __init__( self._running = False self._setup_attempted = False self._is_closed = False + + # Initialize telemetry event emitter + self.telemetry_emitter = TelemetryEventEmitter("deepgram_stt") + + # Initialize telemetry registry + self.telemetry_registry = TelemetryEventRegistry() # Track current user context for associating transcripts with users self._current_user = None diff --git a/getstream/plugins/elevenlabs/tts/tts.py b/getstream/plugins/elevenlabs/tts/tts.py index c7e459c9..6b0def61 100644 --- 
a/getstream/plugins/elevenlabs/tts/tts.py +++ b/getstream/plugins/elevenlabs/tts/tts.py @@ -1,6 +1,6 @@ import logging -from getstream.plugins.common import TTS +from getstream.plugins.common import TTS, TelemetryEventEmitter from elevenlabs.client import AsyncElevenLabs from getstream.video.rtc.audio_track import AudioStreamTrack from typing import AsyncIterator, Optional @@ -35,6 +35,13 @@ def __init__( self.voice_id = voice_id self.model_id = model_id self.output_format = "pcm_16000" + + # Initialize telemetry event emitter + self.telemetry_emitter = TelemetryEventEmitter("elevenlabs_tts") + + # Initialize telemetry registry + from getstream.plugins.common import TelemetryEventRegistry + self.telemetry_registry = TelemetryEventRegistry() def set_output_track(self, track: AudioStreamTrack) -> None: if track.framerate != 16000: diff --git a/getstream/plugins/silero/vad/vad.py b/getstream/plugins/silero/vad/vad.py index a0707773..e0dde9c0 100644 --- a/getstream/plugins/silero/vad/vad.py +++ b/getstream/plugins/silero/vad/vad.py @@ -4,7 +4,7 @@ import warnings import time from typing import Dict, Any, Optional -from getstream.plugins.common import VAD +from getstream.plugins.common import VAD, TelemetryEventEmitter from getstream.video.rtc.track_util import PcmData from getstream.audio.utils import resample_audio from getstream.plugins.common.events import VADAudioEvent @@ -96,6 +96,13 @@ def __init__( self.window_samples = window_samples self.device_name = device self.use_onnx = use_onnx and has_onnx + + # Initialize telemetry event emitter + self.telemetry_emitter = TelemetryEventEmitter("silero_vad") + + # Initialize telemetry registry + from getstream.plugins.common import TelemetryEventRegistry + self.telemetry_registry = TelemetryEventRegistry() # Verify window size is correct for the Silero model if self.model_rate == 16000 and self.window_samples != 512: