Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .travis.yml
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
language: node_js
node_js:
- "4"
- "12"
services:
- redis-server
224 changes: 166 additions & 58 deletions lib/QuerySwarm.js
Original file line number Diff line number Diff line change
Expand Up @@ -30,15 +30,15 @@ module.exports = function(redis) {
// workers are currently active
self.active = false;

// whether or not QuerySwarm is populating the queue
self.populating = false;

// the user-provided query that fetches tasks
self.query = query;

// the worker that processes a task
self.worker = worker;

// the unix timestamp when a query was last run
self.throttle = 0;

// the Worker objects that consume the queue
self.workers = [];

Expand All @@ -49,7 +49,7 @@ module.exports = function(redis) {
throttle: typeof options.throttle === 'number' ? options.throttle : 10000,
// queue length at which to trigger another query
threshold: typeof options.threshold === 'number' ? options.threshold : 10,
// the duration a worker waits between between polling an empty queue
// the duration a worker waits between polling an empty queue
retryDelay: typeof options.retryDelay === 'number' ? options.retryDelay : 500,
// the max duration a query can run before we try again
lockTimeout: typeof options.lockTimeout === 'number' ? options.lockTimeout : 20000,
Expand All @@ -58,9 +58,11 @@ module.exports = function(redis) {
// maximum concurrent workers per process
get concurrency() { return concurrency; },
set concurrency(value) {
// FIXME if this deletes workers, it would need to also stop them, which is an async operation
var params = [0, Math.max(0, self.workers.length - value)];
let i = 0;
while(params.length - 2 < value - self.workers.length) {
var worker = new Worker(self);
var worker = new Worker(self, ++i);
if(self.active) worker.start();
params.push(worker);
}
Expand All @@ -77,7 +79,7 @@ module.exports = function(redis) {
self.redlock.on('clientError', function(err) {
self.emit('clientError', err);
});
}
};

util.inherits(QuerySwarm, EventEmitter);

Expand All @@ -89,6 +91,7 @@ module.exports = function(redis) {
});

self.active = true;
self.emit('started');
return self;
};

Expand All @@ -99,6 +102,7 @@ module.exports = function(redis) {
worker.stop(callback);
}, function(err, res) {
if(!err) self.active = false;
self.emit('stopped', err, res);
if(callback instanceof Function) callback(err, res);
});

Expand All @@ -107,95 +111,199 @@ module.exports = function(redis) {

QuerySwarm.prototype.destroy = function(callback) {
var self = this;
self.stop(function(){
self.stop(function(err, res){

// TODO: get the lock, cursor, queue, processing, & deadletter; write to file??? emit??? log???

redis.del(self.id + ':lock', self.id + ':throttle', self.id + ':cursor', self.id + ':queue', self.id + ':processing', self.id + ':deadletter', callback);
redis.del(self.id + ':lock', self.id + ':throttle', self.id + ':cursor', self.id + ':queue', self.id + ':processing', self.id + ':deadletter', (err2) => {
callback(err2);
self.emit('destroyed', err, res);
});
});
return self;
};

/**
 * Read this swarm's cursor from redis and decode it from JSON.
 *
 * @param {function(Error=, *=)} callback - receives `(err)` when the redis
 *   read fails or the stored value is not valid JSON, otherwise
 *   `(null, cursor)`; a missing key yields a `null` cursor.
 * @returns {*} whatever `redis.get` returns.
 */
QuerySwarm.prototype.getCursor = function(callback) {
  return redis.get(this.id + ':cursor', function(err, cursor) {
    // on failure the reply is undefined, and JSON.parse(undefined) would throw
    if (err) return callback(err);

    var parsed;
    try {
      parsed = JSON.parse(cursor);
    } catch (parseErr) {
      return callback(parseErr);
    }

    // invoked outside the try so an exception thrown by the callback is not
    // mistaken for a parse failure
    return callback(null, parsed);
  });
};

/**
 * Overwrite this swarm's cursor in redis.
 *
 * The write is refused while this instance is populating, and is performed
 * while holding the swarm's `:lock` resource (1000 ms TTL). The value is
 * stored as JSON so that it round-trips through `getCursor`, which decodes
 * the stored string with `JSON.parse`.
 *
 * @param {*} value - the new cursor; must be JSON-serializable.
 * @param {function(Error=)} callback - receives the error from the populating
 *   check, the lock acquisition, or the redis write; otherwise the (falsy)
 *   error reported by `redis.set`.
 * @returns {*} the result of the callback (populating case) or of
 *   `redlock.lock`.
 */
QuerySwarm.prototype.setCursor = function(value, callback) {
  // capture the instance: `this` is not the swarm inside the nested callbacks
  var self = this;

  if (self.populating) return callback(new Error('Cannot update cursor of swarm while it is populating. The new cursor will be overwritten by the worker before a new query is run.'));

  return self.redlock.lock(self.id + ':lock', 1000, function(err, lock) {
    if (err) return callback(err);

    return redis.set(self.id + ':cursor', JSON.stringify(value), function(setErr) {
      // release the lock whether or not the write succeeded
      lock.unlock();
      return callback(setErr);
    });
  });
};

/**
 * Report the unix timestamp (ms) until which this swarm is throttled.
 *
 * The remaining time-to-live of the `:throttle` key is added to the current
 * time; negative replies are clamped to zero, so an unthrottled swarm
 * reports "now".
 *
 * @param {function(Error=, number=)} callback - receives the redis error (if
 *   any) and the computed timestamp.
 * @returns {*} whatever `redis.pttl` returns.
 */
QuerySwarm.prototype.getThrottle = function(callback) {
  var throttleKey = this.id + ':throttle';

  return redis.pttl(throttleKey, function(err, remaining) {
    var wait = Math.max(remaining, 0);
    return callback(err, Date.now() + wait);
  });
};

/**
 * Fetch every entry of this swarm's `:queue` list, decoded from JSON.
 *
 * @param {function(Error=, Array=)} callback - receives `(err)` when the
 *   redis read fails or an entry is not valid JSON, otherwise
 *   `(null, tasks)`.
 * @returns {*} whatever `redis.lrange` returns.
 */
QuerySwarm.prototype.getQueue = function(callback) {
  return redis.lrange(this.id + ':queue', 0, -1, function(err, queue) {
    // on failure there is no reply array to map over
    if (err) return callback(err);

    var tasks;
    try {
      tasks = queue.map(function(q) { return JSON.parse(q); });
    } catch (parseErr) {
      return callback(parseErr);
    }

    return callback(null, tasks);
  });
};

/**
 * Count the entries in this swarm's `:queue` list.
 *
 * @param {function(Error=, number=)} callback - receives the redis error (if
 *   any) and the list length.
 * @returns {*} whatever `redis.llen` returns.
 */
QuerySwarm.prototype.getQueueLength = function(callback) {
  var queueKey = this.id + ':queue';
  return redis.llen(queueKey, (err, length) => callback(err, length));
};

/**
 * Fetch every entry of this swarm's `:processing` list, decoded from JSON.
 *
 * @param {function(Error=, Array=)} callback - receives `(err)` when the
 *   redis read fails or an entry is not valid JSON, otherwise
 *   `(null, tasks)`.
 * @returns {*} whatever `redis.lrange` returns.
 */
QuerySwarm.prototype.getProcessing = function(callback) {
  return redis.lrange(this.id + ':processing', 0, -1, function(err, processing) {
    // on failure there is no reply array to map over
    if (err) return callback(err);

    var tasks;
    try {
      tasks = processing.map(function(q) { return JSON.parse(q); });
    } catch (parseErr) {
      return callback(parseErr);
    }

    return callback(null, tasks);
  });
};

/**
 * Count the entries in this swarm's `:processing` list.
 *
 * @param {function(Error=, number=)} callback - receives the redis error (if
 *   any) and the list length.
 * @returns {*} whatever `redis.llen` returns.
 */
QuerySwarm.prototype.getProcessingLength = function(callback) {
  var processingKey = this.id + ':processing';
  return redis.llen(processingKey, (err, length) => callback(err, length));
};

/**
 * Fetch every entry of this swarm's `:deadletter` list, decoded from JSON.
 *
 * @param {function(Error=, Array=)} callback - receives `(err)` when the
 *   redis read fails or an entry is not valid JSON, otherwise
 *   `(null, tasks)`.
 * @returns {*} whatever `redis.lrange` returns.
 */
QuerySwarm.prototype.getDeadletter = function(callback) {
  return redis.lrange(this.id + ':deadletter', 0, -1, function(err, deadletter) {
    // on failure there is no reply array to map over
    if (err) return callback(err);

    var tasks;
    try {
      tasks = deadletter.map(function(q) { return JSON.parse(q); });
    } catch (parseErr) {
      return callback(parseErr);
    }

    return callback(null, tasks);
  });
};

/**
 * Count the entries in this swarm's `:deadletter` list.
 *
 * @param {function(Error=, number=)} callback - receives the redis error (if
 *   any) and the list length.
 * @returns {*} whatever `redis.llen` returns.
 */
QuerySwarm.prototype.getDeadletterLength = function(callback) {
  var deadletterKey = this.id + ':deadletter';
  return redis.llen(deadletterKey, (err, length) => callback(err, length));
};

/**
 * Collect a snapshot of this swarm's local and redis-side state.
 *
 * The redis values are read in a single MULTI so they are mutually
 * consistent. Replies are consumed positionally, in the order the commands
 * are queued below.
 *
 * @param {function(Error=, Object=)} callback - receives `(err)` if the
 *   transaction fails, otherwise `(null, status)` where `status` holds:
 *   `active` and `populating` (this instance's flags), `processing`,
 *   `queued` and `deadlettered` (list lengths), `throttle` (the PTTL reply
 *   for the throttle key), `cursor` (the stored cursor string, not decoded),
 *   and `workers` (id / active / stopping / job of each local worker).
 * @returns {*} whatever the multi's `exec` returns.
 */
QuerySwarm.prototype.getStatus = function(callback) {
  var self = this;
  return redis.multi()
    .llen(self.id + ':processing')
    .llen(self.id + ':queue')
    .llen(self.id + ':deadletter')
    .pttl(self.id + ':throttle')
    .get(self.id + ':cursor')
    // the multi is run with `exec`, as in populate; there is no `execute`
    .exec(function(err, res) {
      if (err) return callback(err);
      return callback(null, {
        active: self.active,
        populating: self.populating,
        processing: res[0],
        queued: res[1],
        deadlettered: res[2],
        throttle: res[3],
        cursor: res[4],
        workers: self.workers.map(function(worker) {
          return {
            id: worker.id,
            active: worker.active,
            stopping: worker.stopping,
            job: worker.job,
          };
        }),
      });
    });
};

QuerySwarm.prototype.populate = function() {
var self = this;

// throttle the frequency of populate calls
if(self.active === true && self.throttle > Date.now())
return;

self.throttle = Date.now() + self.options.throttle;
return self.getThrottle(function(err, throttle) {
if (err)
return;

// defer to the next tick
setImmediate(function(){
if(self.active !== true)
if(self.active === true && throttle > Date.now())
return;

// acquire a redis lock
self.redlock.lock(self.id + ':lock', self.options.lockTimeout, function(err, lock) {
if(err || !lock)
// defer to the next tick
return setImmediate(function(){
if(self.active !== true)
return;

// acquire the distributed throttle
self.redlock.lock(self.id + ':throttle', self.options.throttle, function(err, throttle) {
if(err || !throttle) {
lock.unlock();
// acquire a redis lock
return self.redlock.lock(self.id + ':lock', self.options.lockTimeout, function(err, lock) {
if(err || !lock)
return;
}

// get the cursor
redis.get(self.id + ':cursor', function(err, cursor) {
if(err) {
// acquire the distributed throttle
return self.redlock.lock(self.id + ':throttle', self.options.throttle, function(err, throttle) {
if(err || !throttle) {
lock.unlock();
return self.emit('error', err, 'error getting cursor');
return;
}

cursor = JSON.parse(cursor);
self.throttle = Date.now() + self.options.throttle;

// run the user-provided query
self.query(cursor, function(err, cursor, contents) {
// get the cursor
return self.getCursor(function(err, cursor) {
if(err) {
lock.unlock();
return self.emit('error', err, 'error running the user-provided query');
return self.emit('error', err, 'error getting cursor');
}

var tasks = (Array.isArray(contents) && contents.length > 0) ? contents.map(JSON.stringify) : [];
self.populating = true;

// VERY BAD: we've exceeded our lock timeout
if(Date.now() >= lock.expiration)
return self.emit('error', new OrphanedQuery(cursor, tasks));
// run the user-provided query
return self.query(cursor, function(err, cursor, contents) {

// TODO: extend the lock if its expiration has fallen below a configurable buffer
if(err) {
self.populating = false;
lock.unlock();
return self.emit('error', err, 'error running the user-provided query');
}

var multi = redis.multi();
var tasks = (Array.isArray(contents) && contents.length > 0) ? contents.map(JSON.stringify) : [];

// update the redis cursor
multi = multi.set(self.id + ':cursor', JSON.stringify(cursor));
// VERY BAD: we've exceeded our lock timeout
if(Date.now() >= lock.expiration){
self.populating = false;
return self.emit('error', new OrphanedQuery(cursor, tasks));
}

// add tasks to redis queue
if(tasks.length > 0) {
var params = tasks.slice();
while (params.length > 0) {
var args = params.splice(0,10000)
args.unshift(self.id + ':queue');
multi = multi.lpush.apply(multi, args);
// TODO: extend the lock if its expiration has fallen below a configurable buffer

var multi = redis.multi();

// update the redis cursor
multi = multi.set(self.id + ':cursor', JSON.stringify(cursor));

// add tasks to redis queue
if(tasks.length > 0) {
var params = tasks.slice();
while (params.length > 0) {
var args = params.splice(0,10000)
args.unshift(self.id + ':queue');
multi = multi.lpush.apply(multi, args);
}
}
}

multi.exec(function(err) {
return multi.exec(function(err) {

// release the lock
lock.unlock();
// We are done populating and the cursor has been updated
self.populating = false;

// release the lock
lock.unlock();

// TODO: this *could* be a bad place to have an error for different reasons
// we need to check which command threw and act accordingly:
// 0. new OrphanedQuery(cursor, tasks) // maybe emergency (depends on query)
// 1. new OrphanedTasks(cursor, tasks) // definitely emergency, we have updated the cursor without pushing its tasks to the queue
// 2. deadlock; will fix itself
// TODO: this *could* be a bad place to have an error for different reasons
// we need to check which command threw and act accordingly:
// 0. new OrphanedQuery(cursor, tasks) // maybe emergency (depends on query)
// 1. new OrphanedTasks(cursor, tasks) // definitely emergency, we have updated the cursor without pushing its tasks to the queue
// 2. deadlock; will fix itself

if(err)
return self.emit('error', err, 'error populating queue');
if(err)
return self.emit('error', err, 'error populating queue');

self.emit('populate', cursor, tasks);
self.emit('populate', cursor, tasks);
});
});
});
});
Expand Down
Loading