kafka-commits mailing list archives

From: jun...@apache.org
Subject: kafka git commit: KAFKA-6073; Use ZookeeperClient in KafkaApis
Date: Mon, 30 Oct 2017 16:46:15 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk f4e9c84c5 -> 9504af72f


KAFKA-6073; Use ZookeeperClient in KafkaApis

I kept zkUtils for the call to AdminUtils.createTopic(); migrating AdminUtils can be done in another PR.

Is there a reason why we use TopicAndPartition instead of TopicPartition in KafkaControllerZkUtils?

Author: Mickael Maison <mickael.maison@gmail.com>

Reviewers: Manikumar Reddy <manikumar.reddy@gmail.com>, Ismael Juma <ismael@juma.me.uk>, Jun Rao <junrao@gmail.com>

Closes #4111 from mimaison/KAFKA-6073
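
For readers skimming the patch, a minimal sketch of the new KafkaZkClient offset API that KafkaApis is routed through below. The connection string and timeouts are illustrative assumptions (they are not values from this commit), and a reachable ZooKeeper is required:

    import kafka.zk.KafkaZkClient
    import kafka.zookeeper.ZooKeeperClient
    import org.apache.kafka.common.TopicPartition

    // Illustrative connection settings, not taken from the patch.
    val zooKeeperClient = new ZooKeeperClient("localhost:2181", 6000, 8000, null)
    val zkClient = new KafkaZkClient(zooKeeperClient, false) // isSecure = false

    val group = "my-group"
    val tp = new TopicPartition("topic", 0)

    // Replaces zkUtils.updatePersistentPath over a hand-built znode path:
    zkClient.setOrCreateConsumerOffset(group, tp, 123L)
    // Replaces zkUtils.readDataMaybeNull(...)._1:
    val offset: Option[Long] = zkClient.getConsumerOffset(group, tp) // Some(123)

    zkClient.close()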


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/9504af72
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/9504af72
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/9504af72

Branch: refs/heads/trunk
Commit: 9504af72ff915f5e82833fd96b57dc00e5825925
Parents: f4e9c84
Author: Mickael Maison <mickael.maison@gmail.com>
Authored: Mon Oct 30 09:46:11 2017 -0700
Committer: Jun Rao <junrao@gmail.com>
Committed: Mon Oct 30 09:46:11 2017 -0700

----------------------------------------------------------------------
 .../src/main/scala/kafka/server/KafkaApis.scala |  10 +-
 .../main/scala/kafka/server/KafkaServer.scala   |   2 +-
 .../src/main/scala/kafka/zk/KafkaZkClient.scala | 101 +++++++++++++++++--
 core/src/main/scala/kafka/zk/ZkData.scala       |  10 ++
 .../scala/unit/kafka/server/KafkaApisTest.scala |   3 +
 .../scala/unit/kafka/zk/KafkaZkClientTest.scala |  92 +++++++++++++++++
 6 files changed, 201 insertions(+), 17 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/9504af72/core/src/main/scala/kafka/server/KafkaApis.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala
index 9db1a26..53feeac 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -29,7 +29,7 @@ import kafka.api.{ApiVersion, KAFKA_0_11_0_IV0}
 import kafka.cluster.Partition
 import kafka.common.{OffsetAndMetadata, OffsetMetadata, TopicAndPartition}
 import kafka.server.QuotaFactory.{QuotaManagers, UnboundedQuota}
-import kafka.controller.KafkaController
+import kafka.controller.{KafkaController}
 import kafka.coordinator.group.{GroupCoordinator, JoinGroupResult}
 import kafka.coordinator.transaction.{InitProducerIdResult, TransactionCoordinator}
 import kafka.log.{Log, LogManager, TimestampOffset}
@@ -38,6 +38,7 @@ import kafka.network.RequestChannel.{CloseConnectionAction, NoOpAction, SendActi
 import kafka.security.SecurityUtils
 import kafka.security.auth.{Resource, _}
 import kafka.utils.{CoreUtils, Logging, ZKGroupTopicDirs, ZkUtils}
+import kafka.zk.KafkaZkClient
 import org.apache.kafka.common.errors._
 import org.apache.kafka.common.internals.FatalExitError
 import org.apache.kafka.common.internals.Topic.{GROUP_METADATA_TOPIC_NAME, TRANSACTION_STATE_TOPIC_NAME, isInternal}
@@ -71,6 +72,7 @@ class KafkaApis(val requestChannel: RequestChannel,
                 val txnCoordinator: TransactionCoordinator,
                 val controller: KafkaController,
                 val zkUtils: ZkUtils,
+                val zkClient: KafkaZkClient,
                 val brokerId: Int,
                 val config: KafkaConfig,
                 val metadataCache: MetadataCache,
@@ -300,12 +302,11 @@ class KafkaApis(val requestChannel: RequestChannel,
         // for version 0 always store offsets to ZK
         val responseInfo = authorizedTopicRequestInfo.map {
           case (topicPartition, partitionData) =>
-            val topicDirs = new ZKGroupTopicDirs(offsetCommitRequest.groupId, topicPartition.topic)
             try {
              if (partitionData.metadata != null && partitionData.metadata.length > config.offsetMetadataMaxSize)
                 (topicPartition, Errors.OFFSET_METADATA_TOO_LARGE)
               else {
-                zkUtils.updatePersistentPath(s"${topicDirs.consumerOffsetDir}/${topicPartition.partition}", partitionData.offset.toString)
+                zkClient.setOrCreateConsumerOffset(offsetCommitRequest.groupId, topicPartition, partitionData.offset)
                 (topicPartition, Errors.NONE)
               }
             } catch {
@@ -1006,12 +1007,11 @@ class KafkaApis(val requestChannel: RequestChannel,
 
             // version 0 reads offsets from ZK
             val authorizedPartitionData = authorizedPartitions.map { topicPartition =>
-              val topicDirs = new ZKGroupTopicDirs(offsetFetchRequest.groupId, topicPartition.topic)
               try {
                 if (!metadataCache.contains(topicPartition.topic))
                   (topicPartition, OffsetFetchResponse.UNKNOWN_PARTITION)
                 else {
-                  val payloadOpt = zkUtils.readDataMaybeNull(s"${topicDirs.consumerOffsetDir}/${topicPartition.partition}")._1
+                  val payloadOpt = zkClient.getConsumerOffset(offsetFetchRequest.groupId, topicPartition)
                   payloadOpt match {
                     case Some(payload) =>
                       (topicPartition, new OffsetFetchResponse.PartitionData(

http://git-wip-us.apache.org/repos/asf/kafka/blob/9504af72/core/src/main/scala/kafka/server/KafkaServer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaServer.scala b/core/src/main/scala/kafka/server/KafkaServer.scala
index d576206..e870ce4 100755
--- a/core/src/main/scala/kafka/server/KafkaServer.scala
+++ b/core/src/main/scala/kafka/server/KafkaServer.scala
@@ -276,7 +276,7 @@ class KafkaServer(val config: KafkaConfig, time: Time = Time.SYSTEM, threadNameP
 
         /* start processing requests */
         apis = new KafkaApis(socketServer.requestChannel, replicaManager, adminManager, groupCoordinator, transactionCoordinator,
-          kafkaController, zkUtils, config.brokerId, config, metadataCache, metrics, authorizer, quotaManagers,
+          kafkaController, zkUtils, zkClient, config.brokerId, config, metadataCache, metrics, authorizer, quotaManagers,
           brokerTopicStats, clusterId, time)
 
         requestHandlerPool = new KafkaRequestHandlerPool(config.brokerId, socketServer.requestChannel, apis, time,

http://git-wip-us.apache.org/repos/asf/kafka/blob/9504af72/core/src/main/scala/kafka/zk/KafkaZkClient.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/zk/KafkaZkClient.scala b/core/src/main/scala/kafka/zk/KafkaZkClient.scala
index 0e48d51..925a6f6 100644
--- a/core/src/main/scala/kafka/zk/KafkaZkClient.scala
+++ b/core/src/main/scala/kafka/zk/KafkaZkClient.scala
@@ -32,6 +32,7 @@ import org.apache.zookeeper.{CreateMode, KeeperException}
 
 import scala.collection.mutable
 import scala.collection.mutable.ArrayBuffer
+import org.apache.kafka.common.TopicPartition
 
 /**
  * Provides higher level Kafka-specific operations on top of the pipelined [[kafka.zookeeper.ZooKeeperClient]].
@@ -226,7 +227,7 @@ class KafkaZkClient(zooKeeperClient: ZooKeeperClient, isSecure: Boolean) extends
    * @return SetDataResponse
    */
   def setTopicAssignmentRaw(topic: String, assignment: Map[TopicAndPartition, Seq[Int]]): SetDataResponse = {
-    val setDataRequest = SetDataRequest(TopicZNode.path(topic), TopicZNode.encode(assignment), -1)
+    val setDataRequest = SetDataRequest(TopicZNode.path(topic), TopicZNode.encode(assignment), ZkVersion.NoVersion)
     retryRequestUntilConnected(setDataRequest)
   }
 
@@ -284,7 +285,7 @@ class KafkaZkClient(zooKeeperClient: ZooKeeperClient, isSecure: Boolean) extends
    */
   def deleteLogDirEventNotifications(sequenceNumbers: Seq[String]): Unit = {
     val deleteRequests = sequenceNumbers.map { sequenceNumber =>
-      DeleteRequest(LogDirEventNotificationSequenceZNode.path(sequenceNumber), -1)
+      DeleteRequest(LogDirEventNotificationSequenceZNode.path(sequenceNumber), ZkVersion.NoVersion)
     }
     retryRequestsUntilConnected(deleteRequests)
   }
@@ -329,7 +330,7 @@ class KafkaZkClient(zooKeeperClient: ZooKeeperClient, isSecure: Boolean) extends
    * @param topics the topics to remove.
    */
   def deleteTopicDeletions(topics: Seq[String]): Unit = {
-    val deleteRequests = topics.map(topic => DeleteRequest(DeleteTopicsTopicZNode.path(topic), -1))
+    val deleteRequests = topics.map(topic => DeleteRequest(DeleteTopicsTopicZNode.path(topic), ZkVersion.NoVersion))
     retryRequestsUntilConnected(deleteRequests)
   }
 
@@ -355,7 +356,7 @@ class KafkaZkClient(zooKeeperClient: ZooKeeperClient, isSecure: Boolean) extends
    * @return SetDataResponse
    */
   def setPartitionReassignmentRaw(reassignment: Map[TopicAndPartition, Seq[Int]]): SetDataResponse = {
-    val setDataRequest = SetDataRequest(ReassignPartitionsZNode.path, ReassignPartitionsZNode.encode(reassignment), -1)
+    val setDataRequest = SetDataRequest(ReassignPartitionsZNode.path, ReassignPartitionsZNode.encode(reassignment), ZkVersion.NoVersion)
     retryRequestUntilConnected(setDataRequest)
   }
 
@@ -374,7 +375,7 @@ class KafkaZkClient(zooKeeperClient: ZooKeeperClient, isSecure: Boolean) extends
    * Deletes the partition reassignment znode.
    */
   def deletePartitionReassignment(): Unit = {
-    val deleteRequest = DeleteRequest(ReassignPartitionsZNode.path, -1)
+    val deleteRequest = DeleteRequest(ReassignPartitionsZNode.path, ZkVersion.NoVersion)
     retryRequestUntilConnected(deleteRequest)
   }
 
@@ -451,7 +452,7 @@ class KafkaZkClient(zooKeeperClient: ZooKeeperClient, isSecure: Boolean) extends
    */
   def deleteIsrChangeNotifications(sequenceNumbers: Seq[String]): Unit = {
     val deleteRequests = sequenceNumbers.map { sequenceNumber =>
-      DeleteRequest(IsrChangeNotificationSequenceZNode.path(sequenceNumber), -1)
+      DeleteRequest(IsrChangeNotificationSequenceZNode.path(sequenceNumber), ZkVersion.NoVersion)
     }
     retryRequestsUntilConnected(deleteRequests)
   }
@@ -476,7 +477,7 @@ class KafkaZkClient(zooKeeperClient: ZooKeeperClient, isSecure: Boolean) extends
    * Deletes the preferred replica election znode.
    */
   def deletePreferredReplicaElection(): Unit = {
-    val deleteRequest = DeleteRequest(PreferredReplicaElectionZNode.path, -1)
+    val deleteRequest = DeleteRequest(PreferredReplicaElectionZNode.path, ZkVersion.NoVersion)
     retryRequestUntilConnected(deleteRequest)
   }
 
@@ -500,7 +501,7 @@ class KafkaZkClient(zooKeeperClient: ZooKeeperClient, isSecure: Boolean) extends
    * Deletes the controller znode.
    */
   def deleteController(): Unit = {
-    val deleteRequest = DeleteRequest(ControllerZNode.path, -1)
+    val deleteRequest = DeleteRequest(ControllerZNode.path, ZkVersion.NoVersion)
     retryRequestUntilConnected(deleteRequest)
   }
 
@@ -534,7 +535,7 @@ class KafkaZkClient(zooKeeperClient: ZooKeeperClient, isSecure: Boolean) extends
    * @param topics the topics whose configs we wish to delete.
    */
   def deleteTopicConfigs(topics: Seq[String]): Unit = {
-    val deleteRequests = topics.map(topic => DeleteRequest(ConfigEntityZNode.path(ConfigType.Topic, topic), -1))
+    val deleteRequests = topics.map(topic => DeleteRequest(ConfigEntityZNode.path(ConfigType.Topic, topic), ZkVersion.NoVersion))
     retryRequestsUntilConnected(deleteRequests)
   }
 
@@ -591,11 +592,65 @@ class KafkaZkClient(zooKeeperClient: ZooKeeperClient, isSecure: Boolean) extends
     zooKeeperClient.close()
   }
 
-  private def deleteRecursive(path: String): Unit = {
+  /**
+   * Get the committed offset for a topic partition and group
+   * @param group the group we wish to get offset for
+   * @param topicPartition the topic partition we wish to get the offset for
+   * @return optional long that is Some if there was an offset committed for topic partition, group and None otherwise.
+   */
+  def getConsumerOffset(group: String, topicPartition: TopicPartition): Option[Long] = {
+    val getDataRequest = GetDataRequest(ConsumerOffset.path(group, topicPartition.topic, topicPartition.partition))
+    val getDataResponse = retryRequestUntilConnected(getDataRequest)
+    if (getDataResponse.resultCode == Code.OK) {
+      ConsumerOffset.decode(getDataResponse.data)
+    } else if (getDataResponse.resultCode == Code.NONODE) {
+      None
+    } else {
+      throw getDataResponse.resultException.get
+    }
+  }
+
+   /**
+   * Set the committed offset for a topic partition and group
+   * @param group the group whose offset is being set
+   * @param topicPartition the topic partition whose offset is being set
+   * @param offset the offset value
+   */
+  def setOrCreateConsumerOffset(group: String, topicPartition: TopicPartition, offset: Long): Unit = {
+    val setDataResponse = setConsumerOffset(group, topicPartition, offset)
+    if (setDataResponse.resultCode == Code.NONODE) {
+      val createResponse = createConsumerOffset(group, topicPartition, offset)
+      if (createResponse.resultCode != Code.OK) {
+        throw createResponse.resultException.get
+      }
+    } else if (setDataResponse.resultCode != Code.OK) {
+      throw setDataResponse.resultException.get
+    }
+  }
+
+  private def setConsumerOffset(group: String, topicPartition: TopicPartition, offset: Long): SetDataResponse = {
+    val setDataRequest = SetDataRequest(ConsumerOffset.path(group, topicPartition.topic, topicPartition.partition), ConsumerOffset.encode(offset), ZkVersion.NoVersion)
+    retryRequestUntilConnected(setDataRequest)
+  }
+
+  private def createConsumerOffset(group: String, topicPartition: TopicPartition, offset: Long): CreateResponse = {
+    val path = ConsumerOffset.path(group, topicPartition.topic, topicPartition.partition)
+    val createRequest = CreateRequest(path, ConsumerOffset.encode(offset), acls(path), CreateMode.PERSISTENT)
+    var createResponse = retryRequestUntilConnected(createRequest)
+    if (createResponse.resultCode == Code.NONODE) {
+      val indexOfLastSlash = path.lastIndexOf("/")
+      if (indexOfLastSlash == -1) throw new IllegalArgumentException(s"Invalid path ${path}")
+      createRecursive(path.substring(0, indexOfLastSlash))
+      createResponse = retryRequestUntilConnected(createRequest)
+    }
+    createResponse
+  }
+
+  private[zk] def deleteRecursive(path: String): Unit = {
     val getChildrenResponse = retryRequestUntilConnected(GetChildrenRequest(path))
     if (getChildrenResponse.resultCode == Code.OK) {
       getChildrenResponse.children.foreach(child => deleteRecursive(s"$path/$child"))
-      val deleteResponse = retryRequestUntilConnected(DeleteRequest(path, -1))
+      val deleteResponse = retryRequestUntilConnected(DeleteRequest(path, ZkVersion.NoVersion))
+      if (deleteResponse.resultCode != Code.OK && deleteResponse.resultCode != Code.NONODE) {
         throw deleteResponse.resultException.get
       }
@@ -603,6 +658,30 @@ class KafkaZkClient(zooKeeperClient: ZooKeeperClient, isSecure: Boolean) extends
       throw getChildrenResponse.resultException.get
     }
   }
+
+  private[zk] def pathExists(path: String): Boolean = {
+    val getDataRequest = GetDataRequest(path)
+    val getDataResponse = retryRequestUntilConnected(getDataRequest)
+    getDataResponse.resultCode == Code.OK
+  }
+
+  private[zk] def createRecursive(path: String): Unit = {
+    val createRequest = CreateRequest(path, null, acls(path), CreateMode.PERSISTENT)
+    var createResponse = retryRequestUntilConnected(createRequest)
+    if (createResponse.resultCode == Code.NONODE) {
+      val indexOfLastSlash = path.lastIndexOf("/")
+      if (indexOfLastSlash == -1) throw new IllegalArgumentException(s"Invalid path ${path}")
+      val parentPath = path.substring(0, indexOfLastSlash)
+      createRecursive(parentPath)
+      createResponse = retryRequestUntilConnected(createRequest)
+      if (createResponse.resultCode != Code.OK && createResponse.resultCode != Code.NODEEXISTS) {
+        throw createResponse.resultException.get
+      }
+    } else if (createResponse.resultCode != Code.OK && createResponse.resultCode != Code.NODEEXISTS) {
+      throw createResponse.resultException.get
+    }
+  }
+
   private def createTopicPartition(partitions: Seq[TopicAndPartition]) = {
     val createRequests = partitions.map { partition =>
       val path = TopicPartitionZNode.path(partition)

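The write path above is optimistic: setOrCreateConsumerOffset tries a plain setData first and falls back to a create, which backfills any missing parent znodes, only when ZooKeeper answers NONODE. A sketch of the observable behaviour using the patch's public methods (zkClient as constructed in the earlier sketch):

    val tp = new TopicPartition("topic", 0)

    // First write: the offset znode does not exist yet, so the internal
    // setConsumerOffset returns NONODE and createConsumerOffset backfills
    // /consumers/my-group/offset/topic before retrying the create.
    zkClient.setOrCreateConsumerOffset("my-group", tp, 1L)

    // Second write: the znode now exists, so the plain setData succeeds directly.
    zkClient.setOrCreateConsumerOffset("my-group", tp, 2L)

    zkClient.getConsumerOffset("my-group", tp) // Some(2)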
http://git-wip-us.apache.org/repos/asf/kafka/blob/9504af72/core/src/main/scala/kafka/zk/ZkData.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/zk/ZkData.scala b/core/src/main/scala/kafka/zk/ZkData.scala
index 292523c..4698455 100644
--- a/core/src/main/scala/kafka/zk/ZkData.scala
+++ b/core/src/main/scala/kafka/zk/ZkData.scala
@@ -242,3 +242,13 @@ object PreferredReplicaElectionZNode {
     }
   }.map(_.toSet).getOrElse(Set.empty)
 }
+
+object ConsumerOffset {
+  def path(group: String, topic: String, partition: Integer) = s"/consumers/${group}/offset/${topic}/${partition}"
+  def encode(offset: Long): Array[Byte] = offset.toString.getBytes(UTF_8)
+  def decode(bytes: Array[Byte]): Option[Long] = Option(bytes).map(new String(_, UTF_8).toLong)
+}
+
+object ZkVersion {
+  val NoVersion = -1
+}

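ConsumerOffset pins down the legacy znode layout and its UTF-8 string encoding. Both are pure functions, so the round trip can be checked without a ZooKeeper connection (values illustrative):

    import kafka.zk.ConsumerOffset

    ConsumerOffset.path("my-group", "topic", 0)
    // "/consumers/my-group/offset/topic/0"

    ConsumerOffset.decode(ConsumerOffset.encode(123L)) // Some(123)
    ConsumerOffset.decode(null)                        // None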
http://git-wip-us.apache.org/repos/asf/kafka/blob/9504af72/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
index 60f403d..c3b9ceb 100644
--- a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
+++ b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
@@ -31,6 +31,7 @@ import kafka.network.RequestChannel
 import kafka.security.auth.Authorizer
 import kafka.server.QuotaFactory.QuotaManagers
 import kafka.utils.{MockTime, TestUtils, ZkUtils}
+import kafka.zk.KafkaZkClient
 import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.common.errors.UnsupportedVersionException
 import org.apache.kafka.common.memory.MemoryPool
@@ -60,6 +61,7 @@ class KafkaApisTest {
   private val txnCoordinator = EasyMock.createNiceMock(classOf[TransactionCoordinator])
   private val controller = EasyMock.createNiceMock(classOf[KafkaController])
   private val zkUtils = EasyMock.createNiceMock(classOf[ZkUtils])
+  private val zkClient = EasyMock.createNiceMock(classOf[KafkaZkClient])
   private val metadataCache = EasyMock.createNiceMock(classOf[MetadataCache])
   private val metrics = new Metrics()
   private val brokerId = 1
@@ -83,6 +85,7 @@ class KafkaApisTest {
       txnCoordinator,
       controller,
       zkUtils,
+      zkClient,
       brokerId,
       new KafkaConfig(properties),
       metadataCache,

http://git-wip-us.apache.org/repos/asf/kafka/blob/9504af72/core/src/test/scala/unit/kafka/zk/KafkaZkClientTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/zk/KafkaZkClientTest.scala b/core/src/test/scala/unit/kafka/zk/KafkaZkClientTest.scala
new file mode 100644
index 0000000..00c0a02
--- /dev/null
+++ b/core/src/test/scala/unit/kafka/zk/KafkaZkClientTest.scala
@@ -0,0 +1,92 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+package kafka.zk
+
+import kafka.zookeeper.ZooKeeperClient
+
+import org.junit.{After, Before, Test}
+import org.junit.Assert.{assertEquals, assertFalse, assertTrue}
+import org.apache.kafka.common.TopicPartition
+
+class KafkaZkClientTest extends ZooKeeperTestHarness {
+
+  private var zooKeeperClient: ZooKeeperClient = null
+  private var zkClient: KafkaZkClient = null
+
+  private val group = "my-group"
+  private val topicPartition = new TopicPartition("topic", 0)
+
+  @Before
+  override def setUp() {
+    super.setUp()
+    zooKeeperClient = new ZooKeeperClient(zkConnect, zkSessionTimeout, zkConnectionTimeout, null)
+    zkClient = new KafkaZkClient(zooKeeperClient, false)
+  }
+
+  @After
+  override def tearDown() {
+    zkClient.close()
+    super.tearDown()
+  }
+
+  @Test
+  def testSetAndGetConsumerOffset() {
+    val offset = 123L
+    // None if no committed offsets
+    assertTrue(zkClient.getConsumerOffset(group, topicPartition).isEmpty)
+    // Set and retrieve an offset
+    zkClient.setOrCreateConsumerOffset(group, topicPartition, offset)
+    assertEquals(offset, zkClient.getConsumerOffset(group, topicPartition).get)
+    // Update an existing offset and retrieve it
+    zkClient.setOrCreateConsumerOffset(group, topicPartition, offset + 2L)
+    assertEquals(offset + 2L, zkClient.getConsumerOffset(group, topicPartition).get)
+  }
+
+  @Test
+  def testGetConsumerOffsetNoData() {
+    zkClient.createRecursive(ConsumerOffset.path(group, topicPartition.topic, topicPartition.partition))
+    assertTrue(zkClient.getConsumerOffset(group, topicPartition).isEmpty)
+  }
+
+  @Test
+  def testDeleteRecursive() {
+    zkClient.deleteRecursive("/delete/does-not-exist")
+
+    zkClient.createRecursive("/delete/some/random/path")
+    assertTrue(zkClient.pathExists("/delete/some/random/path"))
+    zkClient.deleteRecursive("/delete")
+    assertFalse(zkClient.pathExists("/delete/some/random/path"))
+    assertFalse(zkClient.pathExists("/delete/some/random"))
+    assertFalse(zkClient.pathExists("/delete/some"))
+    assertFalse(zkClient.pathExists("/delete"))
+
+    intercept[IllegalArgumentException](zkClient.deleteRecursive("delete-invalid-path"))
+  }
+
+  @Test
+  def testCreateRecursive() {
+    zkClient.createRecursive("/create-newrootpath")
+    assertTrue(zkClient.pathExists("/create-newrootpath"))
+
+    zkClient.createRecursive("/create/some/random/long/path")
+    assertTrue(zkClient.pathExists("/create/some/random/long/path"))
+    zkClient.createRecursive("/create/some/random/long/path") // no errors if path already
exists
+
+    intercept[IllegalArgumentException](zkClient.createRecursive("create-invalid-path"))
+  }
+
+}
\ No newline at end of file

