kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jun...@apache.org
Subject [1/5] kafka git commit: KAFKA-2639: Refactoring of ZkUtils
Date Sun, 18 Oct 2015 22:24:06 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk 78a2e2f8f -> ce306ba4e


http://git-wip-us.apache.org/repos/asf/kafka/blob/ce306ba4/core/src/test/scala/unit/kafka/integration/UncleanLeaderElectionTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/integration/UncleanLeaderElectionTest.scala b/core/src/test/scala/unit/kafka/integration/UncleanLeaderElectionTest.scala
index e4f3576..8e72ad3 100755
--- a/core/src/test/scala/unit/kafka/integration/UncleanLeaderElectionTest.scala
+++ b/core/src/test/scala/unit/kafka/integration/UncleanLeaderElectionTest.scala
@@ -105,7 +105,7 @@ class UncleanLeaderElectionTest extends ZooKeeperTestHarness {
     startBrokers(Seq(configProps1, configProps2))
 
     // create topic with 1 partition, 2 replicas, one on each broker
-    AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkClient, topic, Map(partitionId -> Seq(brokerId1, brokerId2)))
+    AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkUtils, topic, Map(partitionId -> Seq(brokerId1, brokerId2)))
 
     verifyUncleanLeaderElectionEnabled
   }
@@ -118,7 +118,7 @@ class UncleanLeaderElectionTest extends ZooKeeperTestHarness {
     startBrokers(Seq(configProps1, configProps2))
 
     // create topic with 1 partition, 2 replicas, one on each broker
-    AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkClient, topic, Map(partitionId -> Seq(brokerId1, brokerId2)))
+    AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkUtils, topic, Map(partitionId -> Seq(brokerId1, brokerId2)))
 
     verifyUncleanLeaderElectionDisabled
   }
@@ -133,7 +133,7 @@ class UncleanLeaderElectionTest extends ZooKeeperTestHarness {
     // create topic with 1 partition, 2 replicas, one on each broker, and unclean leader election enabled
     val topicProps = new Properties()
     topicProps.put("unclean.leader.election.enable", String.valueOf(true))
-    AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkClient, topic, Map(partitionId -> Seq(brokerId1, brokerId2)),
+    AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkUtils, topic, Map(partitionId -> Seq(brokerId1, brokerId2)),
       topicProps)
 
     verifyUncleanLeaderElectionEnabled
@@ -149,7 +149,7 @@ class UncleanLeaderElectionTest extends ZooKeeperTestHarness {
     // create topic with 1 partition, 2 replicas, one on each broker, and unclean leader election disabled
     val topicProps = new Properties()
     topicProps.put("unclean.leader.election.enable", String.valueOf(false))
-    AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkClient, topic, Map(partitionId -> Seq(brokerId1, brokerId2)),
+    AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkUtils, topic, Map(partitionId -> Seq(brokerId1, brokerId2)),
       topicProps)
 
     verifyUncleanLeaderElectionDisabled
@@ -164,13 +164,13 @@ class UncleanLeaderElectionTest extends ZooKeeperTestHarness {
     topicProps.put("unclean.leader.election.enable", "invalid")
 
     intercept[ConfigException] {
-      AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkClient, topic, Map(partitionId -> Seq(brokerId1)), topicProps)
+      AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkUtils, topic, Map(partitionId -> Seq(brokerId1)), topicProps)
     }
   }
 
   def verifyUncleanLeaderElectionEnabled {
     // wait until leader is elected
-    val leaderIdOpt = waitUntilLeaderIsElectedOrChanged(zkClient, topic, partitionId)
+    val leaderIdOpt = waitUntilLeaderIsElectedOrChanged(zkUtils, topic, partitionId)
     assertTrue("Leader should get elected", leaderIdOpt.isDefined)
     val leaderId = leaderIdOpt.get
     debug("Leader for " + topic  + " is elected to be: %s".format(leaderId))
@@ -195,7 +195,7 @@ class UncleanLeaderElectionTest extends ZooKeeperTestHarness {
     servers.filter(server => server.config.brokerId == followerId).map(server => server.startup())
 
     // wait until new leader is (uncleanly) elected
-    waitUntilLeaderIsElectedOrChanged(zkClient, topic, partitionId, newLeaderOpt = Some(followerId))
+    waitUntilLeaderIsElectedOrChanged(zkUtils, topic, partitionId, newLeaderOpt = Some(followerId))
 
     sendMessage(servers, topic, "third")
 
@@ -205,7 +205,7 @@ class UncleanLeaderElectionTest extends ZooKeeperTestHarness {
 
   def verifyUncleanLeaderElectionDisabled {
     // wait until leader is elected
-    val leaderIdOpt = waitUntilLeaderIsElectedOrChanged(zkClient, topic, partitionId)
+    val leaderIdOpt = waitUntilLeaderIsElectedOrChanged(zkUtils, topic, partitionId)
     assertTrue("Leader should get elected", leaderIdOpt.isDefined)
     val leaderId = leaderIdOpt.get
     debug("Leader for " + topic  + " is elected to be: %s".format(leaderId))
@@ -230,7 +230,7 @@ class UncleanLeaderElectionTest extends ZooKeeperTestHarness {
     servers.filter(server => server.config.brokerId == followerId).map(server => server.startup())
 
     // verify that unclean election to non-ISR follower does not occur
-    waitUntilLeaderIsElectedOrChanged(zkClient, topic, partitionId, newLeaderOpt = Some(-1))
+    waitUntilLeaderIsElectedOrChanged(zkUtils, topic, partitionId, newLeaderOpt = Some(-1))
 
     // message production and consumption should both fail while leader is down
     intercept[FailedToSendMessageException] {
@@ -240,14 +240,14 @@ class UncleanLeaderElectionTest extends ZooKeeperTestHarness {
 
     // restart leader temporarily to send a successfully replicated message
     servers.filter(server => server.config.brokerId == leaderId).map(server => server.startup())
-    waitUntilLeaderIsElectedOrChanged(zkClient, topic, partitionId, newLeaderOpt = Some(leaderId))
+    waitUntilLeaderIsElectedOrChanged(zkUtils, topic, partitionId, newLeaderOpt = Some(leaderId))
 
     sendMessage(servers, topic, "third")
     waitUntilMetadataIsPropagated(servers, topic, partitionId)
     servers.filter(server => server.config.brokerId == leaderId).map(server => shutdownServer(server))
 
     // verify clean leader transition to ISR follower
-    waitUntilLeaderIsElectedOrChanged(zkClient, topic, partitionId, newLeaderOpt = Some(followerId))
+    waitUntilLeaderIsElectedOrChanged(zkUtils, topic, partitionId, newLeaderOpt = Some(followerId))
 
     // verify messages can be consumed from ISR follower that was just promoted to leader
     assertEquals(List("first", "second", "third"), consumeAllMessages(topic))

http://git-wip-us.apache.org/repos/asf/kafka/blob/ce306ba4/core/src/test/scala/unit/kafka/javaapi/consumer/ZookeeperConsumerConnectorTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/javaapi/consumer/ZookeeperConsumerConnectorTest.scala b/core/src/test/scala/unit/kafka/javaapi/consumer/ZookeeperConsumerConnectorTest.scala
index f4e0127..88d95e8 100644
--- a/core/src/test/scala/unit/kafka/javaapi/consumer/ZookeeperConsumerConnectorTest.scala
+++ b/core/src/test/scala/unit/kafka/javaapi/consumer/ZookeeperConsumerConnectorTest.scala
@@ -57,7 +57,7 @@ class ZookeeperConsumerConnectorTest extends KafkaServerTestHarness with ZooKeep
     requestHandlerLogger.setLevel(Level.FATAL)
 
     // create the topic
-    TestUtils.createTopic(zkClient, topic, numParts, 1, servers)
+    TestUtils.createTopic(zkUtils, topic, numParts, 1, servers)
 
     // send some messages to each broker
     val sentMessages1 = sendMessages(servers, nMessages, "batch1")

http://git-wip-us.apache.org/repos/asf/kafka/blob/ce306ba4/core/src/test/scala/unit/kafka/metrics/MetricsTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/metrics/MetricsTest.scala b/core/src/test/scala/unit/kafka/metrics/MetricsTest.scala
index 6ceb17b..d699386 100644
--- a/core/src/test/scala/unit/kafka/metrics/MetricsTest.scala
+++ b/core/src/test/scala/unit/kafka/metrics/MetricsTest.scala
@@ -54,7 +54,7 @@ class MetricsTest extends KafkaServerTestHarness with Logging {
   @Test
   def testMetricsLeak() {
     // create topic topic1 with 1 partition on broker 0
-    createTopic(zkClient, topic, numPartitions = 1, replicationFactor = 1, servers = servers)
+    createTopic(zkUtils, topic, numPartitions = 1, replicationFactor = 1, servers = servers)
     // force creation not client's specific metrics.
     createAndShutdownStep("group0", "consumer0", "producer0")
 
@@ -69,9 +69,9 @@ class MetricsTest extends KafkaServerTestHarness with Logging {
   @Test
   def testMetricsReporterAfterDeletingTopic() {
     val topic = "test-topic-metric"
-    AdminUtils.createTopic(zkClient, topic, 1, 1)
-    AdminUtils.deleteTopic(zkClient, topic)
-    TestUtils.verifyTopicDeletion(zkClient, topic, 1, servers)
+    AdminUtils.createTopic(zkUtils, topic, 1, 1)
+    AdminUtils.deleteTopic(zkUtils, topic)
+    TestUtils.verifyTopicDeletion(zkUtils, topic, 1, servers)
     assertFalse("Topic metrics exists after deleteTopic", checkTopicMetricsExists(topic))
   }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/ce306ba4/core/src/test/scala/unit/kafka/producer/ProducerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/producer/ProducerTest.scala b/core/src/test/scala/unit/kafka/producer/ProducerTest.scala
index dd96d29..18a0cd5 100755
--- a/core/src/test/scala/unit/kafka/producer/ProducerTest.scala
+++ b/core/src/test/scala/unit/kafka/producer/ProducerTest.scala
@@ -100,7 +100,7 @@ class ProducerTest extends ZooKeeperTestHarness with Logging{
   @Test
   def testUpdateBrokerPartitionInfo() {
     val topic = "new-topic"
-    TestUtils.createTopic(zkClient, topic, numPartitions = 1, replicationFactor = 2, servers = servers)
+    TestUtils.createTopic(zkUtils, topic, numPartitions = 1, replicationFactor = 2, servers = servers)
 
     val props = new Properties()
     // no need to retry since the send will always fail
@@ -155,7 +155,7 @@ class ProducerTest extends ZooKeeperTestHarness with Logging{
 
     val topic = "new-topic"
     // create topic with 1 partition and await leadership
-    TestUtils.createTopic(zkClient, topic, numPartitions = 1, replicationFactor = 2, servers = servers)
+    TestUtils.createTopic(zkUtils, topic, numPartitions = 1, replicationFactor = 2, servers = servers)
 
     val producer1 = TestUtils.createProducer[String, String](
       brokerList = TestUtils.getBrokerListStrFromServers(Seq(server1, server2)),
@@ -168,7 +168,7 @@ class ProducerTest extends ZooKeeperTestHarness with Logging{
     producer1.send(new KeyedMessage[String, String](topic, "test", "test1"))
     producer1.send(new KeyedMessage[String, String](topic, "test", "test2"))
     // get the leader
-    val leaderOpt = ZkUtils.getLeaderForPartition(zkClient, topic, 0)
+    val leaderOpt = zkUtils.getLeaderForPartition(topic, 0)
     assertTrue("Leader for topic new-topic partition 0 should exist", leaderOpt.isDefined)
     val leader = leaderOpt.get
 
@@ -217,7 +217,7 @@ class ProducerTest extends ZooKeeperTestHarness with Logging{
 
     val topic = "new-topic"
     // create topic
-    TestUtils.createTopic(zkClient, topic, partitionReplicaAssignment = Map(0->Seq(0), 1->Seq(0), 2->Seq(0), 3->Seq(0)),
+    TestUtils.createTopic(zkUtils, topic, partitionReplicaAssignment = Map(0->Seq(0), 1->Seq(0), 2->Seq(0), 3->Seq(0)),
                           servers = servers)
 
     val producer = TestUtils.createProducer[String, String](
@@ -250,7 +250,7 @@ class ProducerTest extends ZooKeeperTestHarness with Logging{
 
     // restart server 1
     server1.startup()
-    TestUtils.waitUntilLeaderIsElectedOrChanged(zkClient, topic, 0)
+    TestUtils.waitUntilLeaderIsElectedOrChanged(zkUtils, topic, 0)
     TestUtils.waitUntilMetadataIsPropagated(servers, topic, 0)
 
     try {
@@ -283,7 +283,7 @@ class ProducerTest extends ZooKeeperTestHarness with Logging{
 
     val topic = "new-topic"
     // create topics in ZK
-    TestUtils.createTopic(zkClient, topic, partitionReplicaAssignment = Map(0->Seq(0,1)), servers = servers)
+    TestUtils.createTopic(zkUtils, topic, partitionReplicaAssignment = Map(0->Seq(0,1)), servers = servers)
 
     // do a simple test to make sure plumbing is okay
     try {
@@ -330,12 +330,12 @@ class ProducerTest extends ZooKeeperTestHarness with Logging{
     try {
 
       // create topic
-      AdminUtils.createTopic(zkClient, "new-topic", 2, 1)
+      AdminUtils.createTopic(zkUtils, "new-topic", 2, 1)
       TestUtils.waitUntilTrue(() =>
-        AdminUtils.fetchTopicMetadataFromZk("new-topic", zkClient).errorCode != ErrorMapping.UnknownTopicOrPartitionCode,
+        AdminUtils.fetchTopicMetadataFromZk("new-topic", zkUtils).errorCode != ErrorMapping.UnknownTopicOrPartitionCode,
         "Topic new-topic not created after timeout",
         waitTime = zookeeper.tickTime)
-      TestUtils.waitUntilLeaderIsElectedOrChanged(zkClient, "new-topic", 0)
+      TestUtils.waitUntilLeaderIsElectedOrChanged(zkUtils, "new-topic", 0)
 
       producer.send(new KeyedMessage[String, String]("new-topic", "key", null))
     } finally {

http://git-wip-us.apache.org/repos/asf/kafka/blob/ce306ba4/core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala b/core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala
index 90689f6..8e15ef8 100644
--- a/core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala
+++ b/core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala
@@ -97,7 +97,7 @@ class SyncProducerTest extends KafkaServerTestHarness {
     val props = TestUtils.getSyncProducerConfig(server.socketServer.boundPort())
 
     val producer = new SyncProducer(new SyncProducerConfig(props))
-    TestUtils.createTopic(zkClient, "test", numPartitions = 1, replicationFactor = 1, servers = servers)
+    TestUtils.createTopic(zkUtils, "test", numPartitions = 1, replicationFactor = 1, servers = servers)
 
     val message1 = new Message(new Array[Byte](configs(0).messageMaxBytes + 1))
     val messageSet1 = new ByteBufferMessageSet(compressionCodec = NoCompressionCodec, messages = message1)
@@ -126,8 +126,8 @@ class SyncProducerTest extends KafkaServerTestHarness {
     props.put("request.required.acks", "0")
 
     val producer = new SyncProducer(new SyncProducerConfig(props))
-    AdminUtils.createTopic(zkClient, "test", 1, 1)
-    TestUtils.waitUntilLeaderIsElectedOrChanged(zkClient, "test", 0)
+    AdminUtils.createTopic(zkUtils, "test", 1, 1)
+    TestUtils.waitUntilLeaderIsElectedOrChanged(zkUtils, "test", 0)
 
     // This message will be dropped silently since message size too large.
     producer.send(TestUtils.produceRequest("test", 0,
@@ -167,10 +167,10 @@ class SyncProducerTest extends KafkaServerTestHarness {
     }
 
     // #2 - test that we get correct offsets when partition is owned by broker
-    AdminUtils.createTopic(zkClient, "topic1", 1, 1)
-    TestUtils.waitUntilLeaderIsElectedOrChanged(zkClient, "topic1", 0)
-    AdminUtils.createTopic(zkClient, "topic3", 1, 1)
-    TestUtils.waitUntilLeaderIsElectedOrChanged(zkClient, "topic3", 0)
+    AdminUtils.createTopic(zkUtils, "topic1", 1, 1)
+    TestUtils.waitUntilLeaderIsElectedOrChanged(zkUtils, "topic1", 0)
+    AdminUtils.createTopic(zkUtils, "topic3", 1, 1)
+    TestUtils.waitUntilLeaderIsElectedOrChanged(zkUtils, "topic3", 0)
 
     val response2 = producer.send(request)
     Assert.assertNotNull(response2)
@@ -244,8 +244,8 @@ class SyncProducerTest extends KafkaServerTestHarness {
     val producer = new SyncProducer(new SyncProducerConfig(props))
     val topicProps = new Properties()
     topicProps.put("min.insync.replicas","2")
-    AdminUtils.createTopic(zkClient, topicName, 1, 1,topicProps)
-    TestUtils.waitUntilLeaderIsElectedOrChanged(zkClient, topicName, 0)
+    AdminUtils.createTopic(zkUtils, topicName, 1, 1,topicProps)
+    TestUtils.waitUntilLeaderIsElectedOrChanged(zkUtils, topicName, 0)
 
     val response = producer.send(TestUtils.produceRequest(topicName, 0,
       new ByteBufferMessageSet(compressionCodec = NoCompressionCodec, messages = new Message(messageBytes)),-1))

http://git-wip-us.apache.org/repos/asf/kafka/blob/ce306ba4/core/src/test/scala/unit/kafka/security/auth/SimpleAclAuthorizerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/security/auth/SimpleAclAuthorizerTest.scala b/core/src/test/scala/unit/kafka/security/auth/SimpleAclAuthorizerTest.scala
index 655bc20..85252d0 100644
--- a/core/src/test/scala/unit/kafka/security/auth/SimpleAclAuthorizerTest.scala
+++ b/core/src/test/scala/unit/kafka/security/auth/SimpleAclAuthorizerTest.scala
@@ -219,12 +219,12 @@ class SimpleAclAuthorizerTest extends ZooKeeperTestHarness {
     //test remove all acls for resource
     simpleAclAuthorizer.removeAcls(resource)
     TestUtils.waitAndVerifyAcls(Set.empty[Acl], simpleAclAuthorizer, resource)
-    assertTrue(!ZkUtils.pathExists(zkClient, simpleAclAuthorizer.toResourcePath(resource)))
+    assertTrue(!zkUtils.pathExists(simpleAclAuthorizer.toResourcePath(resource)))
 
     //test removing last acl also deletes zookeeper path
     acls = changeAclAndVerify(Set.empty[Acl], Set(acl1), Set.empty[Acl])
     changeAclAndVerify(acls, Set.empty[Acl], acls)
-    assertTrue(!ZkUtils.pathExists(zkClient, simpleAclAuthorizer.toResourcePath(resource)))
+    assertTrue(!zkUtils.pathExists(simpleAclAuthorizer.toResourcePath(resource)))
   }
 
   @Test
@@ -240,7 +240,7 @@ class SimpleAclAuthorizerTest extends ZooKeeperTestHarness {
     val acls1 = Set[Acl](acl2)
     simpleAclAuthorizer.addAcls(acls1, resource1)
 
-    ZkUtils.deletePathRecursive(zkClient, SimpleAclAuthorizer.AclChangedZkPath)
+    zkUtils.deletePathRecursive(SimpleAclAuthorizer.AclChangedZkPath)
     val authorizer = new SimpleAclAuthorizer
     authorizer.configure(config.originals)
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/ce306ba4/core/src/test/scala/unit/kafka/server/AdvertiseBrokerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/AdvertiseBrokerTest.scala b/core/src/test/scala/unit/kafka/server/AdvertiseBrokerTest.scala
index 7b55f79..75fa664 100755
--- a/core/src/test/scala/unit/kafka/server/AdvertiseBrokerTest.scala
+++ b/core/src/test/scala/unit/kafka/server/AdvertiseBrokerTest.scala
@@ -49,7 +49,7 @@ class AdvertiseBrokerTest extends ZooKeeperTestHarness {
 
   @Test
   def testBrokerAdvertiseToZK {
-    val brokerInfo = ZkUtils.getBrokerInfo(zkClient, brokerId)
+    val brokerInfo = zkUtils.getBrokerInfo(brokerId)
     val endpoint = brokerInfo.get.endPoints.get(SecurityProtocol.PLAINTEXT).get
     assertEquals(advertisedHostName, endpoint.host)
     assertEquals(advertisedPort, endpoint.port)

http://git-wip-us.apache.org/repos/asf/kafka/blob/ce306ba4/core/src/test/scala/unit/kafka/server/BaseReplicaFetchTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/BaseReplicaFetchTest.scala b/core/src/test/scala/unit/kafka/server/BaseReplicaFetchTest.scala
index b744b94..ade110d 100644
--- a/core/src/test/scala/unit/kafka/server/BaseReplicaFetchTest.scala
+++ b/core/src/test/scala/unit/kafka/server/BaseReplicaFetchTest.scala
@@ -57,7 +57,7 @@ abstract class BaseReplicaFetchTest extends ZooKeeperTestHarness  {
 
     // create a topic and partition and await leadership
     for (topic <- List(topic1,topic2)) {
-      createTopic(zkClient, topic, numPartitions = 1, replicationFactor = 2, servers = brokers)
+      createTopic(zkUtils, topic, numPartitions = 1, replicationFactor = 2, servers = brokers)
     }
 
     // send test messages to leader

http://git-wip-us.apache.org/repos/asf/kafka/blob/ce306ba4/core/src/test/scala/unit/kafka/server/DynamicConfigChangeTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/DynamicConfigChangeTest.scala b/core/src/test/scala/unit/kafka/server/DynamicConfigChangeTest.scala
index 16ac40d..6061e66 100644
--- a/core/src/test/scala/unit/kafka/server/DynamicConfigChangeTest.scala
+++ b/core/src/test/scala/unit/kafka/server/DynamicConfigChangeTest.scala
@@ -39,14 +39,14 @@ class DynamicConfigChangeTest extends KafkaServerTestHarness {
     val tp = TopicAndPartition("test", 0)
     val logProps = new Properties()
     logProps.put(LogConfig.FlushMessagesProp, oldVal.toString)
-    AdminUtils.createTopic(zkClient, tp.topic, 1, 1, logProps)
+    AdminUtils.createTopic(zkUtils, tp.topic, 1, 1, logProps)
     TestUtils.retry(10000) {
       val logOpt = this.servers(0).logManager.getLog(tp)
       assertTrue(logOpt.isDefined)
       assertEquals(oldVal, logOpt.get.config.flushInterval)
     }
     logProps.put(LogConfig.FlushMessagesProp, newVal.toString)
-    AdminUtils.changeTopicConfig(zkClient, tp.topic, logProps)
+    AdminUtils.changeTopicConfig(zkUtils, tp.topic, logProps)
     TestUtils.retry(10000) {
       assertEquals(newVal, this.servers(0).logManager.getLog(tp).get.config.flushInterval)
     }
@@ -61,7 +61,7 @@ class DynamicConfigChangeTest extends KafkaServerTestHarness {
     val props = new Properties()
     props.put("a.b", "c")
     props.put("x.y", "z")
-    AdminUtils.changeClientIdConfig(zkClient, clientId, props)
+    AdminUtils.changeClientIdConfig(zkUtils, clientId, props)
     TestUtils.retry(10000) {
       val configHandler = this.servers(0).dynamicConfigHandlers(ConfigType.Client).asInstanceOf[ClientIdConfigHandler]
       assertTrue("ClientId testClient must exist", configHandler.configPool.contains(clientId))
@@ -77,7 +77,7 @@ class DynamicConfigChangeTest extends KafkaServerTestHarness {
     try {
       val logProps = new Properties()
       logProps.put(LogConfig.FlushMessagesProp, 10000: java.lang.Integer)
-      AdminUtils.changeTopicConfig(zkClient, topic, logProps)
+      AdminUtils.changeTopicConfig(zkUtils, topic, logProps)
       fail("Should fail with AdminOperationException for topic doesn't exist")
     } catch {
       case e: AdminOperationException => // expected
@@ -99,7 +99,7 @@ class DynamicConfigChangeTest extends KafkaServerTestHarness {
     EasyMock.expectLastCall().once()
     EasyMock.replay(handler)
 
-    val configManager = new DynamicConfigManager(zkClient, Map(ConfigType.Topic -> handler))
+    val configManager = new DynamicConfigManager(zkUtils, Map(ConfigType.Topic -> handler))
     // Notifications created using the old TopicConfigManager are ignored.
     configManager.processNotification(Some("not json"))
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/ce306ba4/core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala b/core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala
index c288e56..2e66601 100755
--- a/core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala
+++ b/core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala
@@ -25,7 +25,7 @@ import org.junit._
 import org.junit.Assert._
 import kafka.common._
 import kafka.cluster.Replica
-import kafka.utils.{SystemTime, KafkaScheduler, TestUtils, MockTime, CoreUtils}
+import kafka.utils.{ZkUtils, SystemTime, KafkaScheduler, TestUtils, MockTime, CoreUtils}
 import java.util.concurrent.atomic.AtomicBoolean
 import org.apache.kafka.common.utils.{MockTime => JMockTime}
 
@@ -48,15 +48,15 @@ class HighwatermarkPersistenceTest {
   @Test
   def testHighWatermarkPersistenceSinglePartition() {
     // mock zkclient
-    val zkClient = EasyMock.createMock(classOf[ZkClient])
-    EasyMock.replay(zkClient)
+    val zkUtils = EasyMock.createMock(classOf[ZkUtils])
+    EasyMock.replay(zkUtils)
     
     // create kafka scheduler
     val scheduler = new KafkaScheduler(2)
     scheduler.startup
     val metrics = new Metrics
     // create replica manager
-    val replicaManager = new ReplicaManager(configs.head, metrics, new MockTime, new JMockTime, zkClient, scheduler,
+    val replicaManager = new ReplicaManager(configs.head, metrics, new MockTime, new JMockTime, zkUtils, scheduler,
       logManagers(0), new AtomicBoolean(false))
     replicaManager.startup()
     try {
@@ -78,7 +78,7 @@ class HighwatermarkPersistenceTest {
       replicaManager.checkpointHighWatermarks()
       fooPartition0Hw = hwmFor(replicaManager, topic, 0)
       assertEquals(leaderReplicaPartition0.highWatermark.messageOffset, fooPartition0Hw)
-      EasyMock.verify(zkClient)
+      EasyMock.verify(zkUtils)
     } finally {
       // shutdown the replica manager upon test completion
       replicaManager.shutdown(false)
@@ -92,14 +92,14 @@ class HighwatermarkPersistenceTest {
     val topic1 = "foo1"
     val topic2 = "foo2"
     // mock zkclient
-    val zkClient = EasyMock.createMock(classOf[ZkClient])
-    EasyMock.replay(zkClient)
+    val zkUtils = EasyMock.createMock(classOf[ZkUtils])
+    EasyMock.replay(zkUtils)
     // create kafka scheduler
     val scheduler = new KafkaScheduler(2)
     scheduler.startup
     val metrics = new Metrics
     // create replica manager
-    val replicaManager = new ReplicaManager(configs.head, metrics, new MockTime(), new JMockTime, zkClient,
+    val replicaManager = new ReplicaManager(configs.head, metrics, new MockTime(), new JMockTime, zkUtils,
       scheduler, logManagers(0), new AtomicBoolean(false))
     replicaManager.startup()
     try {
@@ -144,7 +144,7 @@ class HighwatermarkPersistenceTest {
       // verify checkpointed hw for topic 1
       topic1Partition0Hw = hwmFor(replicaManager, topic1, 0)
       assertEquals(10L, topic1Partition0Hw)
-      EasyMock.verify(zkClient)
+      EasyMock.verify(zkUtils)
     } finally {
       // shutdown the replica manager upon test completion
       replicaManager.shutdown(false)

http://git-wip-us.apache.org/repos/asf/kafka/blob/ce306ba4/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
index 4637972..42c1199 100755
--- a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
+++ b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
@@ -407,7 +407,7 @@ class KafkaConfigTest {
         case KafkaConfig.RequestTimeoutMsProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number")
 
         case KafkaConfig.AuthorizerClassNameProp => //ignore string
-
+          
         case KafkaConfig.PortProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number")
         case KafkaConfig.HostNameProp => // ignore string
         case KafkaConfig.AdvertisedHostNameProp => //ignore string

http://git-wip-us.apache.org/repos/asf/kafka/blob/ce306ba4/core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala b/core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala
index 0efaa6a..1d741f2 100755
--- a/core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala
+++ b/core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala
@@ -70,9 +70,9 @@ class LeaderElectionTest extends ZooKeeperTestHarness {
     val partitionId = 0
 
     // create topic with 1 partition, 2 replicas, one on each broker
-    val leader1 = createTopic(zkClient, topic, partitionReplicaAssignment = Map(0 -> Seq(0, 1)), servers = servers)(0)
+    val leader1 = createTopic(zkUtils, topic, partitionReplicaAssignment = Map(0 -> Seq(0, 1)), servers = servers)(0)
 
-    val leaderEpoch1 = ZkUtils.getEpochForPartition(zkClient, topic, partitionId)
+    val leaderEpoch1 = zkUtils.getEpochForPartition(topic, partitionId)
     debug("leader Epoc: " + leaderEpoch1)
     debug("Leader is elected to be: %s".format(leader1.getOrElse(-1)))
     assertTrue("Leader should get elected", leader1.isDefined)
@@ -83,9 +83,9 @@ class LeaderElectionTest extends ZooKeeperTestHarness {
     // kill the server hosting the preferred replica
     servers.last.shutdown()
     // check if leader moves to the other server
-    val leader2 = waitUntilLeaderIsElectedOrChanged(zkClient, topic, partitionId,
+    val leader2 = waitUntilLeaderIsElectedOrChanged(zkUtils, topic, partitionId,
                                                     oldLeaderOpt = if(leader1.get == 0) None else leader1)
-    val leaderEpoch2 = ZkUtils.getEpochForPartition(zkClient, topic, partitionId)
+    val leaderEpoch2 = zkUtils.getEpochForPartition(topic, partitionId)
     debug("Leader is elected to be: %s".format(leader1.getOrElse(-1)))
     debug("leader Epoc: " + leaderEpoch2)
     assertEquals("Leader must move to broker 0", 0, leader2.getOrElse(-1))
@@ -97,9 +97,9 @@ class LeaderElectionTest extends ZooKeeperTestHarness {
     servers.last.startup()
     servers.head.shutdown()
     Thread.sleep(zookeeper.tickTime)
-    val leader3 = waitUntilLeaderIsElectedOrChanged(zkClient, topic, partitionId,
+    val leader3 = waitUntilLeaderIsElectedOrChanged(zkUtils, topic, partitionId,
                                                     oldLeaderOpt = if(leader2.get == 1) None else leader2)
-    val leaderEpoch3 = ZkUtils.getEpochForPartition(zkClient, topic, partitionId)
+    val leaderEpoch3 = zkUtils.getEpochForPartition(topic, partitionId)
     debug("leader Epoc: " + leaderEpoch3)
     debug("Leader is elected to be: %s".format(leader3.getOrElse(-1)))
     assertEquals("Leader must return to 1", 1, leader3.getOrElse(-1))
@@ -116,9 +116,9 @@ class LeaderElectionTest extends ZooKeeperTestHarness {
     val partitionId = 0
 
     // create topic with 1 partition, 2 replicas, one on each broker
-    val leader1 = createTopic(zkClient, topic, partitionReplicaAssignment = Map(0 -> Seq(0, 1)), servers = servers)(0)
+    val leader1 = createTopic(zkUtils, topic, partitionReplicaAssignment = Map(0 -> Seq(0, 1)), servers = servers)(0)
 
-    val leaderEpoch1 = ZkUtils.getEpochForPartition(zkClient, topic, partitionId)
+    val leaderEpoch1 = zkUtils.getEpochForPartition(topic, partitionId)
     debug("leader Epoc: " + leaderEpoch1)
     debug("Leader is elected to be: %s".format(leader1.getOrElse(-1)))
     assertTrue("Leader should get elected", leader1.isDefined)
@@ -136,7 +136,7 @@ class LeaderElectionTest extends ZooKeeperTestHarness {
       new LeaderAndIsrRequest.EndPoint(brokerEndPoint.id, brokerEndPoint.host, brokerEndPoint.port)
     }
 
-    val controllerContext = new ControllerContext(zkClient, zkConnection, 6000)
+    val controllerContext = new ControllerContext(zkUtils, 6000)
     controllerContext.liveBrokers = brokers.toSet
     val metrics = new Metrics
     val controllerChannelManager = new ControllerChannelManager(controllerContext, controllerConfig, new SystemTime, metrics)

http://git-wip-us.apache.org/repos/asf/kafka/blob/ce306ba4/core/src/test/scala/unit/kafka/server/LogOffsetTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/LogOffsetTest.scala b/core/src/test/scala/unit/kafka/server/LogOffsetTest.scala
index 344001d..babf6fb 100755
--- a/core/src/test/scala/unit/kafka/server/LogOffsetTest.scala
+++ b/core/src/test/scala/unit/kafka/server/LogOffsetTest.scala
@@ -77,7 +77,7 @@ class LogOffsetTest extends ZooKeeperTestHarness {
     val part = Integer.valueOf(topicPartition.split("-").last).intValue
 
     // setup brokers in zookeeper as owners of partitions for this test
-    AdminUtils.createTopic(zkClient, topic, 1, 1)
+    AdminUtils.createTopic(zkUtils, topic, 1, 1)
 
     val logManager = server.getLogManager
     waitUntilTrue(() => logManager.getLog(TopicAndPartition(topic, part)).isDefined,
@@ -117,7 +117,7 @@ class LogOffsetTest extends ZooKeeperTestHarness {
     val topic = topicPartition.split("-").head
 
     // setup brokers in zookeeper as owners of partitions for this test
-    createTopic(zkClient, topic, numPartitions = 1, replicationFactor = 1, servers = Seq(server))
+    createTopic(zkUtils, topic, numPartitions = 1, replicationFactor = 1, servers = Seq(server))
 
     var offsetChanged = false
     for(i <- 1 to 14) {
@@ -141,7 +141,7 @@ class LogOffsetTest extends ZooKeeperTestHarness {
     val part = Integer.valueOf(topicPartition.split("-").last).intValue
 
     // setup brokers in zookeeper as owners of partitions for this test
-    AdminUtils.createTopic(zkClient, topic, 3, 1)
+    AdminUtils.createTopic(zkUtils, topic, 3, 1)
 
     val logManager = server.getLogManager
     val log = logManager.createLog(TopicAndPartition(topic, part), logManager.defaultConfig)
@@ -170,7 +170,7 @@ class LogOffsetTest extends ZooKeeperTestHarness {
     val part = Integer.valueOf(topicPartition.split("-").last).intValue
 
     // setup brokers in zookeeper as owners of partitions for this test
-    AdminUtils.createTopic(zkClient, topic, 3, 1)
+    AdminUtils.createTopic(zkUtils, topic, 3, 1)
 
     val logManager = server.getLogManager
     val log = logManager.createLog(TopicAndPartition(topic, part), logManager.defaultConfig)

http://git-wip-us.apache.org/repos/asf/kafka/blob/ce306ba4/core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala b/core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala
index 46829b8..7a434aa 100755
--- a/core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala
+++ b/core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala
@@ -81,7 +81,7 @@ class LogRecoveryTest extends ZooKeeperTestHarness {
     servers = List(server1, server2)
 
     // create topic with 1 partition, 2 replicas, one on each broker
-    createTopic(zkClient, topic, partitionReplicaAssignment = Map(0->Seq(0,1)), servers = servers)
+    createTopic(zkUtils, topic, partitionReplicaAssignment = Map(0->Seq(0,1)), servers = servers)
 
     // create the producer
     updateProducer()
@@ -116,7 +116,7 @@ class LogRecoveryTest extends ZooKeeperTestHarness {
 
   @Test
   def testHWCheckpointWithFailuresSingleLogSegment {
-    var leader = waitUntilLeaderIsElectedOrChanged(zkClient, topic, partitionId)
+    var leader = waitUntilLeaderIsElectedOrChanged(zkUtils, topic, partitionId)
 
     assertEquals(0L, hwFile1.read.getOrElse(TopicAndPartition(topic, 0), 0L))
 
@@ -129,7 +129,7 @@ class LogRecoveryTest extends ZooKeeperTestHarness {
     assertEquals(hw, hwFile1.read.getOrElse(TopicAndPartition(topic, 0), 0L))
 
     // check if leader moves to the other server
-    leader = waitUntilLeaderIsElectedOrChanged(zkClient, topic, partitionId, oldLeaderOpt = leader)
+    leader = waitUntilLeaderIsElectedOrChanged(zkUtils, topic, partitionId, oldLeaderOpt = leader)
     assertEquals("Leader must move to broker 1", 1, leader.getOrElse(-1))
 
     // bring the preferred replica back
@@ -137,7 +137,7 @@ class LogRecoveryTest extends ZooKeeperTestHarness {
     // Update producer with new server settings
     updateProducer()
 
-    leader = waitUntilLeaderIsElectedOrChanged(zkClient, topic, partitionId)
+    leader = waitUntilLeaderIsElectedOrChanged(zkUtils, topic, partitionId)
     assertTrue("Leader must remain on broker 1, in case of zookeeper session expiration it can move to broker 0",
       leader.isDefined && (leader.get == 0 || leader.get == 1))
 
@@ -148,7 +148,7 @@ class LogRecoveryTest extends ZooKeeperTestHarness {
 
     server2.startup()
     updateProducer()
-    leader = waitUntilLeaderIsElectedOrChanged(zkClient, topic, partitionId, oldLeaderOpt = leader)
+    leader = waitUntilLeaderIsElectedOrChanged(zkUtils, topic, partitionId, oldLeaderOpt = leader)
     assertTrue("Leader must remain on broker 0, in case of zookeeper session expiration it can move to broker 1",
       leader.isDefined && (leader.get == 0 || leader.get == 1))
 
@@ -183,7 +183,7 @@ class LogRecoveryTest extends ZooKeeperTestHarness {
 
   @Test
   def testHWCheckpointWithFailuresMultipleLogSegments {
-    var leader = waitUntilLeaderIsElectedOrChanged(zkClient, topic, partitionId)
+    var leader = waitUntilLeaderIsElectedOrChanged(zkUtils, topic, partitionId)
 
     sendMessages(2)
     var hw = 2L
@@ -201,7 +201,7 @@ class LogRecoveryTest extends ZooKeeperTestHarness {
     server2.startup()
     updateProducer()
     // check if leader moves to the other server
-    leader = waitUntilLeaderIsElectedOrChanged(zkClient, topic, partitionId, oldLeaderOpt = leader)
+    leader = waitUntilLeaderIsElectedOrChanged(zkUtils, topic, partitionId, oldLeaderOpt = leader)
     assertEquals("Leader must move to broker 1", 1, leader.getOrElse(-1))
 
     assertEquals(hw, hwFile1.read.getOrElse(TopicAndPartition(topic, 0), 0L))

http://git-wip-us.apache.org/repos/asf/kafka/blob/ce306ba4/core/src/test/scala/unit/kafka/server/OffsetCommitTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/OffsetCommitTest.scala b/core/src/test/scala/unit/kafka/server/OffsetCommitTest.scala
index f846698..7440500 100755
--- a/core/src/test/scala/unit/kafka/server/OffsetCommitTest.scala
+++ b/core/src/test/scala/unit/kafka/server/OffsetCommitTest.scala
@@ -82,7 +82,7 @@ class OffsetCommitTest extends ZooKeeperTestHarness {
     val topicAndPartition = TopicAndPartition(topic, 0)
     val expectedReplicaAssignment = Map(0  -> List(1))
     // create the topic
-    createTopic(zkClient, topic, partitionReplicaAssignment = expectedReplicaAssignment, servers = Seq(server))
+    createTopic(zkUtils, topic, partitionReplicaAssignment = expectedReplicaAssignment, servers = Seq(server))
 
     val commitRequest = OffsetCommitRequest(group, immutable.Map(topicAndPartition -> OffsetAndMetadata(offset = 42L)))
     val commitResponse = simpleConsumer.commitOffsets(commitRequest)
@@ -131,10 +131,10 @@ class OffsetCommitTest extends ZooKeeperTestHarness {
     val topic4 = "topic-4" // Topic that group never consumes
     val topic5 = "topic-5" // Non-existent topic
 
-    createTopic(zkClient, topic1, servers = Seq(server), numPartitions = 1)
-    createTopic(zkClient, topic2, servers = Seq(server), numPartitions = 2)
-    createTopic(zkClient, topic3, servers = Seq(server), numPartitions = 1)
-    createTopic(zkClient, topic4, servers = Seq(server), numPartitions = 1)
+    createTopic(zkUtils, topic1, servers = Seq(server), numPartitions = 1)
+    createTopic(zkUtils, topic2, servers = Seq(server), numPartitions = 2)
+    createTopic(zkUtils, topic3, servers = Seq(server), numPartitions = 1)
+    createTopic(zkUtils, topic4, servers = Seq(server), numPartitions = 1)
 
     val commitRequest = OffsetCommitRequest("test-group", immutable.Map(
       TopicAndPartition(topic1, 0) -> OffsetAndMetadata(offset=42L, metadata="metadata one"),
@@ -197,7 +197,7 @@ class OffsetCommitTest extends ZooKeeperTestHarness {
   def testLargeMetadataPayload() {
     val topicAndPartition = TopicAndPartition("large-metadata", 0)
     val expectedReplicaAssignment = Map(0  -> List(1))
-    createTopic(zkClient, topicAndPartition.topic, partitionReplicaAssignment = expectedReplicaAssignment,
+    createTopic(zkUtils, topicAndPartition.topic, partitionReplicaAssignment = expectedReplicaAssignment,
                 servers = Seq(server))
 
     val commitRequest = OffsetCommitRequest("test-group", immutable.Map(topicAndPartition -> OffsetAndMetadata(
@@ -222,7 +222,7 @@ class OffsetCommitTest extends ZooKeeperTestHarness {
     // set up topic partition
     val topic = "topic"
     val topicPartition = TopicAndPartition(topic, 0)
-    createTopic(zkClient, topic, servers = Seq(server), numPartitions = 1)
+    createTopic(zkUtils, topic, servers = Seq(server), numPartitions = 1)
 
     val fetchRequest = OffsetFetchRequest(group, Seq(TopicAndPartition(topic, 0)))
 
@@ -293,7 +293,7 @@ class OffsetCommitTest extends ZooKeeperTestHarness {
     val topic1 = "topicDoesNotExists"
     val topic2 = "topic-2"
 
-    createTopic(zkClient, topic2, servers = Seq(server), numPartitions = 1)
+    createTopic(zkUtils, topic2, servers = Seq(server), numPartitions = 1)
 
     // Commit an offset
     val expectedReplicaAssignment = Map(0  -> List(1))

http://git-wip-us.apache.org/repos/asf/kafka/blob/ce306ba4/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
index 1813349..724d4ac 100644
--- a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
@@ -19,7 +19,7 @@ package kafka.server
 
 import kafka.api.{ProducerResponseStatus, SerializationTestUtils, ProducerRequest}
 import kafka.common.TopicAndPartition
-import kafka.utils.{MockScheduler, MockTime, TestUtils}
+import kafka.utils.{ZkUtils, MockScheduler, MockTime, TestUtils}
 
 import java.util.concurrent.atomic.AtomicBoolean
 import java.io.File
@@ -42,11 +42,12 @@ class ReplicaManagerTest {
     val props = TestUtils.createBrokerConfig(1, TestUtils.MockZkConnect)
     val config = KafkaConfig.fromProps(props)
     val zkClient = EasyMock.createMock(classOf[ZkClient])
+    val zkUtils = ZkUtils(zkClient, false)
     val mockLogMgr = TestUtils.createLogManager(config.logDirs.map(new File(_)).toArray)
     val time: MockTime = new MockTime()
     val jTime = new JMockTime
     val metrics = new Metrics
-    val rm = new ReplicaManager(config, metrics, time, jTime, zkClient, new MockScheduler(time), mockLogMgr,
+    val rm = new ReplicaManager(config, metrics, time, jTime, zkUtils, new MockScheduler(time), mockLogMgr,
       new AtomicBoolean(false))
     try {
       val partition = rm.getOrCreatePartition(topic, 1)
@@ -65,11 +66,12 @@ class ReplicaManagerTest {
     props.put("log.dir", TestUtils.tempRelativeDir("data").getAbsolutePath)
     val config = KafkaConfig.fromProps(props)
     val zkClient = EasyMock.createMock(classOf[ZkClient])
+    val zkUtils = ZkUtils(zkClient, false)
     val mockLogMgr = TestUtils.createLogManager(config.logDirs.map(new File(_)).toArray)
     val time: MockTime = new MockTime()
     val jTime = new JMockTime
     val metrics = new Metrics
-    val rm = new ReplicaManager(config, metrics, time, jTime, zkClient, new MockScheduler(time), mockLogMgr,
+    val rm = new ReplicaManager(config, metrics, time, jTime, zkUtils, new MockScheduler(time), mockLogMgr,
       new AtomicBoolean(false))
     try {
       val partition = rm.getOrCreatePartition(topic, 1)
@@ -87,11 +89,12 @@ class ReplicaManagerTest {
     val props = TestUtils.createBrokerConfig(1, TestUtils.MockZkConnect)
     val config = KafkaConfig.fromProps(props)
     val zkClient = EasyMock.createMock(classOf[ZkClient])
+    val zkUtils = ZkUtils(zkClient, false)
     val mockLogMgr = TestUtils.createLogManager(config.logDirs.map(new File(_)).toArray)
     val time: MockTime = new MockTime()
     val jTime = new JMockTime
     val metrics = new Metrics
-    val rm = new ReplicaManager(config, metrics, time, jTime, zkClient, new MockScheduler(time), mockLogMgr,
+    val rm = new ReplicaManager(config, metrics, time, jTime, zkUtils, new MockScheduler(time), mockLogMgr,
       new AtomicBoolean(false), Option(this.getClass.getName))
     try {
       val produceRequest = new ProducerRequest(1, "client 1", 3, 1000, SerializationTestUtils.topicDataProducerRequest)

http://git-wip-us.apache.org/repos/asf/kafka/blob/ce306ba4/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala b/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala
index b6d5697..8f081b9 100755
--- a/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala
@@ -53,7 +53,7 @@ class ServerShutdownTest extends ZooKeeperTestHarness {
       keyEncoder = classOf[IntEncoder].getName)
 
     // create topic
-    createTopic(zkClient, topic, numPartitions = 1, replicationFactor = 1, servers = Seq(server))
+    createTopic(zkUtils, topic, numPartitions = 1, replicationFactor = 1, servers = Seq(server))
 
     // send some messages
     producer.send(sent1.map(m => new KeyedMessage[Int, String](topic, 0, m)):_*)

http://git-wip-us.apache.org/repos/asf/kafka/blob/ce306ba4/core/src/test/scala/unit/kafka/server/ServerStartupTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/ServerStartupTest.scala b/core/src/test/scala/unit/kafka/server/ServerStartupTest.scala
index 0adc0aa..145f00a 100755
--- a/core/src/test/scala/unit/kafka/server/ServerStartupTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ServerStartupTest.scala
@@ -36,7 +36,7 @@ class ServerStartupTest extends ZooKeeperTestHarness {
     props.put("zookeeper.connect", zooKeeperConnect + zookeeperChroot)
     val server = TestUtils.createServer(KafkaConfig.fromProps(props))
 
-    val pathExists = ZkUtils.pathExists(zkClient, zookeeperChroot)
+    val pathExists = zkUtils.pathExists(zookeeperChroot)
     assertTrue(pathExists)
 
     server.shutdown()
@@ -51,7 +51,7 @@ class ServerStartupTest extends ZooKeeperTestHarness {
     val brokerId = 0
     val props1 = TestUtils.createBrokerConfig(brokerId, zkConnect)
     val server1 = TestUtils.createServer(KafkaConfig.fromProps(props1))
-    val brokerRegistration = ZkUtils.readData(zkClient, ZkUtils.BrokerIdsPath + "/" + brokerId)._1
+    val brokerRegistration = zkUtils.readData(ZkUtils.BrokerIdsPath + "/" + brokerId)._1
 
     val props2 = TestUtils.createBrokerConfig(brokerId, zkConnect)
     try {
@@ -63,7 +63,7 @@ class ServerStartupTest extends ZooKeeperTestHarness {
     }
 
     // broker registration shouldn't change
-    assertEquals(brokerRegistration, ZkUtils.readData(zkClient, ZkUtils.BrokerIdsPath + "/" + brokerId)._1)
+    assertEquals(brokerRegistration, zkUtils.readData(ZkUtils.BrokerIdsPath + "/" + brokerId)._1)
 
     server1.shutdown()
     CoreUtils.rm(server1.config.logDirs)

http://git-wip-us.apache.org/repos/asf/kafka/blob/ce306ba4/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala b/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala
index 0485f7b..689b70b 100644
--- a/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala
+++ b/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala
@@ -69,8 +69,8 @@ class SimpleFetchTest {
   @Before
   def setUp() {
     // create nice mock since we don't particularly care about zkclient calls
-    val zkClient = EasyMock.createNiceMock(classOf[ZkClient])
-    EasyMock.replay(zkClient)
+    val zkUtils = EasyMock.createNiceMock(classOf[ZkUtils])
+    EasyMock.replay(zkUtils)
 
     // create nice mock since we don't particularly care about scheduler calls
     val scheduler = EasyMock.createNiceMock(classOf[KafkaScheduler])
@@ -98,7 +98,7 @@ class SimpleFetchTest {
     EasyMock.replay(logManager)
 
     // create the replica manager
-    replicaManager = new ReplicaManager(configs.head, metrics, time, jTime, zkClient, scheduler, logManager,
+    replicaManager = new ReplicaManager(configs.head, metrics, time, jTime, zkUtils, scheduler, logManager,
       new AtomicBoolean(false))
 
     // add the partition with two replicas, both in ISR

http://git-wip-us.apache.org/repos/asf/kafka/blob/ce306ba4/core/src/test/scala/unit/kafka/utils/ReplicationUtilsTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/utils/ReplicationUtilsTest.scala b/core/src/test/scala/unit/kafka/utils/ReplicationUtilsTest.scala
index b3835f0..d4ddb2f 100644
--- a/core/src/test/scala/unit/kafka/utils/ReplicationUtilsTest.scala
+++ b/core/src/test/scala/unit/kafka/utils/ReplicationUtilsTest.scala
@@ -47,7 +47,7 @@ class ReplicationUtilsTest extends ZooKeeperTestHarness {
   @Before
   override def setUp() {
     super.setUp()
-    ZkUtils.createPersistentPath(zkClient,topicPath,topicData)
+    zkUtils.createPersistentPath(topicPath, topicData)
   }
 
   @Test
@@ -66,23 +66,23 @@ class ReplicationUtilsTest extends ZooKeeperTestHarness {
     EasyMock.expect(replicaManager.config).andReturn(configs.head)
     EasyMock.expect(replicaManager.logManager).andReturn(logManager)
     EasyMock.expect(replicaManager.replicaFetcherManager).andReturn(EasyMock.createMock(classOf[ReplicaFetcherManager]))
-    EasyMock.expect(replicaManager.zkClient).andReturn(zkClient)
+    EasyMock.expect(replicaManager.zkUtils).andReturn(zkUtils)
     EasyMock.replay(replicaManager)
 
-    ZkUtils.makeSurePersistentPathExists(zkClient, ZkUtils.IsrChangeNotificationPath)
+    zkUtils.makeSurePersistentPathExists(ZkUtils.IsrChangeNotificationPath)
 
     val replicas = List(0,1)
 
     // regular update
     val newLeaderAndIsr1 = new LeaderAndIsr(brokerId, leaderEpoch, replicas, 0)
-    val (updateSucceeded1,newZkVersion1) = ReplicationUtils.updateLeaderAndIsr(zkClient,
+    val (updateSucceeded1,newZkVersion1) = ReplicationUtils.updateLeaderAndIsr(zkUtils,
       "my-topic-test", partitionId, newLeaderAndIsr1, controllerEpoch, 0)
     assertTrue(updateSucceeded1)
     assertEquals(newZkVersion1, 1)
 
     // mismatched zkVersion with the same data
     val newLeaderAndIsr2 = new LeaderAndIsr(brokerId, leaderEpoch, replicas, zkVersion + 1)
-    val (updateSucceeded2,newZkVersion2) = ReplicationUtils.updateLeaderAndIsr(zkClient,
+    val (updateSucceeded2,newZkVersion2) = ReplicationUtils.updateLeaderAndIsr(zkUtils,
       "my-topic-test", partitionId, newLeaderAndIsr2, controllerEpoch, zkVersion + 1)
     assertTrue(updateSucceeded2)
     // returns true with existing zkVersion
@@ -90,7 +90,7 @@ class ReplicationUtilsTest extends ZooKeeperTestHarness {
 
     // mismatched zkVersion and leaderEpoch
     val newLeaderAndIsr3 = new LeaderAndIsr(brokerId, leaderEpoch + 1, replicas, zkVersion + 1)
-    val (updateSucceeded3,newZkVersion3) = ReplicationUtils.updateLeaderAndIsr(zkClient,
+    val (updateSucceeded3,newZkVersion3) = ReplicationUtils.updateLeaderAndIsr(zkUtils,
       "my-topic-test", partitionId, newLeaderAndIsr3, controllerEpoch, zkVersion + 1)
     assertFalse(updateSucceeded3)
     assertEquals(newZkVersion3,-1)
@@ -98,9 +98,9 @@ class ReplicationUtilsTest extends ZooKeeperTestHarness {
 
   @Test
   def testGetLeaderIsrAndEpochForPartition() {
-    val leaderIsrAndControllerEpoch = ReplicationUtils.getLeaderIsrAndEpochForPartition(zkClient, topic, partitionId)
+    val leaderIsrAndControllerEpoch = ReplicationUtils.getLeaderIsrAndEpochForPartition(zkUtils, topic, partitionId)
     assertEquals(topicDataLeaderIsrAndControllerEpoch, leaderIsrAndControllerEpoch.get)
-    assertEquals(None, ReplicationUtils.getLeaderIsrAndEpochForPartition(zkClient, topic, partitionId + 1))
+    assertEquals(None, ReplicationUtils.getLeaderIsrAndEpochForPartition(zkUtils, topic, partitionId + 1))
   }
 
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/ce306ba4/core/src/test/scala/unit/kafka/utils/TestUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
index 4a53e11..1a0a7dc 100755
--- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala
+++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
@@ -45,6 +45,7 @@ import kafka.common.TopicAndPartition
 import kafka.admin.AdminUtils
 import kafka.producer.ProducerConfig
 import kafka.log._
+import kafka.utils.ZkUtils._
 
 import org.junit.Assert._
 import org.apache.kafka.clients.producer.KafkaProducer
@@ -187,18 +188,18 @@ object TestUtils extends Logging {
    * Wait until the leader is elected and the metadata is propagated to all brokers.
    * Return the leader for each partition.
    */
-  def createTopic(zkClient: ZkClient,
+  def createTopic(zkUtils: ZkUtils,
                   topic: String,
                   numPartitions: Int = 1,
                   replicationFactor: Int = 1,
                   servers: Seq[KafkaServer],
                   topicConfig: Properties = new Properties) : scala.collection.immutable.Map[Int, Option[Int]] = {
     // create topic
-    AdminUtils.createTopic(zkClient, topic, numPartitions, replicationFactor, topicConfig)
+    AdminUtils.createTopic(zkUtils, topic, numPartitions, replicationFactor, topicConfig)
     // wait until the update metadata request for new topic reaches all servers
     (0 until numPartitions).map { case i =>
       TestUtils.waitUntilMetadataIsPropagated(servers, topic, i)
-      i -> TestUtils.waitUntilLeaderIsElectedOrChanged(zkClient, topic, i)
+      i -> TestUtils.waitUntilLeaderIsElectedOrChanged(zkUtils, topic, i)
     }.toMap
   }
 
@@ -207,14 +208,14 @@ object TestUtils extends Logging {
    * Wait until the leader is elected and the metadata is propagated to all brokers.
    * Return the leader for each partition.
    */
-  def createTopic(zkClient: ZkClient, topic: String, partitionReplicaAssignment: collection.Map[Int, Seq[Int]],
+  def createTopic(zkUtils: ZkUtils, topic: String, partitionReplicaAssignment: collection.Map[Int, Seq[Int]],
                   servers: Seq[KafkaServer]) : scala.collection.immutable.Map[Int, Option[Int]] = {
     // create topic
-    AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkClient, topic, partitionReplicaAssignment)
+    AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkUtils, topic, partitionReplicaAssignment)
     // wait until the update metadata request for new topic reaches all servers
     partitionReplicaAssignment.keySet.map { case i =>
       TestUtils.waitUntilMetadataIsPropagated(servers, topic, i)
-      i -> TestUtils.waitUntilLeaderIsElectedOrChanged(zkClient, topic, i)
+      i -> TestUtils.waitUntilLeaderIsElectedOrChanged(zkUtils, topic, i)
     }.toMap
   }
 
@@ -484,8 +485,8 @@ object TestUtils extends Logging {
   }
 
   def updateConsumerOffset(config : ConsumerConfig, path : String, offset : Long) = {
-    val zkClient = ZkUtils.createZkClient(config.zkConnect, config.zkSessionTimeoutMs, config.zkConnectionTimeoutMs)
-    ZkUtils.updatePersistentPath(zkClient, path, offset.toString)
+    val zkUtils = ZkUtils(config.zkConnect, config.zkSessionTimeoutMs, config.zkConnectionTimeoutMs, false)
+    zkUtils.updatePersistentPath(path, offset.toString)
 
   }
 
@@ -500,15 +501,15 @@ object TestUtils extends Logging {
     }
   }
 
-  def createBrokersInZk(zkClient: ZkClient, zkConnection: ZkConnection, ids: Seq[Int]): Seq[Broker] = {
+  def createBrokersInZk(zkUtils: ZkUtils, ids: Seq[Int]): Seq[Broker] = {
     val brokers = ids.map(id => new Broker(id, "localhost", 6667, SecurityProtocol.PLAINTEXT))
-    brokers.foreach(b => ZkUtils.registerBrokerInZk(zkClient, zkConnection, b.id, "localhost", 6667, b.endPoints, jmxPort = -1))
+    brokers.foreach(b => zkUtils.registerBrokerInZk(b.id, "localhost", 6667, b.endPoints, jmxPort = -1))
     brokers
   }
 
-  def deleteBrokersInZk(zkClient: ZkClient, ids: Seq[Int]): Seq[Broker] = {
+  def deleteBrokersInZk(zkUtils: ZkUtils, ids: Seq[Int]): Seq[Broker] = {
     val brokers = ids.map(id => new Broker(id, "localhost", 6667, SecurityProtocol.PLAINTEXT))
-    brokers.foreach(b => ZkUtils.deletePath(zkClient, ZkUtils.BrokerIdsPath + "/" + b))
+    brokers.foreach(b => zkUtils.deletePath(ZkUtils.BrokerIdsPath + "/" + b))
     brokers
   }
 
@@ -545,7 +546,7 @@ object TestUtils extends Logging {
     new ProducerRequest(correlationId, clientId, acks.toShort, timeout, collection.mutable.Map(data:_*))
   }
 
-  def makeLeaderForPartition(zkClient: ZkClient, topic: String,
+  def makeLeaderForPartition(zkUtils: ZkUtils, topic: String,
                              leaderPerPartitionMap: scala.collection.immutable.Map[Int, Int],
                              controllerEpoch: Int) {
     leaderPerPartitionMap.foreach
@@ -554,7 +555,7 @@ object TestUtils extends Logging {
         val partition = leaderForPartition._1
         val leader = leaderForPartition._2
         try{
-          val currentLeaderAndIsrOpt = ZkUtils.getLeaderAndIsrForPartition(zkClient, topic, partition)
+          val currentLeaderAndIsrOpt = zkUtils.getLeaderAndIsrForPartition(topic, partition)
           var newLeaderAndIsr: LeaderAndIsr = null
           if(currentLeaderAndIsrOpt == None)
             newLeaderAndIsr = new LeaderAndIsr(leader, List(leader))
@@ -564,8 +565,8 @@ object TestUtils extends Logging {
             newLeaderAndIsr.leaderEpoch += 1
             newLeaderAndIsr.zkVersion += 1
           }
-          ZkUtils.updatePersistentPath(zkClient, ZkUtils.getTopicPartitionLeaderAndIsrPath(topic, partition),
-            ZkUtils.leaderAndIsrZkData(newLeaderAndIsr, controllerEpoch))
+          zkUtils.updatePersistentPath(getTopicPartitionLeaderAndIsrPath(topic, partition),
+            zkUtils.leaderAndIsrZkData(newLeaderAndIsr, controllerEpoch))
         } catch {
           case oe: Throwable => error("Error while electing leader for partition [%s,%d]".format(topic, partition), oe)
         }
@@ -579,7 +580,7 @@ object TestUtils extends Logging {
    *  If newLeaderOpt is defined, it waits until the new leader becomes the expected new leader.
    * @return The new leader or assertion failure if timeout is reached.
    */
-  def waitUntilLeaderIsElectedOrChanged(zkClient: ZkClient, topic: String, partition: Int, timeoutMs: Long = 5000L,
+  def waitUntilLeaderIsElectedOrChanged(zkUtils: ZkUtils, topic: String, partition: Int, timeoutMs: Long = 5000L,
                                         oldLeaderOpt: Option[Int] = None, newLeaderOpt: Option[Int] = None): Option[Int] = {
     require(!(oldLeaderOpt.isDefined && newLeaderOpt.isDefined), "Can't define both the old and the new leader")
     val startTime = System.currentTimeMillis()
@@ -591,7 +592,7 @@ object TestUtils extends Logging {
     var leader: Option[Int] = None
     while (!isLeaderElectedOrChanged && System.currentTimeMillis() < startTime + timeoutMs) {
       // check if leader is elected
-      leader = ZkUtils.getLeaderForPartition(zkClient, topic, partition)
+      leader = zkUtils.getLeaderForPartition(topic, partition)
       leader match {
         case Some(l) =>
           if (newLeaderOpt.isDefined && newLeaderOpt.get == l) {
@@ -724,24 +725,24 @@ object TestUtils extends Logging {
     file.close()
   }
 
-  def checkForPhantomInSyncReplicas(zkClient: ZkClient, topic: String, partitionToBeReassigned: Int, assignedReplicas: Seq[Int]) {
-    val inSyncReplicas = ZkUtils.getInSyncReplicasForPartition(zkClient, topic, partitionToBeReassigned)
+  def checkForPhantomInSyncReplicas(zkUtils: ZkUtils, topic: String, partitionToBeReassigned: Int, assignedReplicas: Seq[Int]) {
+    val inSyncReplicas = zkUtils.getInSyncReplicasForPartition(topic, partitionToBeReassigned)
     // in sync replicas should not have any replica that is not in the new assigned replicas
     val phantomInSyncReplicas = inSyncReplicas.toSet -- assignedReplicas.toSet
     assertTrue("All in sync replicas %s must be in the assigned replica list %s".format(inSyncReplicas, assignedReplicas),
       phantomInSyncReplicas.size == 0)
   }
 
-  def ensureNoUnderReplicatedPartitions(zkClient: ZkClient, topic: String, partitionToBeReassigned: Int, assignedReplicas: Seq[Int],
+  def ensureNoUnderReplicatedPartitions(zkUtils: ZkUtils, topic: String, partitionToBeReassigned: Int, assignedReplicas: Seq[Int],
                                                 servers: Seq[KafkaServer]) {
     TestUtils.waitUntilTrue(() => {
-        val inSyncReplicas = ZkUtils.getInSyncReplicasForPartition(zkClient, topic, partitionToBeReassigned)
+        val inSyncReplicas = zkUtils.getInSyncReplicasForPartition(topic, partitionToBeReassigned)
         inSyncReplicas.size == assignedReplicas.size
       },
       "Reassigned partition [%s,%d] is under replicated".format(topic, partitionToBeReassigned))
     var leader: Option[Int] = None
     TestUtils.waitUntilTrue(() => {
-        leader = ZkUtils.getLeaderForPartition(zkClient, topic, partitionToBeReassigned)
+        leader = zkUtils.getLeaderForPartition(topic, partitionToBeReassigned)
         leader.isDefined
       },
       "Reassigned partition [%s,%d] is unavailable".format(topic, partitionToBeReassigned))
@@ -752,8 +753,8 @@ object TestUtils extends Logging {
       "Reassigned partition [%s,%d] is under-replicated as reported by the leader %d".format(topic, partitionToBeReassigned, leader.get))
   }
 
-  def checkIfReassignPartitionPathExists(zkClient: ZkClient): Boolean = {
-    ZkUtils.pathExists(zkClient, ZkUtils.ReassignPartitionsPath)
+  def checkIfReassignPartitionPathExists(zkUtils: ZkUtils): Boolean = {
+    zkUtils.pathExists(ZkUtils.ReassignPartitionsPath)
   }
 
   def verifyNonDaemonThreadsStatus(threadNamePrefix: String) {
@@ -875,12 +876,12 @@ object TestUtils extends Logging {
     messages.reverse
   }
 
-  def verifyTopicDeletion(zkClient: ZkClient, topic: String, numPartitions: Int, servers: Seq[KafkaServer]) {
+  def verifyTopicDeletion(zkUtils: ZkUtils, topic: String, numPartitions: Int, servers: Seq[KafkaServer]) {
     val topicAndPartitions = (0 until numPartitions).map(TopicAndPartition(topic, _))
     // wait until admin path for delete topic is deleted, signaling completion of topic deletion
-    TestUtils.waitUntilTrue(() => !ZkUtils.pathExists(zkClient, ZkUtils.getDeleteTopicPath(topic)),
+    TestUtils.waitUntilTrue(() => !zkUtils.pathExists(getDeleteTopicPath(topic)),
       "Admin path /admin/delete_topic/%s path not deleted even after a replica is restarted".format(topic))
-    TestUtils.waitUntilTrue(() => !ZkUtils.pathExists(zkClient, ZkUtils.getTopicPath(topic)),
+    TestUtils.waitUntilTrue(() => !zkUtils.pathExists(getTopicPath(topic)),
       "Topic path /brokers/topics/%s not deleted after /admin/delete_topic/%s path is deleted".format(topic, topic))
     // ensure that the topic-partition has been deleted from all brokers' replica managers
     TestUtils.waitUntilTrue(() =>

http://git-wip-us.apache.org/repos/asf/kafka/blob/ce306ba4/core/src/test/scala/unit/kafka/zk/ZKEphemeralTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/zk/ZKEphemeralTest.scala b/core/src/test/scala/unit/kafka/zk/ZKEphemeralTest.scala
index 2bf658c..eb08f58 100644
--- a/core/src/test/scala/unit/kafka/zk/ZKEphemeralTest.scala
+++ b/core/src/test/scala/unit/kafka/zk/ZKEphemeralTest.scala
@@ -34,20 +34,20 @@ class ZKEphemeralTest extends ZooKeeperTestHarness {
   @Test
   def testEphemeralNodeCleanup = {
     val config = new ConsumerConfig(TestUtils.createConsumerProperties(zkConnect, "test", "1"))
-    var zkClient = ZkUtils.createZkClient(zkConnect, zkSessionTimeoutMs, config.zkConnectionTimeoutMs)
+    var zkUtils = ZkUtils(zkConnect, zkSessionTimeoutMs, config.zkConnectionTimeoutMs, false)
 
     try {
-      ZkUtils.createEphemeralPathExpectConflict(zkClient, "/tmp/zktest", "node created")
+      zkUtils.createEphemeralPathExpectConflict("/tmp/zktest", "node created")
     } catch {                       
       case e: Exception =>
     }
 
     var testData: String = null
-    testData = ZkUtils.readData(zkClient, "/tmp/zktest")._1
+    testData = zkUtils.readData("/tmp/zktest")._1
     Assert.assertNotNull(testData)
-    zkClient.close
-    zkClient = ZkUtils.createZkClient(zkConnect, zkSessionTimeoutMs, config.zkConnectionTimeoutMs)
-    val nodeExists = ZkUtils.pathExists(zkClient, "/tmp/zktest")
+    zkUtils.close
+    zkUtils = ZkUtils(zkConnect, zkSessionTimeoutMs, config.zkConnectionTimeoutMs, false)
+    val nodeExists = zkUtils.pathExists("/tmp/zktest")
     Assert.assertFalse(nodeExists)
   }
 
@@ -74,8 +74,8 @@ class ZKEphemeralTest extends ZooKeeperTestHarness {
   }
 
   private def testCreation(path: String) {
-    val zk = zkConnection.getZookeeper
-    val zwe = new ZKCheckedEphemeral(path, "", zk)
+    val zk = zkUtils.zkConnection.getZookeeper
+    val zwe = new ZKCheckedEphemeral(path, "", zk, false)
     var created = false
     var counter = 10
 
@@ -88,7 +88,7 @@ class ZKEphemeralTest extends ZooKeeperTestHarness {
     })
     zwe.create()
     // Waits until the znode is created
-    TestUtils.waitUntilTrue(() => ZkUtils.pathExists(zkClient, path),
+    TestUtils.waitUntilTrue(() => zkUtils.pathExists(path),
                             "Znode %s wasn't created".format(path))
   }
 
@@ -99,12 +99,12 @@ class ZKEphemeralTest extends ZooKeeperTestHarness {
   @Test
   def testOverlappingSessions = {
     val path = "/zwe-test"
-    val zk1 = zkConnection.getZookeeper
+    val zk1 = zkUtils.zkConnection.getZookeeper
 
     //Creates a second session
     val (_, zkConnection2) = ZkUtils.createZkClientAndConnection(zkConnect, zkSessionTimeoutMs, zkConnectionTimeout)
     val zk2 = zkConnection2.getZookeeper
-    var zwe = new ZKCheckedEphemeral(path, "", zk2)
+    var zwe = new ZKCheckedEphemeral(path, "", zk2, false)
 
     // Creates znode for path in the first session
     zk1.create(path, Array[Byte](), Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL)
@@ -127,11 +127,11 @@ class ZKEphemeralTest extends ZooKeeperTestHarness {
   @Test
   def testSameSession = {
     val path = "/zwe-test"
-    val zk = zkConnection.getZookeeper
+    val zk = zkUtils.zkConnection.getZookeeper
     // Creates znode for path in the first session
     zk.create(path, Array[Byte](), Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL)
     
-    var zwe = new ZKCheckedEphemeral(path, "", zk)
+    var zwe = new ZKCheckedEphemeral(path, "", zk, false)
     //Bootstraps the ZKWatchedEphemeral object
     var gotException = false;
     try {

http://git-wip-us.apache.org/repos/asf/kafka/blob/ce306ba4/core/src/test/scala/unit/kafka/zk/ZKPathTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/zk/ZKPathTest.scala b/core/src/test/scala/unit/kafka/zk/ZKPathTest.scala
index 241eea5..65dd589 100644
--- a/core/src/test/scala/unit/kafka/zk/ZKPathTest.scala
+++ b/core/src/test/scala/unit/kafka/zk/ZKPathTest.scala
@@ -33,11 +33,11 @@ class ZKPathTest extends ZooKeeperTestHarness {
   def testCreatePersistentPathThrowsException {
     val config = new ConsumerConfig(TestUtils.createConsumerProperties(zkConnectWithInvalidRoot,
       "test", "1"))
-    var zkClient = ZkUtils.createZkClient(zkConnectWithInvalidRoot, zkSessionTimeoutMs,
-      config.zkConnectionTimeoutMs)
+    var zkUtils = ZkUtils(zkConnectWithInvalidRoot, zkSessionTimeoutMs,
+      config.zkConnectionTimeoutMs, false)
     try {
       ZkPath.resetNamespaceCheckedState
-      ZkUtils.createPersistentPath(zkClient, path)
+      zkUtils.createPersistentPath(path)
       fail("Failed to throw ConfigException for missing zookeeper root node")
     } catch {
       case configException: ConfigException =>
@@ -48,26 +48,26 @@ class ZKPathTest extends ZooKeeperTestHarness {
   @Test
   def testCreatePersistentPath {
     val config = new ConsumerConfig(TestUtils.createConsumerProperties(zkConnect, "test", "1"))
-    var zkClient = ZkUtils.createZkClient(zkConnect, zkSessionTimeoutMs, config.zkConnectionTimeoutMs)
+    var zkUtils = ZkUtils(zkConnect, zkSessionTimeoutMs, config.zkConnectionTimeoutMs, false)
     try {
       ZkPath.resetNamespaceCheckedState
-      ZkUtils.createPersistentPath(zkClient, path)
+      zkUtils.createPersistentPath(path)
     } catch {
       case exception: Throwable => fail("Failed to create persistent path")
     }
 
-    assertTrue("Failed to create persistent path", ZkUtils.pathExists(zkClient, path))
+    assertTrue("Failed to create persistent path", zkUtils.pathExists(path))
   }
 
   @Test
   def testMakeSurePersistsPathExistsThrowsException {
     val config = new ConsumerConfig(TestUtils.createConsumerProperties(zkConnectWithInvalidRoot,
       "test", "1"))
-    var zkClient = ZkUtils.createZkClient(zkConnectWithInvalidRoot, zkSessionTimeoutMs,
-      config.zkConnectionTimeoutMs)
+    var zkUtils = ZkUtils(zkConnectWithInvalidRoot, zkSessionTimeoutMs,
+      config.zkConnectionTimeoutMs, false)
     try {
       ZkPath.resetNamespaceCheckedState
-      ZkUtils.makeSurePersistentPathExists(zkClient, path)
+      zkUtils.makeSurePersistentPathExists(path)
       fail("Failed to throw ConfigException for missing zookeeper root node")
     } catch {
       case configException: ConfigException =>
@@ -78,26 +78,26 @@ class ZKPathTest extends ZooKeeperTestHarness {
   @Test
   def testMakeSurePersistsPathExists {
     val config = new ConsumerConfig(TestUtils.createConsumerProperties(zkConnect, "test", "1"))
-    var zkClient = ZkUtils.createZkClient(zkConnect, zkSessionTimeoutMs, config.zkConnectionTimeoutMs)
+    var zkUtils = ZkUtils(zkConnect, zkSessionTimeoutMs, config.zkConnectionTimeoutMs, false)
     try {
       ZkPath.resetNamespaceCheckedState
-      ZkUtils.makeSurePersistentPathExists(zkClient, path)
+      zkUtils.makeSurePersistentPathExists(path)
     } catch {
       case exception: Throwable => fail("Failed to create persistent path")
     }
 
-    assertTrue("Failed to create persistent path", ZkUtils.pathExists(zkClient, path))
+    assertTrue("Failed to create persistent path", zkUtils.pathExists(path))
   }
 
   @Test
   def testCreateEphemeralPathThrowsException {
     val config = new ConsumerConfig(TestUtils.createConsumerProperties(zkConnectWithInvalidRoot,
       "test", "1"))
-    var zkClient = ZkUtils.createZkClient(zkConnectWithInvalidRoot, zkSessionTimeoutMs,
-      config.zkConnectionTimeoutMs)
+    var zkUtils = ZkUtils(zkConnectWithInvalidRoot, zkSessionTimeoutMs,
+      config.zkConnectionTimeoutMs, false)
     try {
       ZkPath.resetNamespaceCheckedState
-      ZkUtils.createEphemeralPathExpectConflict(zkClient, path, "somedata")
+      zkUtils.createEphemeralPathExpectConflict(path, "somedata")
       fail("Failed to throw ConfigException for missing zookeeper root node")
     } catch {
       case configException: ConfigException =>
@@ -108,26 +108,26 @@ class ZKPathTest extends ZooKeeperTestHarness {
   @Test
   def testCreateEphemeralPathExists {
     val config = new ConsumerConfig(TestUtils.createConsumerProperties(zkConnect, "test", "1"))
-    var zkClient = ZkUtils.createZkClient(zkConnect, zkSessionTimeoutMs, config.zkConnectionTimeoutMs)
+    var zkUtils = ZkUtils(zkConnect, zkSessionTimeoutMs, config.zkConnectionTimeoutMs, false)
     try {
       ZkPath.resetNamespaceCheckedState
-      ZkUtils.createEphemeralPathExpectConflict(zkClient, path, "somedata")
+      zkUtils.createEphemeralPathExpectConflict(path, "somedata")
     } catch {
       case exception: Throwable => fail("Failed to create ephemeral path")
     }
 
-    assertTrue("Failed to create ephemeral path", ZkUtils.pathExists(zkClient, path))
+    assertTrue("Failed to create ephemeral path", zkUtils.pathExists(path))
   }
 
   @Test
   def testCreatePersistentSequentialThrowsException {
     val config = new ConsumerConfig(TestUtils.createConsumerProperties(zkConnectWithInvalidRoot,
       "test", "1"))
-    var zkClient = ZkUtils.createZkClient(zkConnectWithInvalidRoot, zkSessionTimeoutMs,
-      config.zkConnectionTimeoutMs)
+    var zkUtils = ZkUtils(zkConnectWithInvalidRoot, zkSessionTimeoutMs,
+      config.zkConnectionTimeoutMs, false)
     try {
       ZkPath.resetNamespaceCheckedState
-      ZkUtils.createSequentialPersistentPath(zkClient, path)
+      zkUtils.createSequentialPersistentPath(path)
       fail("Failed to throw ConfigException for missing zookeeper root node")
     } catch {
       case configException: ConfigException =>
@@ -138,16 +138,16 @@ class ZKPathTest extends ZooKeeperTestHarness {
   @Test
   def testCreatePersistentSequentialExists {
     val config = new ConsumerConfig(TestUtils.createConsumerProperties(zkConnect, "test", "1"))
-    var zkClient = ZkUtils.createZkClient(zkConnect, zkSessionTimeoutMs, config.zkConnectionTimeoutMs)
+    var zkUtils = ZkUtils(zkConnect, zkSessionTimeoutMs, config.zkConnectionTimeoutMs, false)
 
     var actualPath: String = ""
     try {
       ZkPath.resetNamespaceCheckedState
-      actualPath = ZkUtils.createSequentialPersistentPath(zkClient, path)
+      actualPath = zkUtils.createSequentialPersistentPath(path)
     } catch {
       case exception: Throwable => fail("Failed to create persistent path")
     }
 
-    assertTrue("Failed to create persistent path", ZkUtils.pathExists(zkClient, actualPath))
+    assertTrue("Failed to create persistent path", zkUtils.pathExists(actualPath))
   }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/ce306ba4/core/src/test/scala/unit/kafka/zk/ZooKeeperTestHarness.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/zk/ZooKeeperTestHarness.scala b/core/src/test/scala/unit/kafka/zk/ZooKeeperTestHarness.scala
index 3e1c6e0..38a0765 100755
--- a/core/src/test/scala/unit/kafka/zk/ZooKeeperTestHarness.scala
+++ b/core/src/test/scala/unit/kafka/zk/ZooKeeperTestHarness.scala
@@ -25,25 +25,21 @@ import org.scalatest.junit.JUnitSuite
 trait ZooKeeperTestHarness extends JUnitSuite {
   var zkPort: Int = -1
   var zookeeper: EmbeddedZookeeper = null
-  var zkClient: ZkClient = null
-  var zkConnection : ZkConnection = null
+  var zkUtils: ZkUtils = null
   val zkConnectionTimeout = 6000
   val zkSessionTimeout = 6000
-
   def zkConnect: String = "127.0.0.1:" + zkPort
 
   @Before
   def setUp() {
     zookeeper = new EmbeddedZookeeper()
     zkPort = zookeeper.port
-    val (client, connection) = ZkUtils.createZkClientAndConnection(zkConnect, zkSessionTimeout, zkConnectionTimeout)
-    zkClient = client
-    zkConnection = connection
+    zkUtils = ZkUtils.apply(zkConnect, zkSessionTimeout, zkConnectionTimeout, false)
   }
 
   @After
   def tearDown() {
-    CoreUtils.swallow(zkClient.close())
+    CoreUtils.swallow(zkUtils.close())
     CoreUtils.swallow(zookeeper.shutdown())
   }
 


Mime
View raw message