From bd5efa9f9dd3e94be175acfce6226a728c87e5ce Mon Sep 17 00:00:00 2001 From: Zhang Xian Date: Wed, 31 Jan 2018 11:37:47 +0800 Subject: [PATCH] add the receiver stop function When application is stoping , it will pop last a data from redis. It throws exception because it can't store the last data,and this data will lose. This commition has solved this bug in a way that stop the connect. It has a deficiencies.When stopd the connection ,it will throws ConnectException. --- .../redis/streaming/RedisInputDStream.scala | 15 ++++++++++++--- 1 file changed, 12 insertions(+), 3 deletions(-) diff --git a/src/main/scala/com/redislabs/provider/redis/streaming/RedisInputDStream.scala b/src/main/scala/com/redislabs/provider/redis/streaming/RedisInputDStream.scala index 79a2741a..1b00f303 100644 --- a/src/main/scala/com/redislabs/provider/redis/streaming/RedisInputDStream.scala +++ b/src/main/scala/com/redislabs/provider/redis/streaming/RedisInputDStream.scala @@ -29,13 +29,17 @@ private class RedisReceiver[T: ClassTag](keys: Array[String], redisConfig: RedisConfig, streamType: Class[T]) extends Receiver[T](storageLevel) { + + var jedisConnect: Jedis = null def onStart() { val executorPool = ThreadUtils.newFixedThreadPool(keys.length, "BlockLists Streaming") try { /* start a executor for each interested List */ - keys.foreach{ key => - executorPool.submit(new MessageHandler(redisConfig.connectionForKey(key), key)) + keys.foreach{ key =>{ + jedisConnect = redisConfig.connectionForKey(key) + executorPool.submit(new MessageHandler(jedisConnect, key)) + } } } finally { executorPool.shutdown() @@ -43,6 +47,11 @@ private class RedisReceiver[T: ClassTag](keys: Array[String], } def onStop() { + /* quit the connect*/ + if (jedisConnect != null) { + jedisConnect.quit() + jedisConnect = null + } } private class MessageHandler(conn: Jedis, key: String) extends Runnable { @@ -50,7 +59,7 @@ private class RedisReceiver[T: ClassTag](keys: Array[String], try { while(!isStopped) { val response = conn.blpop(2, key) - if (response == null) { + if (response == null || response.isEmpty) { } else if (classTag[T] == classTag[String]) { store(response.get(1).asInstanceOf[T])