kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ij...@apache.org
Subject kafka git commit: MINOR: Use match instead of if/else in KafkaZkClient
Date Wed, 15 Nov 2017 14:04:47 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk 3cfbb25c6 -> f300480f8


MINOR: Use match instead of if/else in KafkaZkClient

Also use ZkVersion.NoVersion instead of -1.

Author: Mickael Maison <mickael.maison@gmail.com>

Reviewers: Ismael Juma <ismael@juma.me.uk>

Closes #4196 from mimaison/zkclient_refactor


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/f300480f
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/f300480f
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/f300480f

Branch: refs/heads/trunk
Commit: f300480f882e8b17999e1172bfbec0f13c71eda7
Parents: 3cfbb25
Author: Mickael Maison <mickael.maison@gmail.com>
Authored: Wed Nov 15 12:50:07 2017 +0000
Committer: Ismael Juma <ismael@juma.me.uk>
Committed: Wed Nov 15 13:37:07 2017 +0000

----------------------------------------------------------------------
 .../ZkNodeChangeNotificationListener.scala      |   9 +-
 .../src/main/scala/kafka/zk/KafkaZkClient.scala | 385 ++++++++-----------
 core/src/main/scala/kafka/zk/ZkData.scala       |   4 +
 3 files changed, 175 insertions(+), 223 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/f300480f/core/src/main/scala/kafka/common/ZkNodeChangeNotificationListener.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/common/ZkNodeChangeNotificationListener.scala b/core/src/main/scala/kafka/common/ZkNodeChangeNotificationListener.scala
index f589430..f0d4b1b 100644
--- a/core/src/main/scala/kafka/common/ZkNodeChangeNotificationListener.scala
+++ b/core/src/main/scala/kafka/common/ZkNodeChangeNotificationListener.scala
@@ -84,11 +84,10 @@ class ZkNodeChangeNotificationListener(private val zkClient: KafkaZkClient,
           val changeId = changeNumber(notification)
           if (changeId > lastExecutedChange) {
             val changeZnode = seqNodeRoot + "/" + notification
-            val data = zkClient.getDataAndStat(changeZnode)._1.orNull
-            if (data != null) {
-              notificationHandler.processNotification(data)
-            } else {
-              logger.warn(s"read null data from $changeZnode when processing notification $notification")
+            val (data, _) = zkClient.getDataAndStat(changeZnode)
+            data match {
+              case Some(d) => notificationHandler.processNotification(d)
+              case None => logger.warn(s"read null data from $changeZnode when processing notification $notification")
             }
             lastExecutedChange = changeId
           }

http://git-wip-us.apache.org/repos/asf/kafka/blob/f300480f/core/src/main/scala/kafka/zk/KafkaZkClient.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/zk/KafkaZkClient.scala b/core/src/main/scala/kafka/zk/KafkaZkClient.scala
index 97b7c98..a753c5b 100644
--- a/core/src/main/scala/kafka/zk/KafkaZkClient.scala
+++ b/core/src/main/scala/kafka/zk/KafkaZkClient.scala
@@ -133,13 +133,12 @@ class KafkaZkClient(zooKeeperClient: ZooKeeperClient, isSecure: Boolean) extends
     }
     setDataResponses.foreach { setDataResponse =>
       val partition = setDataResponse.ctx.get.asInstanceOf[TopicPartition]
-      if (setDataResponse.resultCode == Code.OK) {
-        val updatedLeaderAndIsr = leaderAndIsrs(partition).withZkVersion(setDataResponse.stat.getVersion)
-        successfulUpdates.put(partition, updatedLeaderAndIsr)
-      } else if (setDataResponse.resultCode == Code.BADVERSION) {
-        updatesToRetry += partition
-      } else {
-        failed.put(partition, setDataResponse.resultException.get)
+      setDataResponse.resultCode match {
+        case Code.OK =>
+          val updatedLeaderAndIsr = leaderAndIsrs(partition).withZkVersion(setDataResponse.stat.getVersion)
+          successfulUpdates.put(partition, updatedLeaderAndIsr)
+        case Code.BADVERSION => updatesToRetry += partition
+        case _ => failed.put(partition, setDataResponse.resultException.get)
       }
     }
     UpdateLeaderAndIsrResult(successfulUpdates.toMap, updatesToRetry, failed.toMap)
@@ -166,15 +165,15 @@ class KafkaZkClient(zooKeeperClient: ZooKeeperClient, isSecure: Boolean) extends
     }
     configResponses.foreach { configResponse =>
       val topic = configResponse.ctx.get.asInstanceOf[String]
-      if (configResponse.resultCode == Code.OK) {
-        val overrides = ConfigEntityZNode.decode(configResponse.data)
-        val logConfig = LogConfig.fromProps(config, overrides.getOrElse(new Properties))
-        logConfigs.put(topic, logConfig)
-      } else if (configResponse.resultCode == Code.NONODE) {
-        val logConfig = LogConfig.fromProps(config, new Properties)
-        logConfigs.put(topic, logConfig)
-      } else {
-        failed.put(topic, configResponse.resultException.get)
+      configResponse.resultCode match {
+        case Code.OK =>
+          val overrides = ConfigEntityZNode.decode(configResponse.data)
+          val logConfig = LogConfig.fromProps(config, overrides.getOrElse(new Properties))
+          logConfigs.put(topic, logConfig)
+        case Code.NONODE =>
+          val logConfig = LogConfig.fromProps(config, new Properties)
+          logConfigs.put(topic, logConfig)
+        case _ => failed.put(topic, configResponse.resultException.get)
       }
     }
     (logConfigs.toMap, failed.toMap)
@@ -186,24 +185,24 @@ class KafkaZkClient(zooKeeperClient: ZooKeeperClient, isSecure: Boolean) extends
    */
   def getAllBrokersInCluster: Seq[Broker] = {
     val getChildrenResponse = retryRequestUntilConnected(GetChildrenRequest(BrokerIdsZNode.path))
-    if (getChildrenResponse.resultCode == Code.OK) {
-      val brokerIds = getChildrenResponse.children.map(_.toInt)
-      val getDataRequests = brokerIds.map(brokerId => GetDataRequest(BrokerIdZNode.path(brokerId), ctx = Some(brokerId)))
-      val getDataResponses = retryRequestsUntilConnected(getDataRequests)
-      getDataResponses.flatMap { getDataResponse =>
-        val brokerId = getDataResponse.ctx.get.asInstanceOf[Int]
-        if (getDataResponse.resultCode == Code.OK) {
-          Option(BrokerIdZNode.decode(brokerId, getDataResponse.data))
-        } else if (getDataResponse.resultCode == Code.NONODE) {
-          None
-        } else {
-          throw getDataResponse.resultException.get
+    getChildrenResponse.resultCode match {
+      case Code.OK =>
+        val brokerIds = getChildrenResponse.children.map(_.toInt)
+        val getDataRequests = brokerIds.map(brokerId => GetDataRequest(BrokerIdZNode.path(brokerId), ctx = Some(brokerId)))
+        val getDataResponses = retryRequestsUntilConnected(getDataRequests)
+        getDataResponses.flatMap { getDataResponse =>
+          val brokerId = getDataResponse.ctx.get.asInstanceOf[Int]
+          getDataResponse.resultCode match {
+            case Code.OK =>
+              Option(BrokerIdZNode.decode(brokerId, getDataResponse.data))
+            case Code.NONODE => None
+            case _ => throw getDataResponse.resultException.get
+          }
         }
-      }
-    } else if (getChildrenResponse.resultCode == Code.NONODE) {
-      Seq.empty
-    } else {
-      throw getChildrenResponse.resultException.get
+      case Code.NONODE =>
+        Seq.empty
+      case _ =>
+        throw getChildrenResponse.resultException.get
     }
   }
 
@@ -213,13 +212,12 @@ class KafkaZkClient(zooKeeperClient: ZooKeeperClient, isSecure: Boolean) extends
    */
   def getAllTopicsInCluster: Seq[String] = {
     val getChildrenResponse = retryRequestUntilConnected(GetChildrenRequest(TopicsZNode.path))
-    if (getChildrenResponse.resultCode == Code.OK) {
-      getChildrenResponse.children
-    } else if (getChildrenResponse.resultCode == Code.NONODE) {
-      Seq.empty
-    } else {
-      throw getChildrenResponse.resultException.get
+    getChildrenResponse.resultCode match {
+      case Code.OK => getChildrenResponse.children
+      case Code.NONODE => Seq.empty
+      case _ => throw getChildrenResponse.resultException.get
     }
+
   }
 
   /**
@@ -239,12 +237,10 @@ class KafkaZkClient(zooKeeperClient: ZooKeeperClient, isSecure: Boolean) extends
    */
   def getAllLogDirEventNotifications: Seq[String] = {
     val getChildrenResponse = retryRequestUntilConnected(GetChildrenRequest(LogDirEventNotificationZNode.path))
-    if (getChildrenResponse.resultCode == Code.OK) {
-      getChildrenResponse.children.map(LogDirEventNotificationSequenceZNode.sequenceNumber)
-    } else if (getChildrenResponse.resultCode == Code.NONODE) {
-      Seq.empty
-    } else {
-      throw getChildrenResponse.resultException.get
+    getChildrenResponse.resultCode match {
+      case Code.OK => getChildrenResponse.children.map(LogDirEventNotificationSequenceZNode.sequenceNumber)
+      case Code.NONODE => Seq.empty
+      case _ => throw getChildrenResponse.resultException.get
     }
   }
 
@@ -259,12 +255,10 @@ class KafkaZkClient(zooKeeperClient: ZooKeeperClient, isSecure: Boolean) extends
     }
     val getDataResponses = retryRequestsUntilConnected(getDataRequests)
     getDataResponses.flatMap { getDataResponse =>
-      if (getDataResponse.resultCode == Code.OK) {
-        LogDirEventNotificationSequenceZNode.decode(getDataResponse.data)
-      } else if (getDataResponse.resultCode == Code.NONODE) {
-        None
-      } else {
-        throw getDataResponse.resultException.get
+      getDataResponse.resultCode match {
+        case Code.OK => LogDirEventNotificationSequenceZNode.decode(getDataResponse.data)
+        case Code.NONODE => None
+        case _ => throw getDataResponse.resultException.get
       }
     }
   }
@@ -302,12 +296,10 @@ class KafkaZkClient(zooKeeperClient: ZooKeeperClient, isSecure: Boolean) extends
     val getDataResponses = retryRequestsUntilConnected(getDataRequests.toSeq)
     getDataResponses.flatMap { getDataResponse =>
       val topic = getDataResponse.ctx.get.asInstanceOf[String]
-      if (getDataResponse.resultCode == Code.OK) {
-        TopicZNode.decode(topic, getDataResponse.data)
-      } else if (getDataResponse.resultCode == Code.NONODE) {
-        Map.empty[TopicPartition, Seq[Int]]
-      } else {
-        throw getDataResponse.resultException.get
+      getDataResponse.resultCode match {
+        case Code.OK => TopicZNode.decode(topic, getDataResponse.data)
+        case Code.NONODE => Map.empty[TopicPartition, Seq[Int]]
+        case _ => throw getDataResponse.resultException.get
       }
     }.toMap
   }
@@ -330,23 +322,14 @@ class KafkaZkClient(zooKeeperClient: ZooKeeperClient, isSecure: Boolean) extends
    * @param path zk node path
    * @return A tuple of 2 elements, where first element is zk node data as string
    *         and second element is zk node version.
-   *         returns (None, -1) if node doesn't exists and throws exception for any error
+   *         returns (None, ZkVersion.NoVersion) if node doesn't exists and throws exception for any error
    */
   def getDataAndVersion(path: String): (Option[String], Int) = {
-    val getDataRequest = GetDataRequest(path)
-    val getDataResponse = retryRequestUntilConnected(getDataRequest)
-
-    if (getDataResponse.resultCode == Code.OK) {
-      if (getDataResponse.data == null)
-        (None, getDataResponse.stat.getVersion)
-      else {
-        val data = new String(getDataResponse.data, UTF_8)
-        (Some(data), getDataResponse.stat.getVersion)
-      }
-    } else if (getDataResponse.resultCode == Code.NONODE)
-      (None, -1)
-    else
-      throw getDataResponse.resultException.get
+    val (data, stat) = getDataAndStat(path)
+    stat match {
+      case ZkStat.NoStat => (data, ZkVersion.NoVersion)
+      case _ => (data, stat.getVersion)
+    }
   }
 
   /**
@@ -354,23 +337,22 @@ class KafkaZkClient(zooKeeperClient: ZooKeeperClient, isSecure: Boolean) extends
    * @param path zk node path
    * @return A tuple of 2 elements, where first element is zk node data as string
    *         and second element is zk node stats.
-   *         returns (None, new Stat()) if node doesn't exists and throws exception for any error
+   *         returns (None, ZkStat.NoStat) if node doesn't exists and throws exception for any error
    */
   def getDataAndStat(path: String): (Option[String], Stat) = {
     val getDataRequest = GetDataRequest(path)
     val getDataResponse = retryRequestUntilConnected(getDataRequest)
 
-    if (getDataResponse.resultCode == Code.OK) {
-      if (getDataResponse.data == null)
-        (None, getDataResponse.stat)
-      else {
-        val data = new String(getDataResponse.data, UTF_8)
-        (Some(data), getDataResponse.stat)
-      }
-    } else if (getDataResponse.resultCode  == Code.NONODE) {
-      (None, new Stat())
-    } else {
-      throw getDataResponse.resultException.get
+    getDataResponse.resultCode match {
+      case Code.OK =>
+        if (getDataResponse.data == null)
+          (None, getDataResponse.stat)
+        else {
+          val data = Option(getDataResponse.data).map(new String(_, UTF_8))
+          (data, getDataResponse.stat)
+        }
+      case Code.NONODE => (None, ZkStat.NoStat)
+      case _ => throw getDataResponse.resultException.get
     }
   }
 
@@ -381,18 +363,16 @@ class KafkaZkClient(zooKeeperClient: ZooKeeperClient, isSecure: Boolean) extends
    */
   def getChildren(path : String): Seq[String] = {
     val getChildrenResponse = retryRequestUntilConnected(GetChildrenRequest(path))
-    if (getChildrenResponse.resultCode == Code.OK) {
-      getChildrenResponse.children
-    } else if (getChildrenResponse.resultCode == Code.NONODE) {
-      Seq.empty
-    } else {
-      throw getChildrenResponse.resultException.get
+    getChildrenResponse.resultCode match {
+      case Code.OK => getChildrenResponse.children
+      case Code.NONODE => Seq.empty
+      case _ => throw getChildrenResponse.resultException.get
     }
   }
 
   /**
    * Conditional update the persistent path data, return (true, newVersion) if it succeeds, otherwise (the path doesn't
-   * exist, the current version is not the expected version, etc.) return (false, -1)
+   * exist, the current version is not the expected version, etc.) return (false, ZkVersion.NoVersion)
    *
    * When there is a ConnectionLossException during the conditional update, ZookeeperClient will retry the update and may fail
    * since the previous update may have succeeded (but the stored zkVersion no longer matches the expected one).
@@ -417,13 +397,13 @@ class KafkaZkClient(zooKeeperClient: ZooKeeperClient, isSecure: Boolean) extends
             debug("Checker method is not passed skipping zkData match")
             debug("Conditional update of path %s with data %s and expected version %d failed due to %s"
               .format(path, data, expectVersion, setDataResponse.resultException.get.getMessage))
-            (false, -1)
+            (false, ZkVersion.NoVersion)
         }
 
       case Code.NONODE =>
         debug("Conditional update of path %s with data %s and expected version %d failed due to %s".format(path, data,
           expectVersion, setDataResponse.resultException.get.getMessage))
-        (false, -1)
+        (false, ZkVersion.NoVersion)
 
       case _ =>
         debug("Conditional update of path %s with data %s and expected version %d failed due to %s".format(path, data,
@@ -438,12 +418,10 @@ class KafkaZkClient(zooKeeperClient: ZooKeeperClient, isSecure: Boolean) extends
    */
   def getTopicDeletions: Seq[String] = {
     val getChildrenResponse = retryRequestUntilConnected(GetChildrenRequest(DeleteTopicsZNode.path))
-    if (getChildrenResponse.resultCode == Code.OK) {
-      getChildrenResponse.children
-    } else if (getChildrenResponse.resultCode == Code.NONODE) {
-      Seq.empty
-    } else {
-      throw getChildrenResponse.resultException.get
+    getChildrenResponse.resultCode match {
+      case Code.OK => getChildrenResponse.children
+      case Code.NONODE => Seq.empty
+      case _ => throw getChildrenResponse.resultException.get
     }
   }
 
@@ -463,12 +441,10 @@ class KafkaZkClient(zooKeeperClient: ZooKeeperClient, isSecure: Boolean) extends
   def getPartitionReassignment: Map[TopicPartition, Seq[Int]] = {
     val getDataRequest = GetDataRequest(ReassignPartitionsZNode.path)
     val getDataResponse = retryRequestUntilConnected(getDataRequest)
-    if (getDataResponse.resultCode == Code.OK) {
-      ReassignPartitionsZNode.decode(getDataResponse.data)
-    } else if (getDataResponse.resultCode == Code.NONODE) {
-      Map.empty[TopicPartition, Seq[Int]]
-    } else {
-      throw getDataResponse.resultException.get
+    getDataResponse.resultCode match {
+      case  Code.OK => ReassignPartitionsZNode.decode(getDataResponse.data)
+      case Code.NONODE => Map.empty
+      case _ => throw getDataResponse.resultException.get
     }
   }
 
@@ -512,19 +488,17 @@ class KafkaZkClient(zooKeeperClient: ZooKeeperClient, isSecure: Boolean) extends
 
   /**
    * Gets topic partition states for the given partitions.
-   * @param partitions the partitions for which we want ot get states.
+   * @param partitions the partitions for which we want to get states.
    * @return map containing LeaderIsrAndControllerEpoch of each partition for we were able to lookup the partition state.
    */
   def getTopicPartitionStates(partitions: Seq[TopicPartition]): Map[TopicPartition, LeaderIsrAndControllerEpoch] = {
     val getDataResponses = getTopicPartitionStatesRaw(partitions)
     getDataResponses.flatMap { getDataResponse =>
       val partition = getDataResponse.ctx.get.asInstanceOf[TopicPartition]
-      if (getDataResponse.resultCode == Code.OK) {
-        TopicPartitionStateZNode.decode(getDataResponse.data, getDataResponse.stat).map(partition -> _)
-      } else if (getDataResponse.resultCode == Code.NONODE) {
-        None
-      } else {
-        throw getDataResponse.resultException.get
+      getDataResponse.resultCode match {
+        case Code.OK => TopicPartitionStateZNode.decode(getDataResponse.data, getDataResponse.stat).map(partition -> _)
+        case Code.NONODE => None
+        case _ => throw getDataResponse.resultException.get
       }
     }.toMap
   }
@@ -535,12 +509,10 @@ class KafkaZkClient(zooKeeperClient: ZooKeeperClient, isSecure: Boolean) extends
    */
   def getAllIsrChangeNotifications: Seq[String] = {
     val getChildrenResponse = retryRequestUntilConnected(GetChildrenRequest(IsrChangeNotificationZNode.path))
-    if (getChildrenResponse.resultCode == Code.OK) {
-      getChildrenResponse.children.map(IsrChangeNotificationSequenceZNode.sequenceNumber)
-    } else if (getChildrenResponse.resultCode == Code.NONODE) {
-      Seq.empty
-    } else {
-      throw getChildrenResponse.resultException.get
+    getChildrenResponse.resultCode match {
+      case Code.OK => getChildrenResponse.children.map(IsrChangeNotificationSequenceZNode.sequenceNumber)
+      case Code.NONODE => Seq.empty
+      case _ => throw getChildrenResponse.resultException.get
     }
   }
 
@@ -555,12 +527,10 @@ class KafkaZkClient(zooKeeperClient: ZooKeeperClient, isSecure: Boolean) extends
     }
     val getDataResponses = retryRequestsUntilConnected(getDataRequests)
     getDataResponses.flatMap { getDataResponse =>
-      if (getDataResponse.resultCode == Code.OK) {
-        IsrChangeNotificationSequenceZNode.decode(getDataResponse.data)
-      } else if (getDataResponse.resultCode == Code.NONODE) {
-        None
-      } else {
-        throw getDataResponse.resultException.get
+      getDataResponse.resultCode match {
+        case Code.OK => IsrChangeNotificationSequenceZNode.decode(getDataResponse.data)
+        case Code.NONODE => None
+        case _ => throw getDataResponse.resultException.get
       }
     }
   }
@@ -595,12 +565,10 @@ class KafkaZkClient(zooKeeperClient: ZooKeeperClient, isSecure: Boolean) extends
   def getPreferredReplicaElection: Set[TopicPartition] = {
     val getDataRequest = GetDataRequest(PreferredReplicaElectionZNode.path)
     val getDataResponse = retryRequestUntilConnected(getDataRequest)
-    if (getDataResponse.resultCode == Code.OK) {
-      PreferredReplicaElectionZNode.decode(getDataResponse.data)
-    } else if (getDataResponse.resultCode == Code.NONODE) {
-      Set.empty[TopicPartition]
-    } else {
-      throw getDataResponse.resultException.get
+    getDataResponse.resultCode match {
+      case Code.OK => PreferredReplicaElectionZNode.decode(getDataResponse.data)
+      case Code.NONODE => Set.empty
+      case _ => throw getDataResponse.resultException.get
     }
   }
 
@@ -619,12 +587,10 @@ class KafkaZkClient(zooKeeperClient: ZooKeeperClient, isSecure: Boolean) extends
   def getControllerId: Option[Int] = {
     val getDataRequest = GetDataRequest(ControllerZNode.path)
     val getDataResponse = retryRequestUntilConnected(getDataRequest)
-    if (getDataResponse.resultCode == Code.OK) {
-      ControllerZNode.decode(getDataResponse.data)
-    } else if (getDataResponse.resultCode == Code.NONODE) {
-      None
-    } else {
-      throw getDataResponse.resultException.get
+    getDataResponse.resultCode match {
+      case Code.OK => ControllerZNode.decode(getDataResponse.data)
+      case Code.NONODE => None
+      case _ => throw getDataResponse.resultException.get
     }
   }
 
@@ -643,13 +609,12 @@ class KafkaZkClient(zooKeeperClient: ZooKeeperClient, isSecure: Boolean) extends
   def getControllerEpoch: Option[(Int, Stat)] = {
     val getDataRequest = GetDataRequest(ControllerEpochZNode.path)
     val getDataResponse = retryRequestUntilConnected(getDataRequest)
-    if (getDataResponse.resultCode == Code.OK) {
-      val epoch = ControllerEpochZNode.decode(getDataResponse.data)
-      Option(epoch, getDataResponse.stat)
-    } else if (getDataResponse.resultCode == Code.NONODE) {
-      None
-    } else {
-      throw getDataResponse.resultException.get
+    getDataResponse.resultCode match {
+      case Code.OK =>
+        val epoch = ControllerEpochZNode.decode(getDataResponse.data)
+        Option(epoch, getDataResponse.stat)
+      case Code.NONODE => None
+      case _ => throw getDataResponse.resultException.get
     }
   }
 
@@ -689,12 +654,10 @@ class KafkaZkClient(zooKeeperClient: ZooKeeperClient, isSecure: Boolean) extends
   def getVersionedAclsForResource(resource: Resource): VersionedAcls = {
     val getDataRequest = GetDataRequest(ResourceZNode.path(resource))
     val getDataResponse = retryRequestUntilConnected(getDataRequest)
-    if (getDataResponse.resultCode == Code.OK) {
-      ResourceZNode.decode(getDataResponse.data, getDataResponse.stat)
-    } else if (getDataResponse.resultCode == Code.NONODE) {
-      VersionedAcls(Set(), -1)
-    } else {
-      throw getDataResponse.resultException.get
+    getDataResponse.resultCode match {
+      case Code.OK => ResourceZNode.decode(getDataResponse.data, getDataResponse.stat)
+      case Code.NONODE => VersionedAcls(Set(), -1)
+      case _ => throw getDataResponse.resultException.get
     }
   }
 
@@ -722,23 +685,17 @@ class KafkaZkClient(zooKeeperClient: ZooKeeperClient, isSecure: Boolean) extends
 
     val setDataResponse = set(aclData, expectedVersion)
     setDataResponse.resultCode match {
-      case Code.OK =>
-        (true, setDataResponse.stat.getVersion)
+      case Code.OK => (true, setDataResponse.stat.getVersion)
       case Code.NONODE => {
         val createResponse = create(aclData)
         createResponse.resultCode match {
-          case Code.OK =>
-            (true, 0)
-          case Code.NODEEXISTS =>
-            (false, 0)
-          case _ =>
-            throw createResponse.resultException.get
+          case Code.OK => (true, 0)
+          case Code.NODEEXISTS => (false, 0)
+          case _ => throw createResponse.resultException.get
         }
       }
-      case Code.BADVERSION =>
-        (false, 0)
-      case _ =>
-        throw setDataResponse.resultException.get
+      case Code.BADVERSION => (false, 0)
+      case _ => throw setDataResponse.resultException.get
     }
   }
 
@@ -750,9 +707,7 @@ class KafkaZkClient(zooKeeperClient: ZooKeeperClient, isSecure: Boolean) extends
     val path = AclChangeNotificationSequenceZNode.createPath
     val createRequest = CreateRequest(path, AclChangeNotificationSequenceZNode.encode(resourceName), acls(path), CreateMode.PERSISTENT_SEQUENTIAL)
     val createResponse = retryRequestUntilConnected(createRequest)
-    if (createResponse.resultCode != Code.OK) {
-      throw createResponse.resultException.get
-    }
+    createResponse.resultException.foreach(e => throw e)
   }
 
   /**
@@ -829,12 +784,10 @@ class KafkaZkClient(zooKeeperClient: ZooKeeperClient, isSecure: Boolean) extends
   def conditionalDelete(resource: Resource, expectedVersion: Int): Boolean = {
     val deleteRequest = DeleteRequest(ResourceZNode.path(resource), expectedVersion)
     val deleteResponse = retryRequestUntilConnected(deleteRequest)
-    if (deleteResponse.resultCode == Code.OK || deleteResponse.resultCode == Code.NONODE) {
-      true
-    } else if (deleteResponse.resultCode == Code.BADVERSION) {
-      false
-    } else {
-      throw deleteResponse.resultException.get
+    deleteResponse.resultCode match {
+      case Code.OK | Code.NONODE => true
+      case Code.BADVERSION => false
+      case _ => throw deleteResponse.resultException.get
     }
   }
   
@@ -929,12 +882,10 @@ class KafkaZkClient(zooKeeperClient: ZooKeeperClient, isSecure: Boolean) extends
   def getConsumerOffset(group: String, topicPartition: TopicPartition): Option[Long] = {
     val getDataRequest = GetDataRequest(ConsumerOffset.path(group, topicPartition.topic, topicPartition.partition))
     val getDataResponse = retryRequestUntilConnected(getDataRequest)
-    if (getDataResponse.resultCode == Code.OK) {
-      ConsumerOffset.decode(getDataResponse.data)
-    } else if (getDataResponse.resultCode == Code.NONODE) {
-      None
-    } else {
-      throw getDataResponse.resultException.get
+    getDataResponse.resultCode match {
+      case Code.OK => ConsumerOffset.decode(getDataResponse.data)
+      case Code.NONODE => None
+      case _ => throw getDataResponse.resultException.get
     }
   }
 
@@ -948,16 +899,15 @@ class KafkaZkClient(zooKeeperClient: ZooKeeperClient, isSecure: Boolean) extends
     val setDataResponse = setConsumerOffset(group, topicPartition, offset)
     if (setDataResponse.resultCode == Code.NONODE) {
       val createResponse = createConsumerOffset(group, topicPartition, offset)
-      if (createResponse.resultCode != Code.OK) {
-        throw createResponse.resultException.get
-      }
-    } else if (setDataResponse.resultCode != Code.OK) {
-      throw setDataResponse.resultException.get
+      createResponse.resultException.foreach(e => throw e)
+    } else {
+      setDataResponse.resultException.foreach(e => throw e)
     }
   }
 
   private def setConsumerOffset(group: String, topicPartition: TopicPartition, offset: Long): SetDataResponse = {
-    val setDataRequest = SetDataRequest(ConsumerOffset.path(group, topicPartition.topic, topicPartition.partition), ConsumerOffset.encode(offset), ZkVersion.NoVersion)
+    val setDataRequest = SetDataRequest(ConsumerOffset.path(group, topicPartition.topic, topicPartition.partition),
+      ConsumerOffset.encode(offset), ZkVersion.NoVersion)
     retryRequestUntilConnected(setDataRequest)
   }
 
@@ -977,33 +927,32 @@ class KafkaZkClient(zooKeeperClient: ZooKeeperClient, isSecure: Boolean) extends
   /**
    * Deletes the given zk path recursively
    * @param path
-   * @return true if path gets deleted successfully, false if root path doesn't exists
+   * @return true if path gets deleted successfully, false if root path doesn't exist
    * @throws KeeperException if there is an error while deleting the znodes
    */
   private[zk] def deleteRecursive(path: String): Boolean = {
     val getChildrenResponse = retryRequestUntilConnected(GetChildrenRequest(path))
-    if (getChildrenResponse.resultCode == Code.OK) {
-      getChildrenResponse.children.foreach(child => deleteRecursive(s"$path/$child"))
-      val deleteResponse = retryRequestUntilConnected(DeleteRequest(path, ZkVersion.NoVersion))
-      if (deleteResponse.resultCode != Code.OK && deleteResponse.resultCode != Code.NONODE) {
-        throw deleteResponse.resultException.get
-      }
-      true
-    } else if (getChildrenResponse.resultCode == Code.NONODE) {
-      false
-    } else
-      throw getChildrenResponse.resultException.get
+    getChildrenResponse.resultCode match {
+      case Code.OK =>
+        getChildrenResponse.children.foreach(child => deleteRecursive(s"$path/$child"))
+        val deleteResponse = retryRequestUntilConnected(DeleteRequest(path, ZkVersion.NoVersion))
+        if (deleteResponse.resultCode != Code.OK && deleteResponse.resultCode != Code.NONODE) {
+          throw deleteResponse.resultException.get
+        }
+        true
+      case Code.NONODE => false
+      case _ => throw getChildrenResponse.resultException.get
+    }
   }
 
   private[zk] def pathExists(path: String): Boolean = {
     val getDataRequest = GetDataRequest(path)
     val getDataResponse = retryRequestUntilConnected(getDataRequest)
-    if (getDataResponse.resultCode == Code.OK) {
-      true
-    } else if (getDataResponse.resultCode == Code.NONODE) {
-      false
-    } else
-      throw getDataResponse.resultException.get
+    getDataResponse.resultCode match {
+      case Code.OK => true
+      case Code.NONODE => false
+      case _ => throw getDataResponse.resultException.get
+    }
   }
 
   private[zk] def createRecursive(path: String): Unit = {
@@ -1099,13 +1048,12 @@ class KafkaZkClient(zooKeeperClient: ZooKeeperClient, isSecure: Boolean) extends
       val createRequest = CreateRequest(path, data, acls(path), CreateMode.EPHEMERAL)
       val createResponse = retryRequestUntilConnected(createRequest)
       val code = createResponse.resultCode
-      if (code == Code.OK) {
-        code
-      } else if (code == Code.NODEEXISTS) {
-        get()
-      } else {
-        error(s"Error while creating ephemeral at $path with return code: $code")
-        code
+      code match {
+        case Code.OK => code
+        case Code.NODEEXISTS => get()
+        case _ =>
+          error(s"Error while creating ephemeral at $path with return code: $code")
+          code
       }
     }
 
@@ -1113,19 +1061,20 @@ class KafkaZkClient(zooKeeperClient: ZooKeeperClient, isSecure: Boolean) extends
       val getDataRequest = GetDataRequest(path)
       val getDataResponse = retryRequestUntilConnected(getDataRequest)
       val code = getDataResponse.resultCode
-      if (code == Code.OK) {
-        if (getDataResponse.stat.getEphemeralOwner != zooKeeperClient.sessionId) {
+      code match {
+        case Code.OK =>
+          if (getDataResponse.stat.getEphemeralOwner != zooKeeperClient.sessionId) {
+            error(s"Error while creating ephemeral at $path with return code: $code")
+            Code.NODEEXISTS
+          } else {
+            code
+          }
+        case Code.NONODE =>
+          info(s"The ephemeral node at $path went away while reading it")
+          create()
+        case _ =>
           error(s"Error while creating ephemeral at $path with return code: $code")
-          Code.NODEEXISTS
-        } else {
           code
-        }
-      } else if (code == Code.NONODE) {
-        info(s"The ephemeral node at $path went away while reading it")
-        create()
-      } else {
-        error(s"Error while creating ephemeral at $path with return code: $code")
-        code
       }
     }
   }

http://git-wip-us.apache.org/repos/asf/kafka/blob/f300480f/core/src/main/scala/kafka/zk/ZkData.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/zk/ZkData.scala b/core/src/main/scala/kafka/zk/ZkData.scala
index 400f0c7..4c618a0 100644
--- a/core/src/main/scala/kafka/zk/ZkData.scala
+++ b/core/src/main/scala/kafka/zk/ZkData.scala
@@ -255,6 +255,10 @@ object ZkVersion {
   val NoVersion = -1
 }
 
+object ZkStat {
+  val NoStat = new Stat()
+}
+
 object StateChangeHandlers {
   val ControllerHandler = "controller-state-change-handler"
   def zkNodeChangeListenerHandler(seqNodeRoot: String) = s"change-notification-$seqNodeRoot"


Mime
View raw message