Skip to content
1 change: 1 addition & 0 deletions examples/lua-multi-incr.js
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ const client = createClient({
scripts: {
mincr: defineScript({
NUMBER_OF_KEYS: 2,
// TODO add RequestPolicy: ,
SCRIPT:
'return {' +
'redis.pcall("INCRBY", KEYS[1], ARGV[1]),' +
Expand Down
8 changes: 8 additions & 0 deletions packages/client/lib/client/parser.ts
Original file line number Diff line number Diff line change
@@ -1,11 +1,13 @@
import { RedisArgument } from '../RESP/types';
import { RedisVariadicArgument } from '../commands/generic-transformers';

export type CommandIdentifier = { command: string, subcommand: string };
export interface CommandParser {
redisArgs: ReadonlyArray<RedisArgument>;
keys: ReadonlyArray<RedisArgument>;
firstKey: RedisArgument | undefined;
preserve: unknown;
commandIdentifier: CommandIdentifier;

push: (...arg: Array<RedisArgument>) => unknown;
pushVariadic: (vals: RedisVariadicArgument) => unknown;
Expand Down Expand Up @@ -44,6 +46,12 @@ export class BasicCommandParser implements CommandParser {
return tmp.join('_');
}

get commandIdentifier(): CommandIdentifier {
const command = this.#redisArgs[0] instanceof Buffer ? this.#redisArgs[0].toString() : this.#redisArgs[0];
const subcommand = this.#redisArgs[1] instanceof Buffer ? this.#redisArgs[1].toString() : this.#redisArgs[1];
return { command, subcommand };
}

push(...arg: Array<RedisArgument>) {
this.#redisArgs.push(...arg);
};
Expand Down
14 changes: 14 additions & 0 deletions packages/client/lib/cluster/cluster-slots.ts
Original file line number Diff line number Diff line change
Expand Up @@ -445,6 +445,20 @@ export default class RedisClusterSlots<
await Promise.allSettled(promises);
}

getAllClients() {
return Array.from(this.#clients());
}

getAllMasterClients() {
const result = [];
for (const master of this.masters) {
if (master.client) {
result.push(master.client);
}
}
return result;
}

getClient(
firstKey: RedisArgument | undefined,
isReadonly: boolean | undefined
Expand Down
191 changes: 152 additions & 39 deletions packages/client/lib/cluster/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,11 @@ import { PubSubListener } from '../client/pub-sub';
import { ErrorReply } from '../errors';
import { RedisTcpSocketOptions } from '../client/socket';
import { ClientSideCacheConfig, PooledClientSideCacheProvider } from '../client/cache';
import { BasicCommandParser } from '../client/parser';
import { BasicCommandParser, CommandParser } from '../client/parser';
import { ASKING_CMD } from '../commands/ASKING';
import SingleEntryCache from '../single-entry-cache'
import { POLICIES, PolicyResolver, REQUEST_POLICIES_WITH_DEFAULTS, RESPONSE_POLICIES_WITH_DEFAULTS, StaticPolicyResolver } from './request-response-policies';
import { aggregateLogicalAnd, aggregateLogicalOr, aggregateMax, aggregateMerge, aggregateMin, aggregateSum } from './request-response-policies/generic-aggregators';
interface ClusterCommander<
M extends RedisModules,
F extends RedisFunctions,
Expand Down Expand Up @@ -189,7 +191,7 @@ export default class RedisCluster<
command.parseCommand(parser, ...args);

return this._self._execute(
parser.firstKey,
parser,
command.IS_READ_ONLY,
this._commandOptions,
(client, opts) => client._executeCommand(command, parser, opts, transformReply)
Expand All @@ -205,7 +207,7 @@ export default class RedisCluster<
command.parseCommand(parser, ...args);

return this._self._execute(
parser.firstKey,
parser,
command.IS_READ_ONLY,
this._self._commandOptions,
(client, opts) => client._executeCommand(command, parser, opts, transformReply)
Expand All @@ -223,7 +225,7 @@ export default class RedisCluster<
fn.parseCommand(parser, ...args);

return this._self._execute(
parser.firstKey,
parser,
fn.IS_READ_ONLY,
this._self._commandOptions,
(client, opts) => client._executeCommand(fn, parser, opts, transformReply)
Expand All @@ -241,7 +243,7 @@ export default class RedisCluster<
script.parseCommand(parser, ...args);

return this._self._execute(
parser.firstKey,
parser,
script.IS_READ_ONLY,
this._commandOptions,
(client, opts) => client._executeScript(script, parser, opts, transformReply)
Expand Down Expand Up @@ -299,6 +301,7 @@ export default class RedisCluster<

private _self = this;
private _commandOptions?: ClusterCommandOptions<TYPE_MAPPING/*, POLICIES*/>;
private _policyResolver: PolicyResolver;

/**
* An array of the cluster slots, each slot contain its `master` and `replicas`.
Expand Down Expand Up @@ -356,6 +359,8 @@ export default class RedisCluster<
if (options?.commandOptions) {
this._commandOptions = options.commandOptions;
}

this._policyResolver = new StaticPolicyResolver(POLICIES);
}

duplicate<
Expand Down Expand Up @@ -451,54 +456,157 @@ export default class RedisCluster<
}

async _execute<T>(
firstKey: RedisArgument | undefined,
parser: CommandParser,
isReadonly: boolean | undefined,
options: ClusterCommandOptions | undefined,
fn: (client: RedisClientType<M, F, S, RESP, TYPE_MAPPING>, opts?: ClusterCommandOptions) => Promise<T>
): Promise<T> {

const maxCommandRedirections = this._options.maxCommandRedirections ?? 16;
let client = await this._slots.getClient(firstKey, isReadonly);
let i = 0;

let myFn = fn;
const policyResult = this._policyResolver.resolvePolicy(parser.commandIdentifier);

while (true) {
try {
return await myFn(client, options);
} catch (err) {
myFn = fn;
if(!policyResult.ok) {
throw new Error(`Policy resolution error for ${parser.commandIdentifier}: ${policyResult.error}`);
}

// TODO: error class
if (++i > maxCommandRedirections || !(err instanceof Error)) {
throw err;
}
const requestPolicy = policyResult.value.request
const responsePolicy = policyResult.value.response

let clients: Array<RedisClientType<M, F, S, RESP, TYPE_MAPPING>>;
// https://redis.io/docs/latest/develop/reference/command-tips
switch (requestPolicy) {

case REQUEST_POLICIES_WITH_DEFAULTS.ALL_NODES:
clients = this._slots.getAllClients()
break;

case REQUEST_POLICIES_WITH_DEFAULTS.ALL_SHARDS:
clients = this._slots.getAllMasterClients()
break;

case REQUEST_POLICIES_WITH_DEFAULTS.MULTI_SHARD:
clients = await Promise.all(
parser.keys.map((key) => this._slots.getClient(key, isReadonly))
);
break;

case REQUEST_POLICIES_WITH_DEFAULTS.SPECIAL:
throw new Error(`Special request policy not implemented for ${parser.commandIdentifier}`);

case REQUEST_POLICIES_WITH_DEFAULTS.DEFAULT_KEYLESS:
//TODO handle undefined case?
clients = [this._slots.getRandomNode().client!]
break;

case REQUEST_POLICIES_WITH_DEFAULTS.DEFAULT_KEYED:
clients = [await this._slots.getClient(parser.firstKey, isReadonly)]
break;

default:
throw new Error(`Unknown request policy ${requestPolicy}`);

if (err.message.startsWith('ASK')) {
const address = err.message.substring(err.message.lastIndexOf(' ') + 1);
let redirectTo = await this._slots.getMasterByAddress(address);
if (!redirectTo) {
await this._slots.rediscover(client);
redirectTo = await this._slots.getMasterByAddress(address);
}

const responsePromises = clients.map(async client => {

let i = 0;

let myFn = fn;

while (true) {
try {
return await myFn(client, options);
} catch (err) {
myFn = fn;

// TODO: error class
if (++i > maxCommandRedirections || !(err instanceof Error)) {
throw err;
}

if (!redirectTo) {
throw new Error(`Cannot find node ${address}`);
if (err.message.startsWith('ASK')) {
const address = err.message.substring(err.message.lastIndexOf(' ') + 1);
let redirectTo = await this._slots.getMasterByAddress(address);
if (!redirectTo) {
await this._slots.rediscover(client);
redirectTo = await this._slots.getMasterByAddress(address);
}

if (!redirectTo) {
throw new Error(`Cannot find node ${address}`);
}

client = redirectTo;
myFn = this._handleAsk(fn);
continue;
}

if (err.message.startsWith('MOVED')) {
await this._slots.rediscover(client);
client = await this._slots.getClient(parser.firstKey, isReadonly);
continue;
}

client = redirectTo;
myFn = this._handleAsk(fn);
continue;
}

if (err.message.startsWith('MOVED')) {
await this._slots.rediscover(client);
client = await this._slots.getClient(firstKey, isReadonly);
continue;
}
throw err;
}
}

throw err;
}
})

switch (responsePolicy) {
case RESPONSE_POLICIES_WITH_DEFAULTS.ONE_SUCCEEDED: {
return Promise.any(responsePromises);
}

case RESPONSE_POLICIES_WITH_DEFAULTS.ALL_SUCCEEDED: {
const responses = await Promise.all(responsePromises);
return responses[0]
}

case RESPONSE_POLICIES_WITH_DEFAULTS.AGG_LOGICAL_AND: {
const responses = await Promise.all(responsePromises)
return aggregateLogicalAnd(responses);
}

case RESPONSE_POLICIES_WITH_DEFAULTS.AGG_LOGICAL_OR: {
const responses = await Promise.all(responsePromises)
return aggregateLogicalOr(responses);
}

case RESPONSE_POLICIES_WITH_DEFAULTS.AGG_MIN: {
const responses = await Promise.all(responsePromises);
return aggregateMin(responses);
}

case RESPONSE_POLICIES_WITH_DEFAULTS.AGG_MAX: {
const responses = await Promise.all(responsePromises);
return aggregateMax(responses);
}

case RESPONSE_POLICIES_WITH_DEFAULTS.AGG_SUM: {
const responses = await Promise.all(responsePromises);
return aggregateSum(responses);
}

case RESPONSE_POLICIES_WITH_DEFAULTS.SPECIAL: {
throw new Error(`Special response policy not implemented for ${parser.commandIdentifier}`);
}

case RESPONSE_POLICIES_WITH_DEFAULTS.DEFAULT_KEYLESS: {
const responses = await Promise.all(responsePromises);
return aggregateMerge(responses);
}

case RESPONSE_POLICIES_WITH_DEFAULTS.DEFAULT_KEYED: {
const responses = await Promise.all(responsePromises);
return responses as T;
}

default:
throw new Error(`Unknown response policy ${responsePolicy}`);
}

}

async sendCommand<T = ReplyUnion>(
Expand All @@ -508,8 +616,13 @@ export default class RedisCluster<
options?: ClusterCommandOptions,
// defaultPolicies?: CommandPolicies
): Promise<T> {

const parser = new BasicCommandParser();
firstKey && parser.push(firstKey)
args.forEach(arg => parser.push(arg));

return this._self._execute(
firstKey,
parser,
isReadonly,
options,
(client, opts) => client.sendCommand(args, opts)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
// import { RedisFunctions, RedisModules, RedisScripts, RespVersions, TypeMapping } from "../../RESP/types";
// import { ShardNode } from "../cluster-slots";
// import type { Either } from './types';

// export interface CommandRouter<
// M extends RedisModules,
// F extends RedisFunctions,
// S extends RedisScripts,
// RESP extends RespVersions,
// TYPE_MAPPING extends TypeMapping> {
// routeCommand(
// command: string,
// policy: RequestPolicy,
// ): Either<ShardNode<M, F, S, RESP, TYPE_MAPPING>, 'no-available-nodes' | 'routing-failed'>;
// }
Loading
Loading