Skip to content
This repository was archived by the owner on Jan 4, 2026. It is now read-only.
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
51 changes: 40 additions & 11 deletions index.js
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
var redis = require('redis'),
net = require('net'),
logging = require('minilog')('redis-sentinel:Sentinel'),
when = require('when');

function Sentinel(endpoints) {
Expand Down Expand Up @@ -33,13 +34,19 @@ Sentinel.prototype.createClient = function(masterName, opts) {
pubsubClient = this.createClientInternal(masterName, pubsubOpts);
pubsubClient.subscribe("+switch-master", function(error) {
if (error) {
console.error("Unable to subscribe to Sentinel PUBSUB");
logging.error("Unable to subscribe to Sentinel PUBSUB");
}
});
pubsubClient.on("message", function(channel, message) {
console.warn("Received +switch-master message from Redis Sentinel.",
" Reconnecting clients.");
pubsubClient.on("message", function (channel, message) {
var failedOverMaster = message.split(" ")[0];
logging.warn("Received +switch-master message from Redis Sentinel for master", failedOverMaster);
if (failedOverMaster === masterName) {
logging.warn("Reconnecting clients.");
self.reconnectAllClients();
}
else {
logging.warn("Ignoring the message");
}
});
pubsubClient.on("error", function(error) {});
self.pubsub.push(pubsubClient);
Expand All @@ -55,7 +62,7 @@ Sentinel.prototype.createClientInternal = function(masterName, opts) {

opts = opts || {};
var role = opts.role || 'master';

logging.debug("Sentinel.createClientInternal: role - " + role )
var endpoints = this.endpoints;


Expand Down Expand Up @@ -114,7 +121,7 @@ Sentinel.prototype.createClientInternal = function(masterName, opts) {
} else {
// Try reconnecting - remove the old stream first.
client.stream.end();

logging.debug("refreshEndpoints : " + resolver.name + " responded with host(" + ip + ") & port(" + port + ")")
client.connectionOption.port = port;
client.connectionOption.host = ip;
client.connection_gone("sentinel induced refresh");
Expand All @@ -125,7 +132,6 @@ Sentinel.prototype.createClientInternal = function(masterName, opts) {
// Crude but may do for now. On error re-resolve the master
// and retry the connection
function hitError(eventName, err) {

var _args = arguments;
function reemit() {
oldEmit.apply(client, _args);
Expand Down Expand Up @@ -196,6 +202,7 @@ function resolveClient() {
// Because finding the master is going to be an async list we will terminate
// when we find one then use promises...
promise = endpoints.reduce(function(soFar, endpoint) {
logging.debug("Calling " + checkEndpointFn.name +" with endpoint(" + endpoint.host + ":" +endpoint.port + ")")
return soFar.then(function() {
var deferred = when.defer();

Expand All @@ -208,6 +215,7 @@ function resolveClient() {
} else {
// This is the endpoint that has responded so stick it on the top of
// the list
logging.debug(checkEndpointFn.name + " got a response on sentinel endpoint host:"+ endpoint.host + ", port:" + endpoint.port)
var index = endpoints.indexOf(endpoint);
endpoints.splice(index, 1);
endpoints.unshift(endpoint);
Expand All @@ -232,9 +240,10 @@ function resolveClient() {
}

function isSentinelOk(endpoint, callback) {
var client = redis.createClient(endpoint.port, endpoint.host);
var client = redis.createClient(endpoint.port, endpoint.host, {connect_timeout: 1000});
var callbackSent = false;
client.on("error", function(err) {
logging.error("isSentinelOk Error - " + endpoint.host + ":" + endpoint.port + " - " + err)
if (!callbackSent) {
callbackSent = true;
callback(err);
Expand All @@ -253,7 +262,7 @@ function isSentinelOk(endpoint, callback) {
}

function getMasterFromEndpoint(endpoint, masterName, callback) {
var sentinelClient = redis.createClient(endpoint.port, endpoint.host);
var sentinelClient = redis.createClient(endpoint.port, endpoint.host, {connect_timeout: 1000});
var callbackSent = false;

// If there is an error then callback with it
Expand All @@ -277,14 +286,15 @@ function getMasterFromEndpoint(endpoint, masterName, callback) {
} else {
var ip = result[0];
var port = result[1];
logging.debug("getMasterFromEndpoint - Redis master host: " + ip + ", port: " + port )
callback(null, ip, port);
}
});
sentinelClient.quit();
}

function getSlaveFromEndpoint(endpoint, masterName, callback) {
var sentinelClient = redis.createClient(endpoint.port, endpoint.host);
var sentinelClient = redis.createClient(endpoint.port, endpoint.host, {connect_timeout: 1000});
var callbackSent = false;

// If there is an error then callback with it
Expand Down Expand Up @@ -342,10 +352,29 @@ function parseSentinelResponse(resArr){

// Shortcut for quickly getting a client from endpoints
function createClient(endpoints, masterName, options) {
var sentinel = Sentinel(endpoints);
var sentinel = Sentinel(shuffle(endpoints));
return sentinel.createClient(masterName, options);
}

// From https://bost.ocks.org/mike/shuffle/
function shuffle(array) {
var m = array.length, t, i;

// While there remain elements to shuffle…
while (m) {

// Pick a remaining element…
i = Math.floor(Math.random() * m--);

// And swap it with the current element.
t = array[m];
array[m] = array[i];
array[i] = t;
}

return array;
}

module.exports.Sentinel = Sentinel;
module.exports.createClient = createClient;
module.exports.redis = redis;
5 changes: 3 additions & 2 deletions package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "redis-sentinel",
"version": "0.3.3",
"version": "0.3.4",
"description": "Redis sentinel client for nodejs",
"main": "index.js",
"scripts": {
Expand All @@ -15,7 +15,8 @@
],
"dependencies": {
"redis": "0.12.x",
"when": "^3.5.1"
"when": "^3.5.1",
"minilog": "*"
},
"devDependencies": {
"mocha": "*",
Expand Down
7 changes: 5 additions & 2 deletions test/test.js
Original file line number Diff line number Diff line change
@@ -1,9 +1,12 @@
var sentinel = require('../');
var expect = require('chai').expect;
var redis = require('redis');
var Minilog = require('minilog');

describe('Redis Sentinel tests', function() {

Minilog.enable()

describe('initial connection', function() {

it('should get master correctly with single sentinel', function(done) {
Expand Down Expand Up @@ -70,7 +73,7 @@ describe('Redis Sentinel tests', function() {
var redisClient = sentinel.createClient(endpoints, {role: 'sentinel'});
redisClient.on('ready', function() {
expect(redisClient.connectionOption.host).to.equal('127.0.0.1');
expect(redisClient.connectionOption.port).to.equal("26380");
expect(["26380", "26379"]).to.contain(redisClient.connectionOption.port);
done();
});
});
Expand Down Expand Up @@ -113,7 +116,7 @@ describe('Redis Sentinel tests', function() {
var redisClient = sentinel.createClient(endpoints, {role: 'sentinel'});
redisClient.on('ready', function() {
expect(redisClient.connectionOption.host).to.equal('127.0.0.1');
expect(redisClient.connectionOption.port).to.equal("26380");
expect(["26380", "26379"]).to.contain(redisClient.connectionOption.port);
done();
});
});
Expand Down