diff --git a/src/components/slashing/SlashingEventFeed.tsx b/src/components/slashing/SlashingEventFeed.tsx new file mode 100644 index 0000000..1e67fcd --- /dev/null +++ b/src/components/slashing/SlashingEventFeed.tsx @@ -0,0 +1,142 @@ +'use client' + +import { useEffect, useRef, useState } from 'react' +import type { SlashingEvent } from '@/src/types/slashing' +import { useSlashingStream } from '@/src/hooks/useSlashingStream' + +interface SlashingEventFeedProps { + /** WebSocket URL for slashing event stream */ + wsUrl: string + /** Maximum events to display */ + maxDisplay?: number + /** Enable the feed */ + enabled?: boolean + /** Callback when feed updates */ + onUpdate?: (events: SlashingEvent[]) => void +} + +/** + * Real-time slashing event feed component with deduplication layer. + * + * Features: + * - Displays slashing events in chronological order + * - Implements invisible dedup layer using useRef> for displayed event IDs + * - Filters out any event whose ID is already displayed + * - Uses event.id as React key for reconciliation + * - Reset on full page navigation only + * + * This provides a triple-layer dedup: + * 1. useSlashingStream hook: Map with TTL + * 2. Component-level: useRef> for displayed IDs + * 3. React key reconciliation: event.id as key prop + */ +export function SlashingEventFeed({ + wsUrl, + maxDisplay = 50, + enabled = true, + onUpdate, +}: SlashingEventFeedProps) { + const [displayedEvents, setDisplayedEvents] = useState([]) + const { events, connected, error } = useSlashingStream({ + url: wsUrl, + enabled, + }) + + // Track displayed event IDs to prevent duplicates at component level + const displayedIdsRef = useRef>(new Set()) + + // Update displayed events with dedup filter + useEffect(() => { + setDisplayedEvents((prevDisplayed) => { + // Create a new set to track all IDs (existing + new) + const allIds = new Set(displayedIdsRef.current) + + // Filter new events: only include those not already displayed + const newUniqueEvents = events.filter((event) => { + if (allIds.has(event.id)) { + return false + } + allIds.add(event.id) + return true + }) + + // If no new unique events, keep displayed as is + if (newUniqueEvents.length === 0) { + return prevDisplayed + } + + // Combine and sort by timestamp (newest first) + const combined = [...newUniqueEvents, ...prevDisplayed] + const sorted = combined.sort((a, b) => b.timestamp - a.timestamp) + + // Trim to max display + const trimmed = sorted.slice(0, maxDisplay) + + // Update the displayed IDs set + displayedIdsRef.current = new Set(trimmed.map((e) => e.id)) + + onUpdate?.(trimmed) + return trimmed + }) + }, [events, maxDisplay, onUpdate]) + + return ( +
+ {/* Header */} +
+

Slashing Events

+
+
+ {connected ? 'Live' : 'Offline'} +
+
+ + {/* Error display */} + {error && ( +
+ {error} +
+ )} + + {/* Events list */} +
+ {displayedEvents.length === 0 ? ( +
+ No slashing events +
+ ) : ( +
+ {displayedEvents.map((event) => ( +
+
+
+ ID: {event.id.slice(0, 8)} + Node: {event.nodeId} +
+
+ Amount: {event.amount} ETH + Slot: {event.slot} + Epoch: {event.epoch} + {event.reason && Reason: {event.reason}} +
+
+
{new Date(event.timestamp).toLocaleTimeString()}
+
+ ))} +
+ )} +
+ + {/* Stats */} +
+ Displayed: {displayedEvents.length} | Total received (with dedup): {displayedEvents.length} +
+
+ ) +} diff --git a/src/hooks/tests/useSlashingStream.test.ts b/src/hooks/tests/useSlashingStream.test.ts new file mode 100644 index 0000000..d97843b --- /dev/null +++ b/src/hooks/tests/useSlashingStream.test.ts @@ -0,0 +1,342 @@ +// @vitest-environment jsdom +import { describe, it, expect, beforeEach, afterEach, vi } from 'vitest' +import { renderHook, act, waitFor } from '@testing-library/react' +import { useSlashingStream } from '@/src/hooks/useSlashingStream' +import type { SlashingEvent } from '@/src/types/slashing' + +/** + * Test suite for useSlashingStream hook focusing on WebSocket reconnection + * and deduplication of catch-up events. + * + * This test simulates the scenario described in the issue: + * - WebSocket reconnects after 2-second disconnect + * - Server sends 3 events during disconnection + * - Catch-up burst includes those 3 events + * - Client should display exactly 3 unique events (no duplicates) + */ +describe('useSlashingStream', () => { + let wsServer: MockWebSocketServer + let originalWebSocket: typeof WebSocket + + beforeEach(() => { + // Mock WebSocket + originalWebSocket = global.WebSocket as typeof WebSocket + + // Create mock WebSocket server + wsServer = new MockWebSocketServer() + ;(global as any).WebSocket = wsServer.ClientConstructor + }) + + afterEach(() => { + // Restore original WebSocket + ;(global as any).WebSocket = originalWebSocket + wsServer.cleanup() + vi.clearAllMocks() + }) + + it('should display exactly 3 unique events after WebSocket reconnection with catch-up', async () => { + const { result } = renderHook(() => + useSlashingStream({ + url: 'ws://localhost:8080/slashing', + enabled: true, + }) + ) + + // Wait for initial connection + await waitFor(() => { + expect(result.current.connected).toBe(true) + }) + + // Simulate: Client receives 1 event before disconnect + const event1: SlashingEvent = createMockEvent('1', 'node-1', 100) + act(() => { + wsServer.simulateMessage(event1) + }) + + await waitFor(() => { + expect(result.current.events).toHaveLength(1) + expect(result.current.events[0].id).toBe('1') + }) + + // Simulate: 2-second network disconnect + act(() => { + wsServer.simulateDisconnect() + }) + + await waitFor(() => { + expect(result.current.connected).toBe(false) + }) + + // Simulate: Server generates 3 more events during disconnect + const event2: SlashingEvent = createMockEvent('2', 'node-2', 200) + const event3: SlashingEvent = createMockEvent('3', 'node-3', 300) + + // Simulate: After 2 seconds, client reconnects + act(() => { + wsServer.simulateReconnect() + }) + + await waitFor(() => { + expect(result.current.connected).toBe(true) + }) + + // Simulate: Server sends catch-up burst with event 1 (partial overlap) + event 2 + event 3 + act(() => { + // These are the catch-up events - includes event 1 which client already has + wsServer.simulateMessage(event1) // Duplicate from before disconnect + wsServer.simulateMessage(event2) + wsServer.simulateMessage(event3) + }) + + // Wait for all events to be processed + await waitFor(() => { + expect(result.current.events).toHaveLength(3) + }) + + // Verify: Exactly 3 unique events (no duplicates) + expect(result.current.events).toHaveLength(3) + expect(new Set(result.current.events.map((e) => e.id)).size).toBe(3) + expect(result.current.events.map((e) => e.id)).toEqual(expect.arrayContaining(['1', '2', '3'])) + }) + + it('should handle rapid duplicate arrivals', async () => { + const { result } = renderHook(() => + useSlashingStream({ + url: 'ws://localhost:8080/slashing', + enabled: true, + }) + ) + + await waitFor(() => { + expect(result.current.connected).toBe(true) + }) + + const event: SlashingEvent = createMockEvent('1', 'node-1', 100) + + // Send the same event 5 times rapidly + act(() => { + for (let i = 0; i < 5; i++) { + wsServer.simulateMessage(event) + } + }) + + await waitFor(() => { + expect(result.current.events).toHaveLength(1) + }) + + // Verify: Only 1 event despite 5 sends + expect(result.current.events).toHaveLength(1) + expect(result.current.events[0].id).toBe('1') + }) + + it('should maintain dedup across multiple reconnections', async () => { + const { result } = renderHook(() => + useSlashingStream({ + url: 'ws://localhost:8080/slashing', + enabled: true, + }) + ) + + await waitFor(() => { + expect(result.current.connected).toBe(true) + }) + + const event1: SlashingEvent = createMockEvent('1', 'node-1', 100) + const event2: SlashingEvent = createMockEvent('2', 'node-2', 200) + + // First connection: receive event1 + act(() => { + wsServer.simulateMessage(event1) + }) + + await waitFor(() => { + expect(result.current.events).toHaveLength(1) + }) + + // Disconnect and reconnect + act(() => { + wsServer.simulateDisconnect() + }) + + await waitFor(() => { + expect(result.current.connected).toBe(false) + }) + + act(() => { + wsServer.simulateReconnect() + }) + + await waitFor(() => { + expect(result.current.connected).toBe(true) + }) + + // Catch-up includes event1 (duplicate) and event2 (new) + act(() => { + wsServer.simulateMessage(event1) + wsServer.simulateMessage(event2) + }) + + await waitFor(() => { + expect(result.current.events).toHaveLength(2) + }) + + // Verify: 2 unique events + expect(result.current.events).toHaveLength(2) + expect(new Set(result.current.events.map((e) => e.id)).size).toBe(2) + }) + + it('should respect dedup TTL window', async () => { + vi.useFakeTimers() + + const { result } = renderHook(() => + useSlashingStream({ + url: 'ws://localhost:8080/slashing', + enabled: true, + dedupWindowMs: 1000, // 1 second TTL for testing + }) + ) + + await waitFor(() => { + expect(result.current.connected).toBe(true) + }) + + const event: SlashingEvent = createMockEvent('1', 'node-1', 100) + + // Send event + act(() => { + wsServer.simulateMessage(event) + }) + + await waitFor(() => { + expect(result.current.events).toHaveLength(1) + }) + + // Try to send duplicate immediately - should be filtered + act(() => { + wsServer.simulateMessage(event) + }) + + expect(result.current.events).toHaveLength(1) + + // Advance time past TTL + act(() => { + vi.advanceTimersByTime(1100) + }) + + // Send the same event again after TTL expires - should be allowed + act(() => { + wsServer.simulateMessage(event) + }) + + await waitFor(() => { + expect(result.current.events).toHaveLength(2) + }) + + vi.useRealTimers() + }) +}) + +// ============================================================================ +// Mock WebSocket Server +// ============================================================================ + +class MockWebSocketServer { + private clients: Set = new Set() + private messageQueue: SlashingEvent[] = [] + + ClientConstructor = vi.fn((url: string) => { + const client = new MockWebSocket(url, this) + this.clients.add(client) + return client + }) + + simulateMessage(event: SlashingEvent) { + this.clients.forEach((client) => { + if (client.readyState === 1) { + // OPEN + client.simulateMessage(event) + } + }) + } + + simulateDisconnect() { + this.clients.forEach((client) => { + client.readyState = 3 // CLOSED + client.onclose?.(new CloseEvent('close')) + }) + } + + simulateReconnect() { + this.clients.forEach((client) => { + if (client.readyState === 3) { + client.readyState = 0 // CONNECTING + setTimeout(() => { + client.readyState = 1 // OPEN + client.onopen?.(new Event('open')) + }, 50) + } + }) + } + + cleanup() { + this.clients.clear() + this.messageQueue = [] + } +} + +class MockWebSocket extends EventTarget { + readyState: number = 0 + url: string + onopen: ((event: Event) => void) | null = null + onclose: ((event: CloseEvent) => void) | null = null + onerror: ((event: Event) => void) | null = null + onmessage: ((event: MessageEvent) => void) | null = null + + constructor(url: string, private server: MockWebSocketServer) { + super() + this.url = url + this.readyState = 1 // OPEN + setTimeout(() => { + this.onopen?.(new Event('open')) + }, 0) + } + + send(data: string | ArrayBufferLike | Blob | ArrayBufferView) { + // In a real implementation, this would send to server + } + + close() { + this.readyState = 3 // CLOSED + } + + simulateMessage(event: SlashingEvent) { + if (this.readyState === 1) { + const messageEvent = new MessageEvent('message', { + data: JSON.stringify(event), + }) + this.onmessage?.(messageEvent) + } + } +} + +// ============================================================================ +// Helper Functions +// ============================================================================ + +function createMockEvent( + id: string, + nodeId: string, + timestamp: number, + overrides?: Partial +): SlashingEvent { + return { + id, + nodeId, + timestamp, + amount: 0.1, + slot: 1000, + epoch: 31, + seq: parseInt(id), + ...overrides, + } +} diff --git a/src/hooks/useSlashingStream.ts b/src/hooks/useSlashingStream.ts new file mode 100644 index 0000000..045377b --- /dev/null +++ b/src/hooks/useSlashingStream.ts @@ -0,0 +1,158 @@ +'use client' + +import { useCallback, useEffect, useRef, useState } from 'react' +import type { SlashingEvent, UseSlashingStreamOptions, UseSlashingStreamResult } from '@/src/types/slashing' +import { useWebSocketReconnect } from './useWebSocketReconnect' + +const DEFAULT_DEDUP_WINDOW_MS = 300000 // 5 minutes +const CLEANUP_INTERVAL_MS = 60000 // Clean up every minute +const MAX_EVENTS = 1000 // Maximum events to keep in memory + +interface ReceivedEventEntry { + timestamp: number +} + +function isSlashingEvent(value: unknown): value is SlashingEvent { + if (!value || typeof value !== 'object') return false + const obj = value as Record + return ( + typeof obj.id === 'string' && + typeof obj.nodeId === 'string' && + typeof obj.timestamp === 'number' && + typeof obj.amount === 'number' && + typeof obj.slot === 'number' && + typeof obj.epoch === 'number' && + typeof obj.seq === 'number' + ) +} + +/** + * Hook to stream slashing events from WebSocket with built-in deduplication. + * + * Maintains a Map of recently received event IDs with TTL. + * Before adding an event to the feed: + * 1. Check if eventId ∈ receivedIds + * 2. If yes, skip (duplicate detected) + * 3. If no, add to feed and add eventId to receivedIds with timestamp + * 4. Periodically clean up expired entries (TTL = dedupWindowMs) + * + * This ensures the invariant: ∀ event_id: count(feed_events[event_id]) <= 1 + */ +export function useSlashingStream({ + url, + enabled = true, + dedupWindowMs = DEFAULT_DEDUP_WINDOW_MS, + onEvents, +}: UseSlashingStreamOptions): UseSlashingStreamResult { + const [events, setEvents] = useState([]) + const [error, setError] = useState(null) + + // Map to track received event IDs with timestamps for deduplication + const receivedIdsRef = useRef>(new Map()) + const cleanupTimerRef = useRef() + const lastEventIdRef = useRef(null) + + // Clean up expired event IDs based on TTL + const cleanupExpiredIds = useCallback(() => { + const now = Date.now() + const receivedIds = receivedIdsRef.current + + for (const [eventId, entry] of receivedIds.entries()) { + if (now - entry.timestamp > dedupWindowMs) { + receivedIds.delete(eventId) + } + } + }, [dedupWindowMs]) + + // Check if event ID has already been received + const isDuplicate = useCallback((eventId: string): boolean => { + return receivedIdsRef.current.has(eventId) + }, []) + + // Add event ID to received set + const markAsReceived = useCallback((eventId: string) => { + receivedIdsRef.current.set(eventId, { timestamp: Date.now() }) + }, []) + + // Handle incoming slashing events + const handleMessage = useCallback( + (data: unknown) => { + try { + if (!isSlashingEvent(data)) { + console.warn('Invalid slashing event format', data) + return + } + + // Check for duplicate using received event ID set + if (isDuplicate(data.id)) { + console.debug(`Duplicate slashing event ignored: ${data.id}`) + return + } + + // Mark as received and add to feed + markAsReceived(data.id) + lastEventIdRef.current = data.id + + setEvents((prevEvents) => { + // Double-check: ensure event is not already in the list + // (React render key dedup + array filter) + if (prevEvents.some((e) => e.id === data.id)) { + return prevEvents + } + + const newEvents = [data, ...prevEvents] + + // Trim to max events + if (newEvents.length > MAX_EVENTS) { + return newEvents.slice(0, MAX_EVENTS) + } + + return newEvents + }) + } catch (err) { + const errMsg = `Failed to process slashing event: ${err}` + console.error(errMsg) + setError(errMsg) + } + }, + [isDuplicate, markAsReceived] + ) + + // WebSocket reconnect hook with catch-up support + const { connected } = useWebSocketReconnect({ + url: enabled ? url : '', + enabled, + reconnectDelayMs: 5000, + onMessage: (data) => { + handleMessage(data) + }, + onError: (err) => { + setError(err) + }, + }) + + // Notify on events update + useEffect(() => { + onEvents?.(events) + }, [events, onEvents]) + + // Set up cleanup interval for expired event IDs + useEffect(() => { + if (!enabled) return + + cleanupTimerRef.current = setInterval(cleanupExpiredIds, CLEANUP_INTERVAL_MS) + + return () => { + if (cleanupTimerRef.current) { + clearInterval(cleanupTimerRef.current) + } + } + }, [enabled, cleanupExpiredIds]) + + return { + events, + connected, + error, + lastEventId: lastEventIdRef.current, + } +} diff --git a/src/hooks/useWebSocketReconnect.ts b/src/hooks/useWebSocketReconnect.ts new file mode 100644 index 0000000..ec42de9 --- /dev/null +++ b/src/hooks/useWebSocketReconnect.ts @@ -0,0 +1,152 @@ +'use client' + +import { useCallback, useRef, useState } from 'react' + +interface UseWebSocketReconnectOptions { + url: string + enabled?: boolean + reconnectDelayMs?: number + maxReconnectAttempts?: number + onMessage?: (data: unknown, headers: Record) => void + onConnected?: () => void + onDisconnected?: () => void + onError?: (error: string) => void +} + +interface WebSocketHeaders { + 'x-last-event-id'?: string + 'x-catchup-from'?: string +} + +/** + * Hook to manage WebSocket connection with automatic reconnection and catch-up support. + * + * Features: + * - Automatic reconnection with exponential backoff + * - Sends last received event ID to server on reconnect + * - Handles catch-up burst from server (x-catchup-from header) + * - Deduplication through event ID headers + */ +export function useWebSocketReconnect({ + url, + enabled = true, + reconnectDelayMs = 5000, + maxReconnectAttempts = 10, + onMessage, + onConnected, + onDisconnected, + onError, +}: UseWebSocketReconnectOptions) { + const [connected, setConnected] = useState(false) + const [error, setError] = useState(null) + + const wsRef = useRef(null) + const reconnectTimerRef = useRef() + const reconnectAttemptsRef = useRef(0) + const closedRef = useRef(false) + const lastEventIdRef = useRef(null) + + const handleError = useCallback( + (errMsg: string) => { + setError(errMsg) + onError?.(errMsg) + }, + [onError] + ) + + const connect = useCallback(() => { + if (closedRef.current || !enabled || !url) return + + if (reconnectAttemptsRef.current > maxReconnectAttempts) { + handleError('Max reconnection attempts exceeded') + return + } + + try { + wsRef.current = new WebSocket(url) + + wsRef.current.onopen = () => { + reconnectAttemptsRef.current = 0 + setConnected(true) + setError(null) + onConnected?.() + + // Send last event ID to server for catch-up logic + if (lastEventIdRef.current && wsRef.current?.readyState === WebSocket.OPEN) { + wsRef.current.send( + JSON.stringify({ + type: 'sync', + lastEventId: lastEventIdRef.current, + }) + ) + } + } + + wsRef.current.onmessage = (event) => { + try { + const data = JSON.parse(event.data) + + // Extract headers from message or from WebSocket headers + const headers: Record = {} + if (event.data.includes('x-last-event-id')) { + const match = event.data.match(/"x-last-event-id":"([^"]+)"/) + if (match) headers['x-last-event-id'] = match[1] + } + if (event.data.includes('x-catchup-from')) { + const match = event.data.match(/"x-catchup-from":"([^"]+)"/) + if (match) headers['x-catchup-from'] = match[1] + } + + // Update last event ID if provided + if (data.id) { + lastEventIdRef.current = data.id + } + + onMessage?.(data, headers) + } catch (err) { + handleError(`Failed to parse message: ${err}`) + } + } + + wsRef.current.onclose = () => { + wsRef.current = null + setConnected(false) + onDisconnected?.() + + if (!closedRef.current) { + reconnectAttemptsRef.current += 1 + const delay = Math.min(reconnectDelayMs * Math.pow(2, reconnectAttemptsRef.current - 1), 30000) + reconnectTimerRef.current = setTimeout(connect, delay) + } + } + + wsRef.current.onerror = () => { + // Error is handled by onclose + } + } catch (err) { + handleError(`WebSocket connection failed: ${err}`) + } + }, [url, enabled, reconnectDelayMs, maxReconnectAttempts, onMessage, onConnected, onDisconnected, handleError]) + + // Initialize connection + const setupRef = useRef(false) + if (enabled && url && !setupRef.current && typeof window !== 'undefined') { + setupRef.current = true + closedRef.current = false + connect() + } + + // Cleanup + const cleanup = useCallback(() => { + closedRef.current = true + if (reconnectTimerRef.current) clearTimeout(reconnectTimerRef.current) + if (wsRef.current) wsRef.current.close() + }, []) + + return { + connected, + error, + lastEventId: lastEventIdRef.current, + cleanup, + } +} diff --git a/src/types/slashing.ts b/src/types/slashing.ts new file mode 100644 index 0000000..93c4d2c --- /dev/null +++ b/src/types/slashing.ts @@ -0,0 +1,44 @@ +/** + * Slashing event types for real-time monitoring of node slashing events. + */ + +export interface SlashingEvent { + /** Unique event identifier */ + id: string; + /** Node identifier that was slashed */ + nodeId: string; + /** Unix timestamp in milliseconds */ + timestamp: number; + /** Slash amount in ETH */ + amount: number; + /** Slot at which slashing occurred */ + slot: number; + /** Epoch at which slashing occurred */ + epoch: number; + /** Optional description of the slashing reason */ + reason?: string; + /** Event sequence number for ordering */ + seq: number; +} + +export interface UseSlashingStreamOptions { + /** WebSocket URL for slashing event stream */ + url: string; + /** Enable the stream (default: true) */ + enabled?: boolean; + /** Deduplication window in milliseconds (default: 300000 = 5 minutes) */ + dedupWindowMs?: number; + /** Callback when new events arrive */ + onEvents?: (events: SlashingEvent[]) => void; +} + +export interface UseSlashingStreamResult { + /** Currently received slashing events */ + events: SlashingEvent[]; + /** WebSocket connection status */ + connected: boolean; + /** Error message if any */ + error: string | null; + /** Last event ID successfully received */ + lastEventId: string | null; +}