Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -88,4 +88,4 @@
"ws": "^8.18.3",
"y-websocket": "^3.0.0"
}
}
}
158 changes: 158 additions & 0 deletions src/infra/y-redis/y-redis.client.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,22 @@ describe(YRedisClient.name, () => {
expect(result).toEqual(expectedResult);
});
});

describe('when redis.readStreams returns null', () => {
it('should return empty array', async () => {
// @ts-ignore
redis.readStreams.mockResolvedValueOnce([{ name: 'stream1', messages: null }]);

const result = await yRedisClient.getMessages([
{
key: 'stream1',
id: '1',
},
]);

expect(result).toEqual([{ stream: 'stream1', messages: [], lastId: '' }]);
});
});
});

describe('addMessage', () => {
Expand Down Expand Up @@ -350,4 +366,146 @@ describe(YRedisClient.name, () => {
});
});
});

describe('logExistingPendingStructs', () => {
describe('when document has no pending structures', () => {
const setup = () => {
const room = 'test-room';
const docid = 'test-doc';
const ydoc = new Doc();
const logger = module.get(Logger);

return { room, docid, ydoc, logger };
};

it('should not log warning', () => {
const { room, docid, ydoc, logger } = setup();

// @ts-ignore it is private method
yRedisClient.logExistingPendingStructs(room, docid, ydoc);

expect(logger.warning).not.toHaveBeenCalled();
});
});

describe('when document has pending structures', () => {
const setup = () => {
const room = 'test-room';
const docid = 'test-doc';
const ydoc = new Doc();
const logger = module.get(Logger);
const mockPendingStructs = {
missing: new Map([
[1, 5],
[3, 2],
]),
update: new Uint8Array([1, 2, 3, 4, 5]),
};

// Mock the ydoc.store.pendingStructs
Object.defineProperty(ydoc.store, 'pendingStructs', {
value: mockPendingStructs,
writable: true,
});

return { room, docid, ydoc, logger, mockPendingStructs };
};

it('should log warning with detailed analysis', () => {
const { room, docid, ydoc, logger } = setup();

// @ts-ignore it is private method
yRedisClient.logExistingPendingStructs(room, docid, ydoc);

expect(logger.warning).toHaveBeenCalledWith(
`Document ${room}/${docid} has pending structures. Details: ${JSON.stringify({
missingStructs: [
[1, 5],
[3, 2],
],
updateSize: 5,
})}`,
);
});
});
});

describe('analyzePendingStructs', () => {
describe('when analyzing pending structures', () => {
const setup = () => {
const mockPendingStructs = {
missing: new Map([
[1, 5],
[3, 2],
[10, 1],
]),
update: new Uint8Array([1, 2, 3, 4, 5, 6, 7]),
};

return { mockPendingStructs };
};

it('should return correct analysis with missing structures and update size', () => {
const { mockPendingStructs } = setup();

// @ts-ignore it is private method
const result = yRedisClient.analyzePendingStructs(mockPendingStructs);

expect(result).toEqual({
missingStructs: [
[1, 5],
[3, 2],
[10, 1],
],
updateSize: 7,
});
});
});

describe('when missing structures is empty', () => {
const setup = () => {
const mockPendingStructs = {
missing: new Map(),
update: new Uint8Array([1, 2, 3]),
};

return { mockPendingStructs };
};

it('should handle empty missing structures', () => {
const { mockPendingStructs } = setup();

// @ts-ignore it is private method
const result = yRedisClient.analyzePendingStructs(mockPendingStructs);

expect(result).toEqual({
missingStructs: [],
updateSize: 3,
});
});
});

describe('when update array is empty', () => {
const setup = () => {
const mockPendingStructs = {
missing: new Map([[5, 10]]),
update: new Uint8Array([]),
};

return { mockPendingStructs };
};

it('should handle empty update array', () => {
const { mockPendingStructs } = setup();

// @ts-ignore it is private method
const result = yRedisClient.analyzePendingStructs(mockPendingStructs);

expect(result).toEqual({
missingStructs: [[5, 10]],
updateSize: 0,
});
});
});
});
});
28 changes: 26 additions & 2 deletions src/infra/y-redis/y-redis.client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,10 @@ export class YRedisClient implements OnModuleInit {

ydoc.once('afterTransaction', (tr) => {
docChanged = tr.changed.size > 0;
this.logExistingPendingStructs(room, docid, ydoc);

// https://github.com/yjs/y-redis/pull/36
ydoc.destroy();
});

ydoc.transact(() => {
Expand All @@ -102,11 +106,31 @@ export class YRedisClient implements OnModuleInit {
streamName,
});

return response;
}

private logExistingPendingStructs(room: string, docid: string, ydoc: Doc): void {
if (ydoc.store.pendingStructs !== null) {
this.logger.warning(`Document ${room} has pending structs ${JSON.stringify(ydoc.store.pendingStructs)}.`);
const pendingAnalysis = this.analyzePendingStructs(ydoc.store.pendingStructs);

this.logger.warning(
`Document ${room}/${docid} has pending structures. Details: ${JSON.stringify({
...pendingAnalysis,
})}`,
);
}
}

return response;
private analyzePendingStructs(pendingStructs: { missing: Map<number, number>; update: Uint8Array }): {
missingStructs: Iterable<[number, number]>;
updateSize: number;
} {
const missingStructs = Array.from(pendingStructs.missing.entries());

return {
missingStructs,
updateSize: pendingStructs.update.length,
};
}

public async destroy(): Promise<void> {
Expand Down
Loading