diff --git a/package.json b/package.json index 3731acc..b6b6e0c 100644 --- a/package.json +++ b/package.json @@ -88,4 +88,4 @@ "ws": "^8.18.3", "y-websocket": "^3.0.0" } -} +} \ No newline at end of file diff --git a/src/infra/y-redis/y-redis.client.spec.ts b/src/infra/y-redis/y-redis.client.spec.ts index c1851d3..45efb8e 100644 --- a/src/infra/y-redis/y-redis.client.spec.ts +++ b/src/infra/y-redis/y-redis.client.spec.ts @@ -130,6 +130,22 @@ describe(YRedisClient.name, () => { expect(result).toEqual(expectedResult); }); }); + + describe('when redis.readStreams returns null', () => { + it('should return empty array', async () => { + // @ts-ignore + redis.readStreams.mockResolvedValueOnce([{ name: 'stream1', messages: null }]); + + const result = await yRedisClient.getMessages([ + { + key: 'stream1', + id: '1', + }, + ]); + + expect(result).toEqual([{ stream: 'stream1', messages: [], lastId: '' }]); + }); + }); }); describe('addMessage', () => { @@ -350,4 +366,146 @@ describe(YRedisClient.name, () => { }); }); }); + + describe('logExistingPendingStructs', () => { + describe('when document has no pending structures', () => { + const setup = () => { + const room = 'test-room'; + const docid = 'test-doc'; + const ydoc = new Doc(); + const logger = module.get(Logger); + + return { room, docid, ydoc, logger }; + }; + + it('should not log warning', () => { + const { room, docid, ydoc, logger } = setup(); + + // @ts-ignore it is private method + yRedisClient.logExistingPendingStructs(room, docid, ydoc); + + expect(logger.warning).not.toHaveBeenCalled(); + }); + }); + + describe('when document has pending structures', () => { + const setup = () => { + const room = 'test-room'; + const docid = 'test-doc'; + const ydoc = new Doc(); + const logger = module.get(Logger); + const mockPendingStructs = { + missing: new Map([ + [1, 5], + [3, 2], + ]), + update: new Uint8Array([1, 2, 3, 4, 5]), + }; + + // Mock the ydoc.store.pendingStructs + Object.defineProperty(ydoc.store, 'pendingStructs', { + value: mockPendingStructs, + writable: true, + }); + + return { room, docid, ydoc, logger, mockPendingStructs }; + }; + + it('should log warning with detailed analysis', () => { + const { room, docid, ydoc, logger } = setup(); + + // @ts-ignore it is private method + yRedisClient.logExistingPendingStructs(room, docid, ydoc); + + expect(logger.warning).toHaveBeenCalledWith( + `Document ${room}/${docid} has pending structures. Details: ${JSON.stringify({ + missingStructs: [ + [1, 5], + [3, 2], + ], + updateSize: 5, + })}`, + ); + }); + }); + }); + + describe('analyzePendingStructs', () => { + describe('when analyzing pending structures', () => { + const setup = () => { + const mockPendingStructs = { + missing: new Map([ + [1, 5], + [3, 2], + [10, 1], + ]), + update: new Uint8Array([1, 2, 3, 4, 5, 6, 7]), + }; + + return { mockPendingStructs }; + }; + + it('should return correct analysis with missing structures and update size', () => { + const { mockPendingStructs } = setup(); + + // @ts-ignore it is private method + const result = yRedisClient.analyzePendingStructs(mockPendingStructs); + + expect(result).toEqual({ + missingStructs: [ + [1, 5], + [3, 2], + [10, 1], + ], + updateSize: 7, + }); + }); + }); + + describe('when missing structures is empty', () => { + const setup = () => { + const mockPendingStructs = { + missing: new Map(), + update: new Uint8Array([1, 2, 3]), + }; + + return { mockPendingStructs }; + }; + + it('should handle empty missing structures', () => { + const { mockPendingStructs } = setup(); + + // @ts-ignore it is private method + const result = yRedisClient.analyzePendingStructs(mockPendingStructs); + + expect(result).toEqual({ + missingStructs: [], + updateSize: 3, + }); + }); + }); + + describe('when update array is empty', () => { + const setup = () => { + const mockPendingStructs = { + missing: new Map([[5, 10]]), + update: new Uint8Array([]), + }; + + return { mockPendingStructs }; + }; + + it('should handle empty update array', () => { + const { mockPendingStructs } = setup(); + + // @ts-ignore it is private method + const result = yRedisClient.analyzePendingStructs(mockPendingStructs); + + expect(result).toEqual({ + missingStructs: [[5, 10]], + updateSize: 0, + }); + }); + }); + }); }); diff --git a/src/infra/y-redis/y-redis.client.ts b/src/infra/y-redis/y-redis.client.ts index 7787dfc..0569880 100644 --- a/src/infra/y-redis/y-redis.client.ts +++ b/src/infra/y-redis/y-redis.client.ts @@ -85,6 +85,10 @@ export class YRedisClient implements OnModuleInit { ydoc.once('afterTransaction', (tr) => { docChanged = tr.changed.size > 0; + this.logExistingPendingStructs(room, docid, ydoc); + + // https://github.com/yjs/y-redis/pull/36 + ydoc.destroy(); }); ydoc.transact(() => { @@ -102,11 +106,31 @@ export class YRedisClient implements OnModuleInit { streamName, }); + return response; + } + + private logExistingPendingStructs(room: string, docid: string, ydoc: Doc): void { if (ydoc.store.pendingStructs !== null) { - this.logger.warning(`Document ${room} has pending structs ${JSON.stringify(ydoc.store.pendingStructs)}.`); + const pendingAnalysis = this.analyzePendingStructs(ydoc.store.pendingStructs); + + this.logger.warning( + `Document ${room}/${docid} has pending structures. Details: ${JSON.stringify({ + ...pendingAnalysis, + })}`, + ); } + } - return response; + private analyzePendingStructs(pendingStructs: { missing: Map; update: Uint8Array }): { + missingStructs: Iterable<[number, number]>; + updateSize: number; + } { + const missingStructs = Array.from(pendingStructs.missing.entries()); + + return { + missingStructs, + updateSize: pendingStructs.update.length, + }; } public async destroy(): Promise {