Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 10 additions & 4 deletions src/cluster.c
Original file line number Diff line number Diff line change
Expand Up @@ -1288,11 +1288,17 @@ int MR_ClusterInnerCommunicationMsg(RedisModuleCtx *ctx, RedisModuleString **arg
}

int MR_ClusterIsMySlot(size_t slot) {
if (RedisModule_ShardingGetSlotRange) {
int first_slot, last_slot;
RedisModule_ShardingGetSlotRange(&first_slot, &last_slot);
return first_slot <= slot && last_slot >= slot;
if (clusterCtx.isOss) {
if (RedisModule_ClusterCanAccessKeysInSlot != NULL)
return RedisModule_ClusterCanAccessKeysInSlot(slot);
} else {
if (RedisModule_ShardingGetSlotRange != NULL) {
int first_slot, last_slot;
RedisModule_ShardingGetSlotRange(&first_slot, &last_slot);
return first_slot <= slot && last_slot >= slot;
}
}
// Fallback.
return clusterCtx.minSlot <= slot && clusterCtx.maxSlot >= slot;
}

Expand Down
108 changes: 106 additions & 2 deletions src/redismodule.h
Original file line number Diff line number Diff line change
Expand Up @@ -239,11 +239,14 @@ This flag should not be used directly by the module.
#define REDISMODULE_NOTIFY_LOADED (1<<12) /* module only key space notification, indicate a key loaded from rdb */
#define REDISMODULE_NOTIFY_MODULE (1<<13) /* d, module key space notification */
#define REDISMODULE_NOTIFY_NEW (1<<14) /* n, new key notification */
#define REDISMODULE_NOTIFY_OVERWRITTEN (1<<15) /* o, key overwrite notification */
#define REDISMODULE_NOTIFY_TYPE_CHANGED (1<<16) /* c, key type changed notification */
#define REDISMODULE_NOTIFY_KEY_TRIMMED (1<<17) /* module only key space notification, indicates a key trimmed during slot migration */

/* Next notification flag, must be updated when adding new flags above!
This flag should not be used directly by the module.
* Use RedisModule_GetKeyspaceNotificationFlagsAll instead. */
#define _REDISMODULE_NOTIFY_NEXT (1<<15)
#define _REDISMODULE_NOTIFY_NEXT (1<<18)

#define REDISMODULE_NOTIFY_ALL (REDISMODULE_NOTIFY_GENERIC | REDISMODULE_NOTIFY_STRING | REDISMODULE_NOTIFY_LIST | REDISMODULE_NOTIFY_SET | REDISMODULE_NOTIFY_HASH | REDISMODULE_NOTIFY_ZSET | REDISMODULE_NOTIFY_EXPIRED | REDISMODULE_NOTIFY_EVICTED | REDISMODULE_NOTIFY_STREAM | REDISMODULE_NOTIFY_MODULE) /* A */

Expand Down Expand Up @@ -510,7 +513,9 @@ typedef void (*RedisModuleEventLoopOneShotFunc)(void *user_data);
#define REDISMODULE_EVENT_EVENTLOOP 15
#define REDISMODULE_EVENT_CONFIG 16
#define REDISMODULE_EVENT_KEY 17
#define _REDISMODULE_EVENT_NEXT 18 /* Next event flag, should be updated if a new event added. */
#define REDISMODULE_EVENT_CLUSTER_SLOT_MIGRATION 18
#define REDISMODULE_EVENT_CLUSTER_SLOT_MIGRATION_TRIM 19
#define _REDISMODULE_EVENT_NEXT 20 /* Next event flag, should be updated if a new event added. */

typedef struct RedisModuleEvent {
uint64_t id; /* REDISMODULE_EVENT_... defines. */
Expand Down Expand Up @@ -621,6 +626,14 @@ static const RedisModuleEvent
RedisModuleEvent_Key = {
REDISMODULE_EVENT_KEY,
1
},
RedisModuleEvent_ClusterSlotMigration = {
REDISMODULE_EVENT_CLUSTER_SLOT_MIGRATION,
1
},
RedisModuleEvent_ClusterSlotMigrationTrim = {
REDISMODULE_EVENT_CLUSTER_SLOT_MIGRATION_TRIM,
1
};

/* Those are values that are used for the 'subevent' callback argument. */
Expand Down Expand Up @@ -699,6 +712,20 @@ static const RedisModuleEvent
#define _REDISMODULE_SUBEVENT_CRON_LOOP_NEXT 0
#define _REDISMODULE_SUBEVENT_SWAPDB_NEXT 0

#define REDISMODULE_SUBEVENT_CLUSTER_SLOT_MIGRATION_IMPORT_STARTED 0
#define REDISMODULE_SUBEVENT_CLUSTER_SLOT_MIGRATION_IMPORT_FAILED 1
#define REDISMODULE_SUBEVENT_CLUSTER_SLOT_MIGRATION_IMPORT_COMPLETED 2
#define REDISMODULE_SUBEVENT_CLUSTER_SLOT_MIGRATION_MIGRATE_STARTED 3
#define REDISMODULE_SUBEVENT_CLUSTER_SLOT_MIGRATION_MIGRATE_FAILED 4
#define REDISMODULE_SUBEVENT_CLUSTER_SLOT_MIGRATION_MIGRATE_COMPLETED 5
#define REDISMODULE_SUBEVENT_CLUSTER_SLOT_MIGRATION_MIGRATE_MODULE_PROPAGATE 6
#define _REDISMODULE_SUBEVENT_CLUSTER_SLOT_MIGRATION_NEXT 7

#define REDISMODULE_SUBEVENT_CLUSTER_SLOT_MIGRATION_TRIM_STARTED 0
#define REDISMODULE_SUBEVENT_CLUSTER_SLOT_MIGRATION_TRIM_COMPLETED 1
#define REDISMODULE_SUBEVENT_CLUSTER_SLOT_MIGRATION_TRIM_BACKGROUND 2
#define _REDISMODULE_SUBEVENT_CLUSTER_SLOT_MIGRATION_TRIM_NEXT 3

/* RedisModuleClientInfo flags. */
#define REDISMODULE_CLIENTINFO_FLAG_SSL (1<<0)
#define REDISMODULE_CLIENTINFO_FLAG_PUBSUB (1<<1)
Expand Down Expand Up @@ -828,13 +855,55 @@ typedef struct RedisModuleKeyInfo {

#define RedisModuleKeyInfo RedisModuleKeyInfoV1

typedef struct RedisModuleSlotRange {
uint16_t start;
uint16_t end;
} RedisModuleSlotRange;

typedef struct RedisModuleSlotRangeArray {
int32_t num_ranges;
RedisModuleSlotRange ranges[];
} RedisModuleSlotRangeArray;

#define REDISMODULE_CLUSTER_SLOT_MIGRATION_INFO_VERSION 1

typedef struct RedisModuleClusterSlotMigrationInfo {
uint64_t version; /* Not used since this structure is never passed
from the module to the core right now. Here
for future compatibility. */
char source_node_id[REDISMODULE_NODE_ID_LEN + 1];
char destination_node_id[REDISMODULE_NODE_ID_LEN + 1];
const char *task_id;
RedisModuleSlotRangeArray *slots;
} RedisModuleClusterSlotMigrationInfoV1;

#define RedisModuleClusterSlotMigrationInfo RedisModuleClusterSlotMigrationInfoV1

#define REDISMODULE_CLUSTER_SLOT_MIGRATION_TRIMINFO_VERSION 1

typedef struct RedisModuleClusterSlotMigrationTrimInfo {
uint64_t version; /* Not used since this structure is never passed
from the module to the core right now. Here
for future compatibility. */
RedisModuleSlotRangeArray *slots;
} RedisModuleClusterSlotMigrationTrimInfoV1;

#define RedisModuleClusterSlotMigrationTrimInfo RedisModuleClusterSlotMigrationTrimInfoV1

typedef enum {
REDISMODULE_ACL_LOG_AUTH = 0, /* Authentication failure */
REDISMODULE_ACL_LOG_CMD, /* Command authorization failure */
REDISMODULE_ACL_LOG_KEY, /* Key authorization failure */
REDISMODULE_ACL_LOG_CHANNEL /* Channel authorization failure */
} RedisModuleACLLogEntryReason;

typedef enum {
REDISMODULE_CONFIG_TYPE_STRING = 0,
REDISMODULE_CONFIG_TYPE_ENUM,
REDISMODULE_CONFIG_TYPE_NUMERIC,
REDISMODULE_CONFIG_TYPE_BOOL,
} RedisModuleConfigType;

/* Incomplete structures needed by both the core and modules. */
typedef struct RedisModuleIO RedisModuleIO;
typedef struct RedisModuleDigest RedisModuleDigest;
Expand Down Expand Up @@ -896,6 +965,7 @@ typedef struct RedisModuleScanCursor RedisModuleScanCursor;
typedef struct RedisModuleUser RedisModuleUser;
typedef struct RedisModuleKeyOptCtx RedisModuleKeyOptCtx;
typedef struct RedisModuleRdbStream RedisModuleRdbStream;
typedef struct RedisModuleConfigIterator RedisModuleConfigIterator;

typedef int (*RedisModuleCmdFunc)(RedisModuleCtx *ctx, RedisModuleString **argv, int argc);
typedef void (*RedisModuleDisconnectFunc)(RedisModuleCtx *ctx, RedisModuleBlockedClient *bc);
Expand Down Expand Up @@ -1254,6 +1324,7 @@ REDISMODULE_API void (*RedisModule_ThreadSafeContextLock)(RedisModuleCtx *ctx) R
REDISMODULE_API int (*RedisModule_ThreadSafeContextTryLock)(RedisModuleCtx *ctx) REDISMODULE_ATTR;
REDISMODULE_API void (*RedisModule_ThreadSafeContextUnlock)(RedisModuleCtx *ctx) REDISMODULE_ATTR;
REDISMODULE_API int (*RedisModule_SubscribeToKeyspaceEvents)(RedisModuleCtx *ctx, int types, RedisModuleNotificationFunc cb) REDISMODULE_ATTR;
REDISMODULE_API int (*RedisModule_UnsubscribeFromKeyspaceEvents)(RedisModuleCtx *ctx, int types, RedisModuleNotificationFunc cb) REDISMODULE_ATTR;
REDISMODULE_API int (*RedisModule_AddPostNotificationJob)(RedisModuleCtx *ctx, RedisModulePostNotificationJobFunc callback, void *pd, void (*free_pd)(void*)) REDISMODULE_ATTR;
REDISMODULE_API int (*RedisModule_NotifyKeyspaceEvent)(RedisModuleCtx *ctx, int type, const char *event, RedisModuleString *key) REDISMODULE_ATTR;
REDISMODULE_API int (*RedisModule_GetNotifyKeyspaceEvents)(void) REDISMODULE_ATTR;
Expand All @@ -1274,6 +1345,10 @@ REDISMODULE_API void (*RedisModule_SetDisconnectCallback)(RedisModuleBlockedClie
REDISMODULE_API void (*RedisModule_SetClusterFlags)(RedisModuleCtx *ctx, uint64_t flags) REDISMODULE_ATTR;
REDISMODULE_API unsigned int (*RedisModule_ClusterKeySlot)(RedisModuleString *key) REDISMODULE_ATTR;
REDISMODULE_API const char *(*RedisModule_ClusterCanonicalKeyNameInSlot)(unsigned int slot) REDISMODULE_ATTR;
REDISMODULE_API int (*RedisModule_ClusterCanAccessKeysInSlot)(int slot) REDISMODULE_ATTR;
REDISMODULE_API int (*RedisModule_ClusterPropagateForSlotMigration)(RedisModuleCtx *ctx, const char *cmdname, const char *fmt, ...) REDISMODULE_ATTR;
REDISMODULE_API RedisModuleSlotRangeArray *(*RedisModule_ClusterGetLocalSlotRanges)(RedisModuleCtx *ctx) REDISMODULE_ATTR;
REDISMODULE_API void (*RedisModule_ClusterFreeSlotRanges)(RedisModuleCtx *ctx, RedisModuleSlotRangeArray *slots) REDISMODULE_ATTR;
REDISMODULE_API int (*RedisModule_ExportSharedAPI)(RedisModuleCtx *ctx, const char *apiname, void *func) REDISMODULE_ATTR;
REDISMODULE_API void * (*RedisModule_GetSharedAPI)(RedisModuleCtx *ctx, const char *apiname) REDISMODULE_ATTR;
REDISMODULE_API RedisModuleCommandFilter * (*RedisModule_RegisterCommandFilter)(RedisModuleCtx *ctx, RedisModuleCommandFilterFunc cb, int flags) REDISMODULE_ATTR;
Expand Down Expand Up @@ -1342,6 +1417,18 @@ REDISMODULE_API void (*RedisModule_RdbStreamFree)(RedisModuleRdbStream *stream)
REDISMODULE_API int (*RedisModule_RdbLoad)(RedisModuleCtx *ctx, RedisModuleRdbStream *stream, int flags) REDISMODULE_ATTR;
REDISMODULE_API int (*RedisModule_RdbSave)(RedisModuleCtx *ctx, RedisModuleRdbStream *stream, int flags) REDISMODULE_ATTR;
REDISMODULE_API const char * (*RedisModule_GetInternalSecret)(RedisModuleCtx *ctx, size_t *len) REDISMODULE_ATTR;
REDISMODULE_API RedisModuleConfigIterator * (*RedisModule_ConfigIteratorCreate)(RedisModuleCtx *ctx, const char *pattern) REDISMODULE_ATTR;
REDISMODULE_API void (*RedisModule_ConfigIteratorRelease)(RedisModuleCtx *ctx, RedisModuleConfigIterator *iter) REDISMODULE_ATTR;
REDISMODULE_API const char * (*RedisModule_ConfigIteratorNext)(RedisModuleConfigIterator *iter) REDISMODULE_ATTR;
REDISMODULE_API int (*RedisModule_ConfigGetType)(const char *name, RedisModuleConfigType *res) REDISMODULE_ATTR;
REDISMODULE_API int (*RedisModule_ConfigGet)(RedisModuleCtx *ctx, const char *name, RedisModuleString **res) REDISMODULE_ATTR;
REDISMODULE_API int (*RedisModule_ConfigGetBool)(RedisModuleCtx *ctx, const char *name, int *res) REDISMODULE_ATTR;
REDISMODULE_API int (*RedisModule_ConfigGetEnum)(RedisModuleCtx *ctx, const char *name, RedisModuleString **res) REDISMODULE_ATTR;
REDISMODULE_API int (*RedisModule_ConfigGetNumeric)(RedisModuleCtx *ctx, const char *name, long long *res) REDISMODULE_ATTR;
REDISMODULE_API int (*RedisModule_ConfigSet)(RedisModuleCtx *ctx, const char *name, RedisModuleString *value, RedisModuleString **err) REDISMODULE_ATTR;
REDISMODULE_API int (*RedisModule_ConfigSetBool)(RedisModuleCtx *ctx, const char *name, int value, RedisModuleString **err) REDISMODULE_ATTR;
REDISMODULE_API int (*RedisModule_ConfigSetEnum)(RedisModuleCtx *ctx, const char *name, RedisModuleString *value, RedisModuleString **err) REDISMODULE_ATTR;
REDISMODULE_API int (*RedisModule_ConfigSetNumeric)(RedisModuleCtx *ctx, const char *name, long long value, RedisModuleString **err) REDISMODULE_ATTR;

#define RedisModule_IsAOFClient(id) ((id) == UINT64_MAX)

Expand Down Expand Up @@ -1634,6 +1721,7 @@ static int RedisModule_Init(RedisModuleCtx *ctx, const char *name, int ver, int
REDISMODULE_GET_API(BlockedClientMeasureTimeEnd);
REDISMODULE_GET_API(SetDisconnectCallback);
REDISMODULE_GET_API(SubscribeToKeyspaceEvents);
REDISMODULE_GET_API(UnsubscribeFromKeyspaceEvents);
REDISMODULE_GET_API(AddPostNotificationJob);
REDISMODULE_GET_API(NotifyKeyspaceEvent);
REDISMODULE_GET_API(GetNotifyKeyspaceEvents);
Expand All @@ -1653,6 +1741,10 @@ static int RedisModule_Init(RedisModuleCtx *ctx, const char *name, int ver, int
REDISMODULE_GET_API(SetClusterFlags);
REDISMODULE_GET_API(ClusterKeySlot);
REDISMODULE_GET_API(ClusterCanonicalKeyNameInSlot);
REDISMODULE_GET_API(ClusterCanAccessKeysInSlot);
REDISMODULE_GET_API(ClusterPropagateForSlotMigration);
REDISMODULE_GET_API(ClusterGetLocalSlotRanges);
REDISMODULE_GET_API(ClusterFreeSlotRanges);
REDISMODULE_GET_API(ExportSharedAPI);
REDISMODULE_GET_API(GetSharedAPI);
REDISMODULE_GET_API(RegisterCommandFilter);
Expand Down Expand Up @@ -1721,6 +1813,18 @@ static int RedisModule_Init(RedisModuleCtx *ctx, const char *name, int ver, int
REDISMODULE_GET_API(RdbLoad);
REDISMODULE_GET_API(RdbSave);
REDISMODULE_GET_API(GetInternalSecret);
REDISMODULE_GET_API(ConfigIteratorCreate);
REDISMODULE_GET_API(ConfigIteratorRelease);
REDISMODULE_GET_API(ConfigIteratorNext);
REDISMODULE_GET_API(ConfigGetType);
REDISMODULE_GET_API(ConfigGet);
REDISMODULE_GET_API(ConfigGetBool);
REDISMODULE_GET_API(ConfigGetEnum);
REDISMODULE_GET_API(ConfigGetNumeric);
REDISMODULE_GET_API(ConfigSet);
REDISMODULE_GET_API(ConfigSetBool);
REDISMODULE_GET_API(ConfigSetEnum);
REDISMODULE_GET_API(ConfigSetNumeric);


#ifdef REDISMODULE_RLEC_API_DEFS
Expand Down
Loading