Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
75 changes: 14 additions & 61 deletions src/actions/list.js
Original file line number Diff line number Diff line change
Expand Up @@ -14,15 +14,13 @@ const {
} = require('../constants');

const {
buildSearchQuery,
normalizeFilterProp,
redisSearchQuery,
redisAggregateQuery,
} = require('../utils/redis-search-stack');

// helper
const JSONParse = (data) => JSON.parse(data);

const extractUserId = (keyPrefix) => (userKey) => userKey.split('!')[0].slice(keyPrefix.length);

// fetches basic ids
async function fetchIds() {
const {
Expand Down Expand Up @@ -51,66 +49,21 @@ async function fetchIds() {
async function redisSearchIds() {
const {
service,
redis,
args: request,
args,
filter = {},
audience,
offset,
limit,
redisSearchConfig,
} = this;

service.log.debug({ criteria: request.criteria, filter }, 'users list searching...');
const { keyPrefix } = service.config.redis.options;

const { indexName, multiWords } = service.redisSearch.getIndexMetadata(audience);

service.log.debug('search using index: %s', indexName);
const args = ['FT.SEARCH', indexName];

const query = [];
const params = [];

for (const [propName, actionTypeOrValue] of Object.entries(filter)) {
const prop = normalizeFilterProp(propName, actionTypeOrValue);

if (actionTypeOrValue !== undefined) {
const [sQuery, sParams] = buildSearchQuery(prop, actionTypeOrValue, { multiWords });

query.push(sQuery);
params.push(...sParams); // name, value
}
}

if (query.length > 0) {
args.push(query.join(' '));
} else {
args.push('*');
}

// TODO extract to redis aearch utils
if (params.length > 0) {
args.push('PARAMS', params.length, ...params);
args.push('DIALECT', '2'); // use params dialect
}

// sort the response
if (request.criteria) {
args.push('SORTBY', request.criteria, request.order);
}
// limits
args.push('LIMIT', offset, limit);

// we'll fetch the data later
args.push('NOCONTENT');

// [total, [ids]]
service.log.info('redis search query: %s', args.join(' '));
service.log.debug({ criteria: args.criteria, filter }, 'users list searching...');

const [total, ...keys] = await redis.call(...args);
const useAggregation = redisSearchConfig.queryMethod === 'aggregate';
const searchQuery = useAggregation ? redisAggregateQuery : redisSearchQuery;

const extractId = extractUserId(keyPrefix);
const indexMeta = service.redisSearch.getIndexMetadata(audience);
service.log.debug('search using index: %s', indexMeta.indexName);

const ids = keys.map(extractId);
const { total, ids } = await searchQuery(indexMeta, this);

service.log.info({ ids }, 'search result: %d', total);

Expand All @@ -135,15 +88,15 @@ async function fetchUserData(ids) {
service,
redis,
audience,
seachEnabled,
redisSearchConfig,
offset,
limit,
userIdsOnly,
} = this;

let dataKey = USERS_METADATA;

if (seachEnabled) {
if (redisSearchConfig.enabled) {
const meta = service.redisSearch.getIndexMetadata(audience);
dataKey = meta.filterKey;
}
Expand Down Expand Up @@ -225,8 +178,8 @@ module.exports = function iterateOverActiveUsers({ params }) {
const ctx = {
// service parts
redis,
seachEnabled: config.redisSearch.enabled,
service: this,
redisSearchConfig: config.redisSearch,

// input parts for lua script
keys: [
Expand All @@ -251,7 +204,7 @@ module.exports = function iterateOverActiveUsers({ params }) {
audience,
};

const findUserIds = ctx.seachEnabled ? redisSearchIds : fetchIds;
const findUserIds = ctx.redisSearchConfig.enabled ? redisSearchIds : fetchIds;

return Promise
.bind(ctx)
Expand Down
1 change: 1 addition & 0 deletions src/configs/core.js
Original file line number Diff line number Diff line change
Expand Up @@ -136,4 +136,5 @@ exports.mfa = {
*/
exports.redisSearch = {
enabled: false,
queryMethod: 'search', // search | aggregate
};
2 changes: 1 addition & 1 deletion src/configs/redis-indexes.js
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
*/

exports.redisIndexDefinitions = [
// Index Name: {ms-users}-metadata-*.localhost-idx
// Index Name: {ms-users}-metadata-*.localhost-v1-idx
// Index Filter: metadata!*.localhost
{
version: '1',
Expand Down
22 changes: 22 additions & 0 deletions src/utils/redis-search-stack/build-filter-query.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
const buildSearchQuery = require('./build-search-query');
const normalizeFilterProp = require('./normalize-filter-prop');

function buildFilterQuery(filter, multiWords) {
const query = [];
const params = [];

for (const [propName, actionTypeOrValue] of Object.entries(filter)) {
const prop = normalizeFilterProp(propName, actionTypeOrValue);

if (actionTypeOrValue !== undefined) {
const [sQuery, sParams] = buildSearchQuery(prop, actionTypeOrValue, { multiWords });

query.push(sQuery);
params.push(...sParams); // name, value
}
}

return [query, params];
}

module.exports = buildFilterQuery;
4 changes: 3 additions & 1 deletion src/utils/redis-search-stack/create-hash-index.js
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
const redisKey = require('../key');
const { containsKeyExpr } = require('./expressions');

/**
* @param {Object} service provides redis, log
* @param {Object} keyPrefix root prefix, e.g. {ms-users}
Expand All @@ -14,7 +16,7 @@ async function createHashIndex({ redis, log }, indexName, prefix, filter, fields
if (filter) {
const key = redisKey('', filter); // leading separator
filterExpr.push('FILTER');
filterExpr.push(`'contains(@__key, ${key}) > 0'`);
filterExpr.push(containsKeyExpr(key));
}

try {
Expand Down
1 change: 1 addition & 0 deletions src/utils/redis-search-stack/expressions.js
Original file line number Diff line number Diff line change
Expand Up @@ -34,4 +34,5 @@ module.exports = exports = {

// Utils
tokenize: (value) => value.replace(PUNCTUATION_REGEX, ' ').split(/\s/),
containsKeyExpr: (value) => `contains(@__key, "${value}")`,
};
3 changes: 3 additions & 0 deletions src/utils/redis-search-stack/extract-user-id.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
const extractUserId = (keyPrefix) => (userKey) => userKey.split('!')[0].slice(keyPrefix.length);

module.exports = extractUserId;
9 changes: 5 additions & 4 deletions src/utils/redis-search-stack/index.js
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
const ensureSearchIndexes = require('./ensure-indexes');
const normalizeFilterProp = require('./normalize-filter-prop');
const normalizeIndexName = require('./normalize-index-name');
const buildSearchQuery = require('./build-search-query');

const redisSearchQuery = require('./query-search');
const redisAggregateQuery = require('./query-aggregate');

module.exports = {
ensureSearchIndexes,
normalizeFilterProp,
normalizeIndexName,
buildSearchQuery,
redisSearchQuery,
redisAggregateQuery,
};
60 changes: 60 additions & 0 deletions src/utils/redis-search-stack/query-aggregate.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
const redisKey = require('../key');
const extractUserId = require('./extract-user-id');
const { containsKeyExpr } = require('./expressions');
const buildFilterQuery = require('./build-filter-query');

async function redisAggregateQuery(indexMeta, context) {
const {
service,
redis,
args: request,
filter = {},
audience,
offset,
limit,
} = context;

const { keyPrefix } = service.config.redis.options;

const { indexName, filterKey, multiWords } = indexMeta;

const args = ['FT.AGGREGATE', indexName];

const [query, params] = buildFilterQuery(filter, multiWords);

if (query.length > 0) {
args.push(query.join(' '));
} else {
args.push('*');
}

const load = ['@id', '@__key']; // TODO field from config
args.push('LOAD', load.length, ...load);

const filterCondition = redisKey('', filterKey, audience); // with leading separator
args.push('FILTER', containsKeyExpr(filterCondition));

// TODO extract to redis aearch utils
if (params.length > 0) {
args.push('PARAMS', params.length, ...params);
args.push('DIALECT', '2'); // use params dialect
}

// sort the response
if (request.criteria) {
args.push('SORTBY', request.criteria, request.order);
}
// limits
args.push('LIMIT', offset, limit);

service.log.info('redis aggregate query: %s', args.join(' '));

const [total, ...keys] = await redis.call(...args);

const extractId = extractUserId(keyPrefix);
const ids = keys.map(([, key]) => extractId(key));

return { total, ids };
}

module.exports = redisAggregateQuery;
55 changes: 55 additions & 0 deletions src/utils/redis-search-stack/query-search.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
const buildFilterQuery = require('./build-filter-query');
const extractUserId = require('./extract-user-id');

async function redisSearchQuery(indexMeta, context) {
const {
service,
redis,
args: request,
filter = {},
offset,
limit,
} = context;

const { keyPrefix } = service.config.redis.options;
const { indexName, multiWords } = indexMeta;

const args = ['FT.SEARCH', indexName];

const [query, params] = buildFilterQuery(filter, multiWords);

if (query.length > 0) {
args.push(query.join(' '));
} else {
args.push('*');
}

// TODO extract to redis aearch utils
if (params.length > 0) {
args.push('PARAMS', params.length, ...params);
args.push('DIALECT', '2'); // use params dialect
}

// sort the response
if (request.criteria) {
args.push('SORTBY', request.criteria, request.order);
}
// limits
args.push('LIMIT', offset, limit);

// we'll fetch the data later
args.push('NOCONTENT');

// [total, [ids]]
service.log.info('redis search query: %s', args.join(' '));

const [total, ...keys] = await redis.call(...args);

const extractId = extractUserId(keyPrefix);

const ids = keys.map(extractId);

return { total, ids };
}

module.exports = redisSearchQuery;
Loading