Skip to content

Commit ccc4ed3

Browse files
authored
Merge pull request #360 from ml054/v5.2
RDBC-654 Investigate cloudflare and node.js client interoperability
2 parents 3a6ad3e + 4767e85 commit ccc4ed3

File tree

7 files changed

+184
-20
lines changed

7 files changed

+184
-20
lines changed

.github/workflows/RavenClient.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ jobs:
2424

2525
strategy:
2626
matrix:
27-
node-version: [12.x, 14.x, 16.x, 18.x]
27+
node-version: [14.x, 16.x, 18.x]
2828
serverVersion: ["5.2", "5.4"]
2929
fail-fast: false
3030

package.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,7 @@
4141
"Hibernating Rhinos"
4242
],
4343
"engines": {
44-
"node": ">=6.0.0"
44+
"node": ">=14.0.0"
4545
},
4646
"keywords": [
4747
"ravendb",

src/Documents/Conventions/DocumentConventions.ts

Lines changed: 23 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -93,6 +93,7 @@ export class DocumentConventions {
9393
private _remoteEntityFieldNameConvention: CasingConvention;
9494

9595
private _objectMapper: TypesAwareObjectMapper;
96+
private _customFetch: any;
9697
private _dateUtil: DateUtil;
9798
private _syncJsonParseLimit: number;
9899

@@ -109,7 +110,7 @@ export class DocumentConventions {
109110
this._readBalanceBehavior = "None";
110111
this._identityPartsSeparator = "/";
111112
this._identityProperty = CONSTANTS.Documents.Metadata.ID_PROPERTY;
112-
113+
113114
this._findJsType = (id: string, doc: object) => {
114115
const metadata = doc[CONSTANTS.Documents.Metadata.KEY];
115116
if (metadata) {
@@ -249,9 +250,28 @@ export class DocumentConventions {
249250
this._objectMapper = value;
250251
}
251252

253+
public get customFetch(): any {
254+
return this._customFetch;
255+
}
256+
257+
/**
258+
* Allows to override default fetch method
259+
*
260+
* This method is useful to enable RavenDB node.js client
261+
* on CloudFlare Workers
262+
*
263+
* You should pass object bound to worker with type: mtls_certificate
264+
*
265+
* @param customFetch
266+
*/
267+
public set customFetch(customFetch: any) {
268+
this._assertNotFrozen();
269+
this._customFetch = customFetch;
270+
}
271+
252272
/**
253273
* Sets json length limit for sync parsing. Beyond that size
254-
* we fallback to async parsing
274+
* we fall back to async parsing
255275
*/
256276
public get syncJsonParseLimit(): number {
257277
return this._syncJsonParseLimit;
@@ -435,7 +455,7 @@ export class DocumentConventions {
435455
public set storeDatesWithTimezoneInfo(value) {
436456
this._assertNotFrozen();
437457
this._dateUtilOpts.withTimezone = true;
438-
}
458+
}
439459

440460
/**
441461
* If set to 'true' then it will throw an exception when any query is performed (in session)

src/Http/RavenCommand.ts

Lines changed: 26 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,9 @@ import { JsonSerializer } from "../Mapping/Json/Serializer";
1313
import { RavenCommandResponsePipeline } from "./RavenCommandResponsePipeline";
1414
import { DocumentConventions } from "../Documents/Conventions/DocumentConventions";
1515
import * as http from "http";
16-
import { ObjectTypeDescriptor, ServerResponse } from "../Types";
16+
import { ObjectTypeDescriptor } from "../Types";
17+
import { ReadableWebToNodeStream } from "../Utility/ReadableWebToNodeStream";
18+
import { LengthUnawareFormData } from "../Utility/LengthUnawareFormData";
1719

1820
const log = getLogger({ module: "RavenCommand" });
1921

@@ -120,19 +122,27 @@ export abstract class RavenCommand<TResult> {
120122

121123
public async send(agent: http.Agent,
122124
requestOptions: HttpRequestParameters): Promise<{ response: HttpResponse, bodyStream: stream.Readable }> {
123-
const { body, uri, ...restOptions } = requestOptions;
125+
126+
const { body, uri, fetcher, ...restOptions } = requestOptions;
127+
124128
log.info(`Send command ${this.constructor.name} to ${uri}${body ? " with body " + body : ""}.`);
125129

126130
if (requestOptions.agent) { // support for fiddler
127131
agent = requestOptions.agent as http.Agent;
128132
}
129133

130-
const optionsToUse = { body, ...restOptions, agent } as RequestInit;
134+
const bodyToUse = fetcher ? RavenCommand.maybeWrapBody(body) : body;
135+
136+
const optionsToUse = { body: bodyToUse, ...restOptions, agent } as RequestInit;
131137

132138
const passthrough = new stream.PassThrough();
133-
const response = await fetch(uri, optionsToUse);
134139
passthrough.pause();
135-
response.body
140+
141+
const fetchFn = fetcher ?? fetch; // support for custom fetcher
142+
const response = await fetchFn(uri, optionsToUse);
143+
144+
const effectiveStream: stream.Readable = fetcher ? new ReadableWebToNodeStream(response.body) : response.body;
145+
effectiveStream
136146
.pipe(passthrough);
137147

138148
return {
@@ -141,6 +151,17 @@ export abstract class RavenCommand<TResult> {
141151
};
142152
}
143153

154+
private static maybeWrapBody(body: any) {
155+
if (body instanceof LengthUnawareFormData) {
156+
throw new Error("Requests using FormData as payload are not yet supported!");
157+
}
158+
if (body instanceof stream.Readable) {
159+
throw new Error("Requests using stream.Readable as payload are not yet supported!");
160+
}
161+
162+
return body;
163+
}
164+
144165
public setResponseRaw(response: HttpResponse, body: string): void {
145166
throwError("NotSupportedException",
146167
"When _responseType is set to RAW then please override this method to handle the response.");

src/Http/RequestExecutor.ts

Lines changed: 47 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -33,13 +33,12 @@ import { validateUri } from "../Utility/UriUtil";
3333
import * as StreamUtil from "../Utility/StreamUtil";
3434
import { closeHttpResponse } from "../Utility/HttpUtil";
3535
import { PromiseStatusTracker } from "../Utility/PromiseUtil";
36-
import * as http from "http";
37-
import * as https from "https";
36+
import type * as http from "http";
37+
import type * as https from "https";
3838
import { IBroadcast } from "./IBroadcast";
3939
import { StringUtil } from "../Utility/StringUtil";
4040
import { IRaftCommand } from "./IRaftCommand";
4141
import AbortController from "abort-controller";
42-
import { URL } from "url";
4342
import { EventEmitter } from "events";
4443
import {
4544
BeforeRequestEventArgs,
@@ -142,7 +141,11 @@ export class NodeStatus implements IDisposable {
142141
export class RequestExecutor implements IDisposable {
143142
private _emitter = new EventEmitter();
144143

145-
private static readonly GLOBAL_APPLICATION_IDENTIFIER = uuidv4().toString();
144+
/*
145+
we don't initialize this here due to issue with cloudflare
146+
see: https://github.com/cloudflare/miniflare/issues/292
147+
*/
148+
private static GLOBAL_APPLICATION_IDENTIFIER: string = null;
146149

147150
private static readonly INITIAL_TOPOLOGY_ETAG = -2;
148151

@@ -183,9 +186,11 @@ export class RequestExecutor implements IDisposable {
183186

184187
private _httpAgent: http.Agent;
185188

186-
private static readonly KEEP_ALIVE_HTTP_AGENT = new http.Agent({
187-
keepAlive: true
188-
});
189+
/*
190+
we don't initialize this here due to issue with cloudflare
191+
see: https://github.com/cloudflare/miniflare/issues/292
192+
*/
193+
private static KEEP_ALIVE_HTTP_AGENT: http.Agent = null;
189194

190195
private static readonly HTTPS_AGENT_CACHE = new Map<string, https.Agent>();
191196

@@ -330,6 +335,10 @@ export class RequestExecutor implements IDisposable {
330335
}
331336

332337
public getHttpAgent(): http.Agent {
338+
if (this.conventions.customFetch) {
339+
return null;
340+
}
341+
333342
if (this._httpAgent) {
334343
return this._httpAgent;
335344
}
@@ -344,6 +353,9 @@ export class RequestExecutor implements IDisposable {
344353
if (RequestExecutor.HTTPS_AGENT_CACHE.has(cacheKey)) {
345354
return RequestExecutor.HTTPS_AGENT_CACHE.get(cacheKey);
346355
} else {
356+
// eslint-disable-next-line @typescript-eslint/no-var-requires
357+
const https = require("https");
358+
347359
const agent = new https.Agent({
348360
keepAlive: true,
349361
...agentOptions
@@ -353,10 +365,22 @@ export class RequestExecutor implements IDisposable {
353365
return agent;
354366
}
355367
} else {
368+
RequestExecutor.assertKeepAliveAgent();
356369
return RequestExecutor.KEEP_ALIVE_HTTP_AGENT;
357370
}
358371
}
359372

373+
private static assertKeepAliveAgent() {
374+
if (!RequestExecutor.KEEP_ALIVE_HTTP_AGENT) {
375+
// eslint-disable-next-line @typescript-eslint/no-var-requires
376+
const http = require("http");
377+
378+
RequestExecutor.KEEP_ALIVE_HTTP_AGENT = new http.Agent({
379+
keepAlive: true
380+
});
381+
}
382+
}
383+
360384
public getTopologyNodes(): ServerNode[] {
361385
const topology = this.getTopology();
362386
return topology
@@ -399,14 +423,25 @@ export class RequestExecutor implements IDisposable {
399423
opts?: IRequestExecutorOptions): RequestExecutor {
400424
const { authOptions, documentConventions } = opts || {} as IRequestExecutorOptions;
401425
const executor = new RequestExecutor(database, authOptions, documentConventions);
402-
executor._firstTopologyUpdatePromise = executor._firstTopologyUpdate(initialUrls, this.GLOBAL_APPLICATION_IDENTIFIER);
426+
427+
executor._firstTopologyUpdatePromise = executor._firstTopologyUpdate(initialUrls, RequestExecutor.getGlobalApplicationIdentifier());
403428

404429
// this is just to get rid of unhandled rejection, we're handling it later on
405430
executor._firstTopologyUpdatePromise.catch(TypeUtil.NOOP);
406431

407432
return executor;
408433
}
409434

435+
private static getGlobalApplicationIdentifier() {
436+
// due to cloudflare constraints we can't init GLOBAL_APPLICATION_IDENTIFIER in static
437+
438+
if (!this.GLOBAL_APPLICATION_IDENTIFIER) {
439+
this.GLOBAL_APPLICATION_IDENTIFIER = uuidv4();
440+
}
441+
442+
return this.GLOBAL_APPLICATION_IDENTIFIER;
443+
}
444+
410445
public static createForSingleNodeWithConfigurationUpdates(
411446
url: string, database: string, opts: IRequestExecutorOptions): RequestExecutor {
412447
const executor =
@@ -1284,6 +1319,10 @@ export class RequestExecutor implements IDisposable {
12841319
return null;
12851320
}
12861321

1322+
if (this.conventions.customFetch) {
1323+
request.fetcher = this.conventions.customFetch;
1324+
}
1325+
12871326
const req = Object.assign(request, this._defaultRequestOptions);
12881327
urlRef(req.uri);
12891328
req.headers = req.headers || {};

src/Primitives/Http.ts

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,11 @@
11
import { Request, RequestInit, Response } from "node-fetch";
22

3-
export type HttpRequestParameters = RequestInit & { uri: string };
4-
export type HttpRequestParametersWithoutUri = RequestInit;
3+
export type HttpRequestParameters = RequestInit & {
4+
uri: string;
5+
fetcher?: any;
6+
};
7+
export type HttpRequestParametersWithoutUri = RequestInit & {
8+
fetcher?: any;
9+
};
510
export type HttpResponse = Response;
611
export type HttpRequest = Request;
Lines changed: 79 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,79 @@
1+
// Taken from: https://github.com/Borewit/readable-web-to-node-stream/blob/master/lib/index.ts
2+
3+
import { Readable } from 'readable-stream';
4+
5+
/**
6+
* Converts a Web-API stream into Node stream.Readable class
7+
* Node stream readable: https://nodejs.org/api/stream.html#stream_readable_streams
8+
* Web API readable-stream: https://developer.mozilla.org/en-US/docs/Web/API/ReadableStream
9+
* Node readable stream: https://nodejs.org/api/stream.html#stream_readable_streams
10+
*/
11+
export class ReadableWebToNodeStream extends Readable {
12+
13+
public bytesRead: number = 0;
14+
public released = false;
15+
16+
/**
17+
* Default web API stream reader
18+
* https://developer.mozilla.org/en-US/docs/Web/API/ReadableStreamDefaultReader
19+
*/
20+
private reader: any;
21+
private pendingRead: Promise<any> | undefined;
22+
23+
/**
24+
*
25+
* @param stream ReadableStream: https://developer.mozilla.org/en-US/docs/Web/API/ReadableStream
26+
*/
27+
constructor(stream: any) {
28+
super();
29+
this.reader = stream.getReader();
30+
}
31+
32+
/**
33+
* Implementation of readable._read(size).
34+
* When readable._read() is called, if data is available from the resource,
35+
* the implementation should begin pushing that data into the read queue
36+
* https://nodejs.org/api/stream.html#stream_readable_read_size_1
37+
*/
38+
public async _read() {
39+
// Should start pushing data into the queue
40+
// Read data from the underlying Web-API-readable-stream
41+
if (this.released) {
42+
this.push(null); // Signal EOF
43+
return;
44+
}
45+
this.pendingRead = this.reader.read();
46+
const data = await this.pendingRead;
47+
// clear the promise before pushing new data to the queue and allow sequential calls to _read()
48+
delete this.pendingRead;
49+
if (data.done || this.released) {
50+
this.push(null); // Signal EOF
51+
} else {
52+
this.bytesRead += data.value.length;
53+
this.push(data.value); // Push new data to the queue
54+
}
55+
}
56+
57+
/**
58+
* If there is no unresolved read call to Web-API Readable Stream immediately returns;
59+
* otherwise will wait until the read is resolved.
60+
*/
61+
public async waitForReadToComplete() {
62+
if (this.pendingRead) {
63+
await this.pendingRead;
64+
}
65+
}
66+
67+
/**
68+
* Close wrapper
69+
*/
70+
public async close(): Promise<void> {
71+
await this.syncAndRelease();
72+
}
73+
74+
private async syncAndRelease() {
75+
this.released = true;
76+
await this.waitForReadToComplete();
77+
await this.reader.releaseLock();
78+
}
79+
}

0 commit comments

Comments
 (0)