diff --git a/core/src/main/scala/kafka/producer/ZKBrokerPartitionInfo.scala b/core/src/main/scala/kafka/producer/ZKBrokerPartitionInfo.scala index 87cf706695b69..bb3a6fcb44e28 100644 --- a/core/src/main/scala/kafka/producer/ZKBrokerPartitionInfo.scala +++ b/core/src/main/scala/kafka/producer/ZKBrokerPartitionInfo.scala @@ -25,6 +25,7 @@ import kafka.cluster.{Broker, Partition} import org.apache.zookeeper.Watcher.Event.KeeperState import org.I0Itec.zkclient.{IZkStateListener, IZkChildListener, ZkClient} import collection.SortedSet +import scala.util.Try private[producer] object ZKBrokerPartitionInfo { @@ -168,10 +169,7 @@ private[producer] class ZKBrokerPartitionInfo(config: ZKConfig, producerCbk: (In val brokerTopicPath = ZkUtils.BrokerTopicsPath + "/" + topic val brokerList = ZkUtils.getChildrenParentMayNotExist(zkClient, brokerTopicPath) - val numPartitions = brokerList.map{bid => - val x = ZkUtils.readData(zkClient, brokerTopicPath + "/" + bid) - if (x == "") 0 else bid.toInt - } + val numPartitions = brokerList.map(bid => Try(ZkUtils.readData(zkClient, brokerTopicPath + "/" + bid).toInt).getOrElse(0)) val brokerPartitions = brokerList.map(bid => bid.toInt).zip(numPartitions) val sortedBrokerPartitions = brokerPartitions.sortWith((id1, id2) => id1._1 < id2._1)