Skip to content

Commit 1e9b076

Browse files
committed
SSE Awareness
1 parent 8fea84c commit 1e9b076

File tree

5 files changed

+51
-147
lines changed

5 files changed

+51
-147
lines changed

src/frontend/apps/impress/src/features/docs/doc-management/libs/CollaborationProvider.ts

Lines changed: 18 additions & 56 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,6 @@ import {
77
HocuspocusProviderConfiguration,
88
onOutgoingMessageParameters,
99
} from '@hocuspocus/provider';
10-
import * as time from 'lib0/time';
1110
import type { MessageEvent } from 'ws';
1211
import * as Y from 'yjs';
1312

@@ -22,15 +21,6 @@ type HocuspocusProviderConfigurationUrl = Required<
2221
Partial<CompleteHocuspocusProviderConfiguration> &
2322
Required<Pick<CompleteHocuspocusProviderWebsocketConfiguration, 'url'>>;
2423

25-
interface GetPollAwarenessResponse {
26-
awareness?: Record<string, Record<string, unknown>>;
27-
}
28-
29-
interface GetPollDocResponse {
30-
updatedDoc64?: string;
31-
stateFingerprint?: string;
32-
}
33-
3424
export const isHocuspocusProviderConfigurationUrl = (
3525
data: HocuspocusProviderConfiguration,
3626
): data is HocuspocusProviderConfigurationUrl => {
@@ -151,12 +141,23 @@ export class CollaborationProvider extends HocuspocusProvider {
151141
// 1. onmessage handles messages sent with `data:` lines
152142
eventSource.onmessage = async (event) => {
153143
// eslint-disable-next-line @typescript-eslint/no-unsafe-assignment, @typescript-eslint/no-unsafe-argument
154-
const { updatedDoc64, stateFingerprint } = JSON.parse(event.data) as {
144+
const { updatedDoc64, stateFingerprint, awareness64 } = JSON.parse(
145+
event.data,
146+
) as {
155147
updatedDoc64: string;
156148
stateFingerprint: string;
149+
awareness64: string;
157150
};
158151
console.log('Received SSE event:', event.data);
159152

153+
if (awareness64) {
154+
const awareness = Buffer.from(awareness64, 'base64');
155+
156+
this.onMessage({
157+
data: awareness,
158+
} as MessageEvent);
159+
}
160+
160161
if (updatedDoc64) {
161162
const uint8Array = Buffer.from(updatedDoc64, 'base64');
162163
Y.applyUpdate(this.document, uint8Array);
@@ -186,44 +187,17 @@ export class CollaborationProvider extends HocuspocusProvider {
186187
//console.log('initCollaborationSSE:data', data);
187188
}
188189

189-
// protected async longPollAwareness() {
190-
// if (!this.isWebsocketFailed) {
191-
// return;
192-
// }
193-
194-
// console.log('startPollAwareness');
195-
// let waitMs = 0;
196-
// try {
197-
// const { awareness } = await longPollRequest<GetPollAwarenessResponse>({
198-
// pollUrl: this.toPollUrl('awareness'),
199-
// timeout: CollaborationProvider.TIMEOUT,
200-
// });
201-
202-
// console.log('awareness', awareness);
203-
204-
// if (awareness) {
205-
// Object.entries(awareness).forEach(
206-
// ([clientAwarenessKeyStr, clientAwareness]) =>
207-
// this.setAwareness(Number(clientAwarenessKeyStr), clientAwareness),
208-
// );
209-
// }
210-
// } catch (error) {
211-
// console.error('Polling awareness failed:', error);
212-
// // Could be no internet connection
213-
// waitMs = 5000;
214-
// } finally {
215-
// setTimeout(() => {
216-
// void this.longPollAwareness();
217-
// }, waitMs);
218-
// }
219-
// }
220-
221190
public onMessage(event: MessageEvent) {
222191
super.onMessage(event);
223192

224193
console.log('onMessage', event);
225194
console.log('isSynced', this.isSynced);
226195
console.log('unsyncedChanges', this.unsyncedChanges);
196+
197+
// if (this.hasUnsyncedChanges) {
198+
// this.unsyncedChanges = 0;
199+
// void this.pollSync();
200+
// }
227201
}
228202

229203
public async pollSync() {
@@ -242,6 +216,7 @@ export class CollaborationProvider extends HocuspocusProvider {
242216
if (syncDoc64) {
243217
const uint8Array = Buffer.from(syncDoc64, 'base64');
244218
Y.applyUpdate(this.document, uint8Array);
219+
this.synced = true;
245220
}
246221
} catch (error) {
247222
console.error('Polling sync failed:', error);
@@ -258,17 +233,4 @@ export class CollaborationProvider extends HocuspocusProvider {
258233
const stateVector = Y.encodeStateVector(doc);
259234
return crypto.createHash('sha256').update(stateVector).digest('base64');
260235
}
261-
262-
public setAwareness(
263-
awarenessKey: number,
264-
awarenessValue: Record<string, unknown>,
265-
): void {
266-
this.awareness?.states.set(awarenessKey, awarenessValue);
267-
const currLocalMeta = this.awareness?.meta.get(awarenessKey);
268-
const clock = currLocalMeta === undefined ? 0 : currLocalMeta.clock + 1;
269-
this.awareness?.meta.set(awarenessKey, {
270-
clock,
271-
lastUpdated: time.getUnixTime(),
272-
});
273-
}
274236
}

src/frontend/servers/y-provider/src/handlers/collaborationPollHandler.ts

Lines changed: 2 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -5,49 +5,6 @@ import { hocusPocusServer } from '@/servers/hocusPocusServer';
55

66
const TIMEOUT = 30000;
77

8-
/**
9-
* Polling way of handling collaboration
10-
* @param req
11-
* @param res
12-
*/
13-
export const collaborationPollGetAwarenessHandler = async (
14-
req: PollSyncRequest<void>,
15-
res: Response,
16-
) => {
17-
const room = req.query.room;
18-
const canEdit = req.headers['x-can-edit'] === 'True';
19-
20-
const timeout = setTimeout(() => {
21-
if (!res.headersSent) {
22-
res.status(408).json({ error: 'Request Timeout' });
23-
}
24-
}, TIMEOUT);
25-
26-
if (!room) {
27-
res.status(400).json({ error: 'Room name not provided' });
28-
return;
29-
}
30-
31-
const pollSynch = new PollSync<void>(req, room, canEdit);
32-
const hpDoc = await pollSynch.initHocuspocusDocument(hocusPocusServer);
33-
34-
if (!hpDoc) {
35-
res.status(404).json({ error: 'Document not found' });
36-
return;
37-
}
38-
39-
const awareness = await pollSynch.getAwarenessStates();
40-
41-
console.log('awaaaa', awareness);
42-
43-
if (!res.headersSent) {
44-
clearTimeout(timeout);
45-
res.status(200).json({
46-
awareness,
47-
});
48-
}
49-
};
50-
518
interface CollaborationPollPostMessagePayload {
529
message64: string;
5310
}
@@ -147,6 +104,7 @@ export const collaborationPollSyncDocHandler = async (
147104
interface CollaborationPollSSEMessageResponse {
148105
updatedDoc64?: string;
149106
stateFingerprint?: string;
107+
awareness64?: string;
150108
error?: string;
151109
}
152110
export const collaborationPollSSEMessageHandler = async (
@@ -174,5 +132,5 @@ export const collaborationPollSSEMessageHandler = async (
174132
res.setHeader('Connection', 'keep-alive');
175133
res.write(': connected\n\n');
176134

177-
pollSynch.receiveMessages(res);
135+
pollSynch.listenMessages(res);
178136
};

src/frontend/servers/y-provider/src/libs/PollSync.ts

Lines changed: 31 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -3,22 +3,16 @@ import crypto from 'crypto';
33

44
import {
55
AwarenessUpdate,
6-
Connection,
7-
Debugger,
86
Document,
97
Hocuspocus,
10-
IncomingMessage,
11-
MessageReceiver,
8+
OutgoingMessage,
129
} from '@hocuspocus/server';
1310
import { Request, Response } from 'express';
1411
import { v4 as uuid } from 'uuid';
1512
import * as Y from 'yjs';
1613

17-
import { promiseDone } from '@/helpers';
1814
import { base64ToYDoc, logger, toBase64 } from '@/utils';
1915

20-
type AwarenessState = Record<string, Record<string, unknown>>;
21-
2216
export type PollSyncRequestQuery = {
2317
room?: string;
2418
};
@@ -125,10 +119,10 @@ export class PollSync<T> {
125119
logger('Polling Updated YDoc', hpDoc.name);
126120
}
127121

128-
public receiveMessages(res: Response) {
122+
public listenMessages(res: Response) {
129123
const hpDoc = this.getHpDocument();
130124

131-
const updateFn = (
125+
const updateMessagesFn = (
132126
update: Uint8Array,
133127
_origin: string,
134128
updatedDoc: Y.Doc,
@@ -156,20 +150,44 @@ export class PollSync<T> {
156150
);
157151

158152
hpDoc.off('destroy', destroyFn);
159-
hpDoc.off('update', updateFn);
153+
hpDoc.off('update', updateMessagesFn);
160154

155+
// Close the connection
161156
res.end();
162157
};
163158

164-
hpDoc.off('update', updateFn);
159+
const updateAwarenessFn = ({
160+
added,
161+
updated,
162+
removed,
163+
}: AwarenessUpdate) => {
164+
const changedClients = added.concat(updated, removed);
165+
const awarenessMessage = new OutgoingMessage(
166+
this.room,
167+
).createAwarenessUpdateMessage(hpDoc.awareness, changedClients);
168+
169+
console.log('Awareness Update', awarenessMessage);
170+
171+
res.write(
172+
`data: ${JSON.stringify({
173+
time: new Date(),
174+
awareness64: toBase64(awarenessMessage.toUint8Array()),
175+
})}\n\n`,
176+
);
177+
};
178+
179+
hpDoc.awareness.off('update', updateAwarenessFn);
180+
hpDoc.awareness.on('update', updateAwarenessFn);
181+
hpDoc.off('update', updateMessagesFn);
165182
hpDoc.off('destroy', destroyFn);
166-
hpDoc.on('update', updateFn);
183+
hpDoc.on('update', updateMessagesFn);
167184
hpDoc.on('destroy', destroyFn);
168185

169186
this.req.on('close', () => {
170187
console.log('Connection SSE closed');
171-
hpDoc.off('update', updateFn);
188+
hpDoc.off('update', updateMessagesFn);
172189
hpDoc.off('destroy', destroyFn);
190+
hpDoc.awareness.off('update', updateAwarenessFn);
173191
});
174192
}
175193

@@ -180,26 +198,4 @@ export class PollSync<T> {
180198

181199
return this.hpDocument;
182200
}
183-
184-
public async getAwarenessStates(): Promise<AwarenessState | undefined> {
185-
const hpDoc = this.getHpDocument();
186-
const { promise, done } = promiseDone<AwarenessState | undefined>();
187-
188-
const updateFn = (update: AwarenessUpdate) => {
189-
console.log('Awareness Update', update);
190-
191-
done(
192-
hpDoc.hasAwarenessStates()
193-
? Object.fromEntries(hpDoc.awareness.getStates())
194-
: undefined,
195-
);
196-
197-
hpDoc.awareness.off('update', updateFn);
198-
};
199-
200-
hpDoc.awareness.off('update', updateFn);
201-
hpDoc.awareness.on('update', updateFn);
202-
203-
return promise;
204-
}
205201
}

src/frontend/servers/y-provider/src/routes.ts

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,5 @@
11
export const routes = {
22
COLLABORATION_WS: '/collaboration/ws/',
3-
COLLABORATION_POLL_AWARENESS: '/collaboration/ws/poll/awareness/',
43
COLLABORATION_POLL_MESSAGE: '/collaboration/ws/poll/message/',
54
COLLABORATION_POLL_SYNC: '/collaboration/ws/poll/sync/',
65
COLLABORATION_RESET_CONNECTIONS: '/collaboration/api/reset-connections/',

src/frontend/servers/y-provider/src/servers/appServer.ts

Lines changed: 0 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,6 @@ import expressWebsockets from 'express-ws';
66

77
import { PORT } from '../env';
88
import {
9-
collaborationPollGetAwarenessHandler,
109
collaborationPollPostMessageHandler,
1110
collaborationPollSSEMessageHandler,
1211
collaborationPollSyncDocHandler,
@@ -34,16 +33,6 @@ export const initServer = () => {
3433
* Routes to handle collaboration connections
3534
*/
3635
app.ws(routes.COLLABORATION_WS, wsSecurity, collaborationWSHandler);
37-
app.get(
38-
routes.COLLABORATION_POLL_AWARENESS,
39-
httpSecurity,
40-
collaborationPollGetAwarenessHandler,
41-
);
42-
// app.get(
43-
// routes.COLLABORATION_POLL_DOC,
44-
// httpSecurity,
45-
// collaborationPollGetDocHandler,
46-
// );
4736
app.get(
4837
routes.COLLABORATION_POLL_MESSAGE,
4938
httpSecurity,

0 commit comments

Comments
 (0)