Skip to content

Commit

Permalink
WIP: bump connect-redis from 7.1.1 to 8.1.1
Browse files Browse the repository at this point in the history
To work it needs at least:
* bump redis from 3.1.1 to 4.5.7
* bump redislock from 3.1.2 to 5.0.0-beta.2

Some major breaking changes appears in this two packages.

[Redis](https://www.npmjs.com/package/redis):

* All methods now return promises natively
    - promesifyAll is no longer needed
    - Async Postfix needs to be removed everywhere
* Types are now native
    - `RedisClientType` client is created onCreate()
* SUBSCRIBE methode needsa listener
* All Redis Commands need camelCase or CAPS
* hmset is deprecated and needs to be replace by hSet

[Redlock](https://www.npmjs.com/package/redlock):

* Before 5.0.0 it only support [ioredis](https://www.npmjs.com/package/ioredis) implementation. AFAIK 5.0.0
 implements support for [redis-node](https://www.npmjs.com/package/redis)

Build fails on this commit
Some type adaptations, and syntactic corrections still remains to be done.
  • Loading branch information
hexaltation committed Jan 27, 2025
1 parent 45dc0e5 commit 17cf0dc
Show file tree
Hide file tree
Showing 15 changed files with 249 additions and 257 deletions.
159 changes: 79 additions & 80 deletions app/gen-server/lib/DocWorkerMap.ts

Large diffs are not rendered by default.

6 changes: 2 additions & 4 deletions app/server/devServerMain.ts
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,10 @@ import {updateDb} from 'app/server/lib/dbUtils';
import {FlexServer} from 'app/server/lib/FlexServer';
import log from 'app/server/lib/log';
import {MergedServer} from 'app/server/MergedServer';
import {promisifyAll} from 'bluebird';
import * as fse from 'fs-extra';
import * as path from 'path';
import {createClient, RedisClient} from 'redis';
import {createClient} from 'redis';

promisifyAll(RedisClient.prototype);

function getPort(envVarName: string, fallbackPort: number): number {
const val = process.env[envVarName];
Expand Down Expand Up @@ -61,7 +59,7 @@ export async function main() {
const {createInitialDb} = require('test/gen-server/seed');
await createInitialDb();
if (process.env.REDIS_URL) {
await createClient(process.env.REDIS_URL).flushdbAsync();
await createClient({url:process.env.REDIS_URL}).flushDb();
}
} else {
await updateDb();
Expand Down
10 changes: 5 additions & 5 deletions app/server/lib/AccessTokens.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import { KeyedMutex } from 'app/common/KeyedMutex';
import { AccessTokenOptions } from 'app/plugin/GristAPI';
import { makeId } from 'app/server/lib/idUtils';
import * as jwt from 'jsonwebtoken';
import { RedisClient } from 'redis';
import { RedisClientType } from 'redis';

export const Deps = {
// Signed tokens expire after this length of time.
Expand Down Expand Up @@ -87,7 +87,7 @@ export class AccessTokens implements IAccessTokens {
// tokens must be honored. Cache is of a list of secrets. It is important to allow multiple
// secrets so we can change the secret we are signing with and still honor tokens signed with
// a previous secret.
constructor(cli: RedisClient|null, private _factor: number = 10) {
constructor(cli: RedisClientType|null, private _factor: number = 10) {
this._store = cli ? new RedisAccessTokenSignerStore(cli) : new InMemoryAccessTokenSignerStore();
this._dtMsec = Deps.TOKEN_TTL_MSECS;
this._reads = new MapWithTTL<string, string[]>(this._dtMsec * _factor * 0.5);
Expand Down Expand Up @@ -247,15 +247,15 @@ export class InMemoryAccessTokenSignerStore implements IAccessTokenSignerStore {
// Redis based implementation of IAccessTokenSignerStore, for multi process/instance
// Grist.
export class RedisAccessTokenSignerStore implements IAccessTokenSignerStore {
constructor(private _cli: RedisClient) { }
constructor(private _cli: RedisClientType) { }

public async getSigners(docId: string): Promise<string[]> {
const keys = await this._cli.getAsync(this._getKey(docId));
const keys = await this._cli.get(this._getKey(docId));
return keys?.split(',') || [];
}

public async setSigners(docId: string, secrets: string[], ttlMsec: number): Promise<void> {
await this._cli.setexAsync(this._getKey(docId), ttlMsec, secrets.join(','));
await this._cli.setEx(this._getKey(docId), ttlMsec, secrets.join(','));
}

public async close() {
Expand Down
19 changes: 10 additions & 9 deletions app/server/lib/ActiveDoc.ts
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,7 @@ import {IMessage, MsgType} from 'grain-rpc';
import imageSize from 'image-size';
import * as moment from 'moment-timezone';
import fetch from 'node-fetch';
import {createClient, RedisClient} from 'redis';
import {createClient, RedisClientType} from 'redis';
import tmp from 'tmp';

import {ActionHistory} from './ActionHistory';
Expand Down Expand Up @@ -270,7 +270,7 @@ export class ActiveDoc extends EventEmitter {
private _attachmentFileManager: AttachmentFileManager;

// Client watching for 'product changed' event published by Billing to update usage
private _redisSubscriber?: RedisClient;
private _redisSubscriber?: RedisClientType;

// Timer for shutting down the ActiveDoc a bit after all clients are gone.
private _inactivityTimer = new InactivityTimer(() => {
Expand Down Expand Up @@ -339,19 +339,20 @@ export class ActiveDoc extends EventEmitter {
if (process.env.REDIS_URL && billingAccount) {
const prefix = getPubSubPrefix();
const channel = `${prefix}-billingAccount-${billingAccount.id}-product-changed`;
this._redisSubscriber = createClient(process.env.REDIS_URL);
this._redisSubscriber.subscribe(channel);
this._redisSubscriber.on("message", async () => {
const listener = async (message: string, chan: string) => {
this._log.debug(null, `Reload ActiveDoc: ${message}, ${chan}`);
// A product change has just happened in Billing.
// Reload the doc (causing connected clients to reload) to ensure everyone sees the effect of the change.
this._log.debug(null, 'reload after product change');
await this.reloadDoc();
});
this._redisSubscriber.on("error", async (error) => {
};
this._redisSubscriber = createClient({url:process.env.REDIS_URL});
this._redisSubscriber.SUBSCRIBE(channel, listener)
.catch(err=>{
this._log.error(
null,
`encountered error while subscribed to channel ${channel}`,
error
err
);
});
}
Expand Down Expand Up @@ -2108,7 +2109,7 @@ export class ActiveDoc extends EventEmitter {

this._triggers.shutdown();

this._redisSubscriber?.quitAsync()
this._redisSubscriber?.quit()
.catch(e => this._log.warn(docSession, "Failed to quit redis subscriber", e));

// Clear the MapWithTTL to remove all timers from the event loop.
Expand Down
4 changes: 2 additions & 2 deletions app/server/lib/DocApi.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1871,10 +1871,10 @@ export class DocWorkerApi {
const expiry = 2 * 24 * 60 * 60 / period.periodsPerDay;
multi.incr(key).expire(key, expiry);
}
multi.execAsync().then(result => {
multi.exec().then(result => {
for (let i = 0; i < keys.length; i++) {
const key = keys[i];
const newCount = Number(result![i * 2]); // incrs are at even positions, expires at odd positions
const newCount = Number(result[i * 2]); // incrs are at even positions, expires at odd positions
// Theoretically this could be overwritten by a lower count that was requested earlier
// but somehow arrived after.
// This doesn't really matter, and the count on redis will still increase reliably.
Expand Down
4 changes: 2 additions & 2 deletions app/server/lib/DocWorkerMap.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
import { IChecksumStore } from 'app/server/lib/IChecksumStore';
import { IElectionStore } from 'app/server/lib/IElectionStore';
import { IPermitStores } from 'app/server/lib/Permit';
import { RedisClient } from 'redis';
import { RedisClientType } from 'redis';

export interface DocWorkerInfo {
id: string;
Expand Down Expand Up @@ -75,5 +75,5 @@ export interface IDocWorkerMap extends IPermitStores, IElectionStore, IChecksumS

removeDocGroup(docId: string): Promise<void>;

getRedisClient(): RedisClient|null;
getRedisClient(): RedisClientType|null;
}
4 changes: 2 additions & 2 deletions app/server/lib/FlexServer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -532,14 +532,14 @@ export class FlexServer implements GristServer {
checks.set('db', asyncCheck(this._dbManager.connection.query('SELECT 1')));
}
if (isParameterOn(req.query.redis)) {
checks.set('redis', asyncCheck(this._docWorkerMap.getRedisClient()?.pingAsync()));
checks.set('redis', asyncCheck(this._docWorkerMap.getRedisClient()?.ping()));
}
if (isParameterOn(req.query.docWorkerRegistered) && this.worker) {
// Only check whether the doc worker is registered if we have a worker.
// The Redis client may not be connected, but in this case this has to
// be checked with the 'redis' parameter (the user may want to avoid
// removing workers when connection is unstable).
if (this._docWorkerMap.getRedisClient()?.connected) {
if (this._docWorkerMap.getRedisClient()?.isReady) {
checks.set('docWorkerRegistered', asyncCheck(
this._docWorkerMap.isWorkerRegistered(this.worker).then(isRegistered => {
if (!isRegistered) { throw new Error('doc worker not registered'); }
Expand Down
51 changes: 24 additions & 27 deletions app/server/lib/Triggers.ts
Original file line number Diff line number Diff line change
Expand Up @@ -23,13 +23,10 @@ import {proxyAgent} from 'app/server/lib/ProxyAgent';
import {matchesBaseDomain} from 'app/server/lib/requestUtils';
import {delayAbort} from 'app/server/lib/serverUtils';
import {LogSanitizer} from "app/server/utils/LogSanitizer";
import {promisifyAll} from 'bluebird';
import * as _ from 'lodash';
import {AbortController, AbortSignal} from 'node-abort-controller';
import fetch from 'node-fetch';
import {createClient, Multi, RedisClient} from 'redis';

promisifyAll(RedisClient.prototype);
import {createClient, RedisClientType} from 'redis';

// Only owners can manage triggers, but any user's activity can trigger them
// and the corresponding actions get the full values
Expand Down Expand Up @@ -126,7 +123,7 @@ export class DocTriggers {

// Client lazily initiated by _redisClient getter, since most documents don't have triggers
// and therefore don't need a redis connection.
private _redisClientField: RedisClient | undefined;
private _redisClientField: RedisClientType | undefined;

// Promise which resolves after we finish fetching the backup queue from redis on startup.
private _getRedisQueuePromise: Promise<void> | undefined;
Expand All @@ -142,7 +139,7 @@ export class DocTriggers {
if (redisUrl) {
// We create a transient client just for this purpose because it makes it easy
// to quit it afterwards and avoid keeping a client open for documents without triggers.
this._getRedisQueuePromise = this._getRedisQueue(createClient(redisUrl));
this._getRedisQueuePromise = this._getRedisQueue(createClient({url:redisUrl}));
}
this._stats = new WebhookStatistics(this._docId, _activeDoc, () => this._redisClient ?? null);
}
Expand All @@ -151,7 +148,7 @@ export class DocTriggers {
this._shuttingDown = true;
this._loopAbort?.abort();
if (!this._sending) {
void(this._redisClientField?.quitAsync());
void(this._redisClientField?.quit());
}
}

Expand Down Expand Up @@ -346,7 +343,7 @@ export class DocTriggers {
// NOTE: this is subject to a race condition, currently it is not possible, but any future modification probably
// will require some kind of locking over the queue (or a rewrite)
if (removed && this._redisClient) {
await this._redisClient.multi().del(this._redisQueueKey).execAsync();
await this._redisClient.multi().del(this._redisQueueKey).exec();
}
await this._stats.clear();
this._log("Webhook queue cleared", {numRemoved: removed});
Expand Down Expand Up @@ -375,9 +372,9 @@ export class DocTriggers {
// Re-add all the remaining events to the queue.
if (this._webHookEventQueue.length) {
const strings = this._webHookEventQueue.map(e => JSON.stringify(e));
multi.rpush(this._redisQueueKey, ...strings);
multi.rPush(this._redisQueueKey, strings);
}
await multi.execAsync();
await multi.exec();
}
await this._stats.clear();
this._log("Single webhook queue cleared", {numRemoved: removed, webhookId});
Expand Down Expand Up @@ -444,7 +441,7 @@ export class DocTriggers {
private async _pushToRedisQueue(events: WebHookEvent[]) {
const strings = events.map(e => JSON.stringify(e));
try {
await this._redisClient?.rpushAsync(this._redisQueueKey, ...strings);
await this._redisClient?.rPush(this._redisQueueKey, strings);
}
catch(e){
// It's very hard to test this with integration tests, because it requires a redis failure.
Expand All @@ -454,15 +451,15 @@ export class DocTriggers {
}
}

private async _getRedisQueue(redisClient: RedisClient) {
const strings = await redisClient.lrangeAsync(this._redisQueueKey, 0, -1);
private async _getRedisQueue(redisClient: RedisClientType) {
const strings = await redisClient.lRange(this._redisQueueKey, 0, -1);
if (strings.length) {
this._log("Webhook events found on redis queue", {numEvents: strings.length});
const events = strings.map(s => JSON.parse(s));
this._webHookEventQueue.unshift(...events);
this._startSendLoop();
}
await redisClient.quitAsync();
await redisClient.quit();
}

private _getRecordDeltas(tableDelta: TableDelta): RecordDeltas {
Expand Down Expand Up @@ -715,10 +712,9 @@ export class DocTriggers {

this._webHookEventQueue.splice(0, batch.length);

let multi: Multi | null = null;
if (this._redisClient) {
multi = this._redisClient.multi();
multi.ltrim(this._redisQueueKey, batch.length, -1);
const multi = this._redisClient?.multi() || null;
if (multi) {
multi.lTrim(this._redisQueueKey, batch.length, -1);
}

if (!success) {
Expand All @@ -729,8 +725,9 @@ export class DocTriggers {
this._webHookEventQueue.push(...batch);
if (multi) {
const strings = batch.map(e => JSON.stringify(e));
multi.rpush(this._redisQueueKey, ...strings);
multi.rPush(this._redisQueueKey, strings);
}

// We are postponed, so mark that.
await this._stats.logStatus(id, 'postponed');
} else {
Expand Down Expand Up @@ -764,12 +761,12 @@ export class DocTriggers {
}
}

await multi?.execAsync();
await multi?.exec();
}

this._log("Ended _sendLoop");

this._redisClient?.quitAsync().catch(e =>
this._redisClient?.quit().catch(e =>
// Catch error to prevent sendLoop being restarted
this._log("Error quitting redis: " + e, {level: 'warn'})
);
Expand All @@ -782,7 +779,7 @@ export class DocTriggers {
const redisUrl = process.env.REDIS_URL;
if (redisUrl) {
this._log("Creating redis client");
this._redisClientField = createClient(redisUrl);
this._redisClientField = createClient({url:redisUrl});
}
return this._redisClientField;
}
Expand Down Expand Up @@ -908,15 +905,15 @@ class PersistedStore<Keys> {
constructor(
docId: string,
private _activeDoc: ActiveDoc,
private _redisClientDep: () => RedisClient | null
private _redisClientDep: () => RedisClientType | null
) {
this._redisKey = `webhooks:${docId}:statistics`;
}

public async clear() {
this._statsCache.clear();
if (this._redisClient) {
await this._redisClient.delAsync(this._redisKey).catch(() => {});
await this._redisClient.del(this._redisKey).catch(() => {});
}
}

Expand All @@ -928,10 +925,10 @@ class PersistedStore<Keys> {
if (this._redisClient) {
const multi = this._redisClient.multi();
for (const [key, value] of keyValues) {
multi.hset(this._redisKey, `${id}:${key}`, value);
multi.hSet(this._redisKey, `${id}:${key}`, value);
multi.expire(this._redisKey, WEBHOOK_STATS_CACHE_TTL);
}
await multi.execAsync();
await multi.exec();
} else {
for (const [key, value] of keyValues) {
this._statsCache.set(`${id}:${key}`, value);
Expand All @@ -941,7 +938,7 @@ class PersistedStore<Keys> {

protected async get(id: string, keys: Keys[]): Promise<[Keys, string][]> {
if (this._redisClient) {
const values = (await this._redisClient.hgetallAsync(this._redisKey)) || {};
const values = (await this._redisClient.hGetAll(this._redisKey)) || {};
return keys.map(key => [key, values[`${id}:${key}`] || '']);
} else {
return keys.map(key => [key, this._statsCache.get(`${id}:${key}`) || '']);
Expand Down
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -188,7 +188,7 @@
"qrcode": "1.5.0",
"randomcolor": "0.5.3",
"redis": "4.7.0",
"redlock": "3.1.2",
"redlock": "5.0.0-beta.2",
"saml2-js": "4.0.2",
"scimmy": "1.2.4",
"scimmy-routers": "1.2.2",
Expand Down
Loading

0 comments on commit 17cf0dc

Please sign in to comment.