Skip to content

Commit

Permalink
hiccups
Browse files Browse the repository at this point in the history
  • Loading branch information
louis-bompart committed Feb 11, 2025
1 parent 3cf5292 commit 3450a7f
Show file tree
Hide file tree
Showing 5 changed files with 687 additions and 619 deletions.
268 changes: 141 additions & 127 deletions packages/headless/src/utils/fetch-event-source/fetch.ts
Original file line number Diff line number Diff line change
@@ -1,63 +1,65 @@
import { EventSourceMessage, getBytes, getLines, getMessages } from './parse.js';
import {EventSourceMessage, getBytes, getLines, getMessages} from './parse.js';

export const EventStreamContentType = 'text/event-stream';

const DefaultRetryInterval = 1000;
const LastEventId = 'last-event-id';

function isBrowser() {
return typeof window !== 'undefined'
};
return typeof window !== 'undefined';
}

export interface FetchEventSourceInit extends RequestInit {
/**
* The request headers. FetchEventSource only supports the Record<string,string> format.
*/
headers?: Record<string, string>,

/**
* Called when a response is received. Use this to validate that the response
* actually matches what you expect (and throw if it doesn't.) If not provided,
* will default to a basic validation to ensure the content-type is text/event-stream.
*/
onopen?: (response: Response) => Promise<void>,

/**
* Called when a message is received. NOTE: Unlike the default browser
* EventSource.onmessage, this callback is called for _all_ events,
* even ones with a custom `event` field.
*/
onmessage?: (ev: EventSourceMessage) => void;

/**
* Called when a response finishes. If you don't expect the server to kill
* the connection, you can throw an exception here and retry using onerror.
*/
onclose?: () => void;

/**
* Called when there is any error making the request / processing messages /
* handling callbacks etc. Use this to control the retry strategy: if the
* error is fatal, rethrow the error inside the callback to stop the entire
* operation. Otherwise, you can return an interval (in milliseconds) after
* which the request will automatically retry (with the last-event-id).
* If this callback is not specified, or it returns undefined, fetchEventSource
* will treat every error as retriable and will try again after 1 second.
*/
onerror?: (err: any) => number | null | undefined | void,

/**
* If true, will keep the request open even if the document is hidden.
* By default, fetchEventSource will close the request and reopen it
* automatically when the document becomes visible again.
*/
openWhenHidden?: boolean;

/** The Fetch function to use. Defaults to window.fetch */
fetch?: typeof fetch;
/**
* The request headers. FetchEventSource only supports the Record<string,string> format.
*/
headers?: Record<string, string>;

/**
* Called when a response is received. Use this to validate that the response
* actually matches what you expect (and throw if it doesn't.) If not provided,
* will default to a basic validation to ensure the content-type is text/event-stream.
*/
onopen?: (response: Response) => Promise<void>;

/**
* Called when a message is received. NOTE: Unlike the default browser
* EventSource.onmessage, this callback is called for _all_ events,
* even ones with a custom `event` field.
*/
onmessage?: (ev: EventSourceMessage) => void;

/**
* Called when a response finishes. If you don't expect the server to kill
* the connection, you can throw an exception here and retry using onerror.
*/
onclose?: () => void;

/**
* Called when there is any error making the request / processing messages /
* handling callbacks etc. Use this to control the retry strategy: if the
* error is fatal, rethrow the error inside the callback to stop the entire
* operation. Otherwise, you can return an interval (in milliseconds) after
* which the request will automatically retry (with the last-event-id).
* If this callback is not specified, or it returns undefined, fetchEventSource
* will treat every error as retryable and will try again after 1 second.
*/
onerror?: (err: unknown) => number | null | undefined;

/**
* If true, will keep the request open even if the document is hidden.
* By default, fetchEventSource will close the request and reopen it
* automatically when the document becomes visible again.
*/
openWhenHidden?: boolean;

/** The Fetch function to use. Defaults to window.fetch */
fetch?: typeof fetch;
}

export function fetchEventSource(input: RequestInfo, {
export function fetchEventSource(
input: RequestInfo,
{
signal: inputSignal,
headers: inputHeaders,
onopen: inputOnOpen,
Expand All @@ -67,94 +69,106 @@ export function fetchEventSource(input: RequestInfo, {
openWhenHidden,
fetch: inputFetch,
...rest
}: FetchEventSourceInit) {
return new Promise<void>((resolve, reject) => {
// make a copy of the input headers since we may modify it below:
const headers = { ...inputHeaders };
if (!headers.accept) {
headers.accept = EventStreamContentType;
}
}: FetchEventSourceInit
) {
return new Promise<void>((resolve, reject) => {
// make a copy of the input headers since we may modify it below:
const headers = {...inputHeaders};
if (!headers.accept) {
headers.accept = EventStreamContentType;
}

let curRequestController: AbortController | null;
function onVisibilityChange() {
curRequestController?.abort(); // close existing request on every visibility change
if (!document.hidden) {
create(); // page is now visible again, recreate request.
}
}
let curRequestController: AbortController | null;
function onVisibilityChange() {
curRequestController?.abort(); // close existing request on every visibility change
if (!document.hidden) {
create(); // page is now visible again, recreate request.
}
}

if (!openWhenHidden && isBrowser()) {
document.addEventListener('visibilitychange', onVisibilityChange);
}
if (!openWhenHidden && isBrowser()) {
document.addEventListener('visibilitychange', onVisibilityChange);
}

let retryInterval = DefaultRetryInterval;
let retryTimer;
function dispose() {
if(isBrowser()) {
document.removeEventListener('visibilitychange', onVisibilityChange);
}
clearTimeout(retryTimer);
curRequestController?.abort();
}
let retryInterval = DefaultRetryInterval;
let retryTimer: string | number | NodeJS.Timeout;
function dispose() {
if (isBrowser()) {
document.removeEventListener('visibilitychange', onVisibilityChange);
}
clearTimeout(retryTimer);
curRequestController?.abort();
}

// if the incoming signal aborts, dispose resources and resolve:
inputSignal?.addEventListener('abort', () => {
dispose();
resolve(); // don't waste time windconstructing/logging errors
// if the incoming signal aborts, dispose resources and resolve:
inputSignal?.addEventListener('abort', () => {
dispose();
resolve(); // don't waste time windconstructing/logging errors
});

const outputFetch = inputFetch ?? fetch;
const onopen = inputOnOpen ?? defaultOnOpen;
async function create(): Promise<void> {
curRequestController = AbortController ? new AbortController() : null;
try {
const response = await outputFetch(input, {
...rest,
headers,
signal: curRequestController?.signal,
});

const outputFetch = inputFetch ?? fetch;
const onopen = inputOnOpen ?? defaultOnOpen;
async function create() {
curRequestController = AbortController ? new AbortController() : null;
try {
const response = await outputFetch(input, {
...rest,
headers,
signal: curRequestController?.signal,
});

await onopen(response);

await getBytes(response.body!, getLines(getMessages(id => {
if (id) {
// store the id and send it back on the next retry:
headers[LastEventId] = id;
} else {
// don't send the last-event-id header anymore:
delete headers[LastEventId];
}
}, retry => {
retryInterval = retry;
}, onmessage)));

onclose?.();
dispose();
resolve();
} catch (err) {
if (!curRequestController?.signal?.aborted) {
// if we haven't aborted the request ourselves:
try {
// check if we need to retry:
const interval: any = onerror?.(err) ?? retryInterval;
clearTimeout(retryTimer);
retryTimer = setTimeout(create, interval);
} catch (innerErr) {
// we should not retry anymore:
dispose();
reject(innerErr);
}
await onopen(response);

await getBytes(
response.body!,
getLines(
getMessages(
(id) => {
if (id) {
// store the id and send it back on the next retry:
headers[LastEventId] = id;
} else {
// don't send the last-event-id header anymore:
delete headers[LastEventId];
}
}
},
(retry) => {
retryInterval = retry;
},
onmessage
)
)
);

onclose?.();
dispose();
resolve();
} catch (err) {
if (!curRequestController?.signal?.aborted) {
// if we haven't aborted the request ourselves:
try {
// check if we need to retry:
const interval: number = onerror?.(err) ?? retryInterval;
clearTimeout(retryTimer);
retryTimer = setTimeout(create, interval);
} catch (innerErr) {
// we should not retry anymore:
dispose();
reject(innerErr);
}
}
}
}

create();
});
create();
});
}

function defaultOnOpen(response: Response) {
const contentType = response.headers.get('content-type');
if (!contentType?.startsWith(EventStreamContentType)) {
throw new Error(`Expected content-type to be ${EventStreamContentType}, Actual: ${contentType}`);
}
const contentType = response.headers.get('content-type');
if (!contentType?.startsWith(EventStreamContentType)) {
throw new Error(
`Expected content-type to be ${EventStreamContentType}, Actual: ${contentType}`
);
}
}
8 changes: 6 additions & 2 deletions packages/headless/src/utils/fetch-event-source/index.ts
Original file line number Diff line number Diff line change
@@ -1,2 +1,6 @@
export { fetchEventSource, FetchEventSourceInit, EventStreamContentType } from './fetch';
export { EventSourceMessage } from './parse';
export {
fetchEventSource,
type FetchEventSourceInit,
EventStreamContentType,
} from './fetch.js';
export {type EventSourceMessage} from './parse.js';
Loading

0 comments on commit 3450a7f

Please sign in to comment.