diff --git a/src/CursorBatching.ts b/src/CursorBatching.ts index d782b786..7fede1ae 100644 --- a/src/CursorBatching.ts +++ b/src/CursorBatching.ts @@ -8,6 +8,7 @@ export type OutgoingBuffer = { cursor: Pick; export default class CursorBatching { outgoingBuffer: OutgoingBuffer[] = []; + pendingBuffer: OutgoingBuffer[] = []; batchTime: number; @@ -28,14 +29,11 @@ export default class CursorBatching { } pushCursorPosition(channel: RealtimeChannel, cursor: Pick) { - // Ignore the cursor update if there is no one listening - if (!this.shouldSend) return; - const timestamp = new Date().getTime(); let offset: number; // First update in the buffer is always 0 - if (this.outgoingBuffer.length === 0) { + if (this.outgoingBuffer.length === 0 && this.pendingBuffer.length === 0) { offset = 0; this.bufferStartTimestamp = timestamp; } else { @@ -43,13 +41,27 @@ export default class CursorBatching { offset = timestamp - this.bufferStartTimestamp; } + const bufferItem = { cursor, offset }; + + if (!this.shouldSend) { + // Queue cursor positions when channel is not ready (no one listening yet) + this.pushToPendingBuffer(bufferItem); + return; + } + this.hasMovement = true; - this.pushToBuffer({ cursor, offset }); + this.pushToBuffer(bufferItem); this.publishFromBuffer(channel, CURSOR_UPDATE); } setShouldSend(shouldSend: boolean) { + const wasSending = this.shouldSend; this.shouldSend = shouldSend; + + // If we just became ready to send and have pending cursor positions, process them + if (!wasSending && this.shouldSend && this.pendingBuffer.length > 0) { + this.processPendingBuffer(); + } } setBatchTime(batchTime: number) { @@ -60,6 +72,35 @@ export default class CursorBatching { this.outgoingBuffer.push(value); } + private pushToPendingBuffer(value: OutgoingBuffer) { + this.pendingBuffer.push(value); + } + + private processPendingBuffer() { + // Move all pending cursor positions to outgoing buffer + for (const item of this.pendingBuffer) { + this.pushToBuffer(item); + } + + // Clear pending buffer + this.pendingBuffer = []; + + // Start publishing if we have cursor movements + if (this.outgoingBuffer.length > 0) { + this.hasMovement = true; + // Note: We need the channel to publish, but since setShouldSend doesn't have it, + // we'll need to trigger this from the caller that has access to the channel + } + } + + // Method to manually trigger publishing when pending items are processed + triggerPublishFromPending(channel: RealtimeChannel) { + if (this.outgoingBuffer.length > 0) { + this.hasMovement = true; + this.publishFromBuffer(channel, CURSOR_UPDATE); + } + } + private async publishFromBuffer(channel: RealtimeChannel, eventName: string) { if (!this.isRunning) { this.isRunning = true; diff --git a/src/CursorQueueing.test.ts b/src/CursorQueueing.test.ts new file mode 100644 index 00000000..bfcb4afd --- /dev/null +++ b/src/CursorQueueing.test.ts @@ -0,0 +1,141 @@ +import { it, describe, expect, vi, beforeEach } from 'vitest'; +import { Realtime, RealtimeClient, RealtimeChannel } from 'ably'; +import Space from './Space.js'; +import CursorBatching from './CursorBatching.js'; +import { CURSOR_UPDATE } from './CursorConstants.js'; + +interface CursorQueueingTestContext { + client: RealtimeClient; + space: Space; + channel: RealtimeChannel; + batching: CursorBatching; +} + +vi.mock('ably'); + +describe('Cursor Queuing Bug Fix', () => { + beforeEach((context) => { + const client = new Realtime({}); + // Mock the connection object that Space expects + (client as any).connection = { id: 'test-connection-id' }; + + context.client = client; + context.space = new Space('test', client); + + // Set up cursor channel by subscribing + context.space.cursors.subscribe('update', () => {}); + context.channel = context.space.cursors.channel!; + context.batching = context.space.cursors['cursorBatching']; + + // Mock channel methods + vi.spyOn(context.channel, 'publish'); + }); + + it('BUG FIX: cursor positions set before channel ready should be queued and sent when ready', async ({ + space, + batching, + channel, + }) => { + // Mock the self member (required for cursor.set()) + vi.spyOn(space.members, 'getSelf').mockResolvedValue({ + connectionId: 'test-connection', + clientId: 'test-client', + isConnected: true, + profileData: {}, + location: null, + lastEvent: { name: 'enter', timestamp: 1 }, + }); + + // Get the publish spy + const publishSpy = vi.spyOn(channel, 'publish'); + + // Start with shouldSend false (channel not ready) + batching.setShouldSend(false); + + // Client sets cursor position before channel is ready + await space.cursors.set({ position: { x: 100, y: 200 }, data: { color: 'blue' } }); + + // Position should NOT be published immediately + expect(publishSpy).not.toHaveBeenCalled(); + + // Verify position is in pending buffer + expect(batching.pendingBuffer.length).toBe(1); + expect(batching.pendingBuffer[0].cursor.position).toEqual({ x: 100, y: 200 }); + + // Simulate channel becoming ready + batching.setShouldSend(true); + + // Trigger publish of pending items + batching.triggerPublishFromPending(channel); + + // The queued cursor position should now be published + expect(publishSpy).toHaveBeenCalledWith(CURSOR_UPDATE, [ + expect.objectContaining({ + cursor: { position: { x: 100, y: 200 }, data: { color: 'blue' } }, + }), + ]); + + // Pending buffer should be cleared + expect(batching.pendingBuffer.length).toBe(0); + }); + + it('multiple pending cursor positions are preserved and sent in order', async ({ + batching, + channel, + }) => { + const publishSpy = vi.spyOn(channel, 'publish'); + + // Start with shouldSend false + batching.setShouldSend(false); + + // Add multiple cursor positions to pending buffer + batching.pushCursorPosition(channel, { position: { x: 10, y: 20 }, data: { color: 'red' } }); + batching.pushCursorPosition(channel, { position: { x: 30, y: 40 }, data: { color: 'green' } }); + batching.pushCursorPosition(channel, { position: { x: 50, y: 60 }, data: { color: 'blue' } }); + + // Verify all positions are queued + expect(batching.pendingBuffer.length).toBe(3); + expect(publishSpy).not.toHaveBeenCalled(); + + // Set shouldSend to true (this should process pending items) + batching.setShouldSend(true); + + // Trigger publish of pending items + batching.triggerPublishFromPending(channel); + + // All pending items should be moved to outgoing buffer and published + expect(batching.pendingBuffer.length).toBe(0); + expect(publishSpy).toHaveBeenCalled(); + }); + + it('cursor positions set after shouldSend is true are published immediately', async ({ + batching, + channel, + }) => { + const publishSpy = vi.spyOn(channel, 'publish'); + + // Start with shouldSend true + batching.setShouldSend(true); + + // Add cursor position (should be published immediately) + batching.pushCursorPosition(channel, { position: { x: 100, y: 200 }, data: { color: 'yellow' } }); + + // Should be published immediately, not queued + expect(batching.pendingBuffer.length).toBe(0); + expect(publishSpy).toHaveBeenCalled(); + }); + + it('setShouldSend(true) processes existing pending items', ({ batching, channel }) => { + // Add items to pending buffer while shouldSend is false + batching.setShouldSend(false); + batching.pushCursorPosition(channel, { position: { x: 1, y: 2 }, data: {} }); + batching.pushCursorPosition(channel, { position: { x: 3, y: 4 }, data: {} }); + + expect(batching.pendingBuffer.length).toBe(2); + + // Setting shouldSend to true should process pending items + batching.setShouldSend(true); + + expect(batching.pendingBuffer.length).toBe(0); + }); +}); diff --git a/src/Cursors.ts b/src/Cursors.ts index 8cf17a16..6e17fa2f 100644 --- a/src/Cursors.ts +++ b/src/Cursors.ts @@ -110,6 +110,9 @@ export default class Cursors extends EventEmitter { * E.g. multiply the configured outboundBatchInterval by groups of 100 members instead of the total number of members. */ this.cursorBatching.setBatchTime(Math.ceil(cursorsMembers.length / 100) * this.options.outboundBatchInterval); + + // Trigger publishing of any pending cursor positions now that channel is ready + this.cursorBatching.triggerPublishFromPending(channel); } private isUnsubscribed() { diff --git a/test/integration/integration.test.ts b/test/integration/integration.test.ts index d0f1f388..3bc36b14 100644 --- a/test/integration/integration.test.ts +++ b/test/integration/integration.test.ts @@ -178,9 +178,8 @@ describe( // 2. one of its `get*()` methods is called // 3. its `subscribe` or `unsubscribe` method is called // - // This seems to mean that a client that sends cursor updates but does not listen for them will drop the first update passed to `cursors.set()`. - // - // So, to work around this, here I perform a "sacrificial" call to `performerSpace.cursors.set()`, the idea of which is to put `performerSpace.cursors.set()` into a state in which it will not drop the updates passed in subsequent calls. + // UPDATE: This race condition bug has been fixed. Early cursor positions are now queued and sent when the channel becomes ready. + // However, we'll keep this "sacrificial" call to ensure the test works correctly with the fix. await performerSpace.cursors.set({ position: { x: 0, y: 0 } }); }); @@ -203,7 +202,8 @@ describe( const observedCursorEventsData: CursorsEventMap['update'][] = []; const cursorsListener = (data: CursorsEventMap['update']) => { observedCursorEventsData.push(data); - if (observedCursorEventsData.length === 4) { + // Now expecting 5 updates: 1 sacrificial + 4 intended (since the race condition bug is fixed) + if (observedCursorEventsData.length === 5) { observerSpace.cursors.unsubscribe(cursorsListener); resolve(observedCursorEventsData); } @@ -232,8 +232,17 @@ describe( // Note that we check that the order in which we recieve the cursor updates matches that in which they were passed to `set()` const observedCursorEventsData = await cursorUpdatesPromise; + + // First cursor should be the sacrificial one from scenario 2.1 (now preserved due to bug fix) + expect(observedCursorEventsData[0]).toMatchObject({ + clientId: performerClientId, + position: { x: 0, y: 0 }, + // Note: no data field expected for the sacrificial cursor + }); + + // Remaining 4 cursors should match the intended cursors for (const [index, setCursor] of cursorsToSet.entries()) { - expect(observedCursorEventsData[index]).toMatchObject({ clientId: performerClientId, ...setCursor }); + expect(observedCursorEventsData[index + 1]).toMatchObject({ clientId: performerClientId, ...setCursor }); } }); });