diff --git a/index.js b/index.js index f9e97ae..a4959dc 100644 --- a/index.js +++ b/index.js @@ -1,5 +1,6 @@ var redis = require('redis'), net = require('net'), + logging = require('minilog')('redis-sentinel:Sentinel'), when = require('when'); function Sentinel(endpoints) { @@ -33,13 +34,19 @@ Sentinel.prototype.createClient = function(masterName, opts) { pubsubClient = this.createClientInternal(masterName, pubsubOpts); pubsubClient.subscribe("+switch-master", function(error) { if (error) { - console.error("Unable to subscribe to Sentinel PUBSUB"); + logging.error("Unable to subscribe to Sentinel PUBSUB"); } }); - pubsubClient.on("message", function(channel, message) { - console.warn("Received +switch-master message from Redis Sentinel.", - " Reconnecting clients."); + pubsubClient.on("message", function (channel, message) { + var failedOverMaster = message.split(" ")[0]; + logging.warn("Received +switch-master message from Redis Sentinel for master", failedOverMaster); + if (failedOverMaster === masterName) { + logging.warn("Reconnecting clients."); self.reconnectAllClients(); + } + else { + logging.warn("Ignoring the message"); + } }); pubsubClient.on("error", function(error) {}); self.pubsub.push(pubsubClient); @@ -55,7 +62,7 @@ Sentinel.prototype.createClientInternal = function(masterName, opts) { opts = opts || {}; var role = opts.role || 'master'; - + logging.debug("Sentinel.createClientInternal: role - " + role ) var endpoints = this.endpoints; @@ -114,7 +121,7 @@ Sentinel.prototype.createClientInternal = function(masterName, opts) { } else { // Try reconnecting - remove the old stream first. client.stream.end(); - + logging.debug("refreshEndpoints : " + resolver.name + " responded with host(" + ip + ") & port(" + port + ")") client.connectionOption.port = port; client.connectionOption.host = ip; client.connection_gone("sentinel induced refresh"); @@ -125,7 +132,6 @@ Sentinel.prototype.createClientInternal = function(masterName, opts) { // Crude but may do for now. On error re-resolve the master // and retry the connection function hitError(eventName, err) { - var _args = arguments; function reemit() { oldEmit.apply(client, _args); @@ -196,6 +202,7 @@ function resolveClient() { // Because finding the master is going to be an async list we will terminate // when we find one then use promises... promise = endpoints.reduce(function(soFar, endpoint) { + logging.debug("Calling " + checkEndpointFn.name +" with endpoint(" + endpoint.host + ":" +endpoint.port + ")") return soFar.then(function() { var deferred = when.defer(); @@ -208,6 +215,7 @@ function resolveClient() { } else { // This is the endpoint that has responded so stick it on the top of // the list + logging.debug(checkEndpointFn.name + " got a response on sentinel endpoint host:"+ endpoint.host + ", port:" + endpoint.port) var index = endpoints.indexOf(endpoint); endpoints.splice(index, 1); endpoints.unshift(endpoint); @@ -232,9 +240,10 @@ function resolveClient() { } function isSentinelOk(endpoint, callback) { - var client = redis.createClient(endpoint.port, endpoint.host); + var client = redis.createClient(endpoint.port, endpoint.host, {connect_timeout: 1000}); var callbackSent = false; client.on("error", function(err) { + logging.error("isSentinelOk Error - " + endpoint.host + ":" + endpoint.port + " - " + err) if (!callbackSent) { callbackSent = true; callback(err); @@ -253,7 +262,7 @@ function isSentinelOk(endpoint, callback) { } function getMasterFromEndpoint(endpoint, masterName, callback) { - var sentinelClient = redis.createClient(endpoint.port, endpoint.host); + var sentinelClient = redis.createClient(endpoint.port, endpoint.host, {connect_timeout: 1000}); var callbackSent = false; // If there is an error then callback with it @@ -277,6 +286,7 @@ function getMasterFromEndpoint(endpoint, masterName, callback) { } else { var ip = result[0]; var port = result[1]; + logging.debug("getMasterFromEndpoint - Redis master host: " + ip + ", port: " + port ) callback(null, ip, port); } }); @@ -284,7 +294,7 @@ function getMasterFromEndpoint(endpoint, masterName, callback) { } function getSlaveFromEndpoint(endpoint, masterName, callback) { - var sentinelClient = redis.createClient(endpoint.port, endpoint.host); + var sentinelClient = redis.createClient(endpoint.port, endpoint.host, {connect_timeout: 1000}); var callbackSent = false; // If there is an error then callback with it @@ -342,10 +352,29 @@ function parseSentinelResponse(resArr){ // Shortcut for quickly getting a client from endpoints function createClient(endpoints, masterName, options) { - var sentinel = Sentinel(endpoints); + var sentinel = Sentinel(shuffle(endpoints)); return sentinel.createClient(masterName, options); } +// From https://bost.ocks.org/mike/shuffle/ +function shuffle(array) { + var m = array.length, t, i; + + // While there remain elements to shuffle… + while (m) { + + // Pick a remaining element… + i = Math.floor(Math.random() * m--); + + // And swap it with the current element. + t = array[m]; + array[m] = array[i]; + array[i] = t; + } + + return array; +} + module.exports.Sentinel = Sentinel; module.exports.createClient = createClient; module.exports.redis = redis; diff --git a/package.json b/package.json index c433029..8ca0547 100644 --- a/package.json +++ b/package.json @@ -1,6 +1,6 @@ { "name": "redis-sentinel", - "version": "0.3.3", + "version": "0.3.4", "description": "Redis sentinel client for nodejs", "main": "index.js", "scripts": { @@ -15,7 +15,8 @@ ], "dependencies": { "redis": "0.12.x", - "when": "^3.5.1" + "when": "^3.5.1", + "minilog": "*" }, "devDependencies": { "mocha": "*", diff --git a/test/test.js b/test/test.js index 99a8528..bebbae5 100644 --- a/test/test.js +++ b/test/test.js @@ -1,9 +1,12 @@ var sentinel = require('../'); var expect = require('chai').expect; var redis = require('redis'); +var Minilog = require('minilog'); describe('Redis Sentinel tests', function() { + Minilog.enable() + describe('initial connection', function() { it('should get master correctly with single sentinel', function(done) { @@ -70,7 +73,7 @@ describe('Redis Sentinel tests', function() { var redisClient = sentinel.createClient(endpoints, {role: 'sentinel'}); redisClient.on('ready', function() { expect(redisClient.connectionOption.host).to.equal('127.0.0.1'); - expect(redisClient.connectionOption.port).to.equal("26380"); + expect(["26380", "26379"]).to.contain(redisClient.connectionOption.port); done(); }); }); @@ -113,7 +116,7 @@ describe('Redis Sentinel tests', function() { var redisClient = sentinel.createClient(endpoints, {role: 'sentinel'}); redisClient.on('ready', function() { expect(redisClient.connectionOption.host).to.equal('127.0.0.1'); - expect(redisClient.connectionOption.port).to.equal("26380"); + expect(["26380", "26379"]).to.contain(redisClient.connectionOption.port); done(); }); });