kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From nehanarkh...@apache.org
Subject svn commit: r1235492 [2/2] - in /incubator/kafka/branches/0.8: core/src/main/scala/kafka/admin/ core/src/main/scala/kafka/api/ core/src/main/scala/kafka/cluster/ core/src/main/scala/kafka/common/ core/src/main/scala/kafka/consumer/ core/src/main/scala/...
Date Tue, 24 Jan 2012 20:54:13 GMT
Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/Utils.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/Utils.scala?rev=1235492&r1=1235491&r2=1235492&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/Utils.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/Utils.scala Tue Jan 24 20:54:11 2012
@@ -23,7 +23,6 @@ import java.nio.channels._
 import java.util.concurrent.atomic._
 import java.lang.management._
 import java.util.zip.CRC32
-import org.apache.log4j.Logger
 import javax.management._
 import java.util.Properties
 import scala.collection._
@@ -115,7 +114,7 @@ object Utils extends Logging {
    * @param buffer The buffer to read from
    * @param encoding The encoding in which to read the string
    */
-  def readShortString(buffer: ByteBuffer, encoding: String): String = {
+  def readShortString(buffer: ByteBuffer, encoding: String = "UTF-8"): String = {
     val size: Int = buffer.getShort()
     if(size < 0)
       return null
@@ -130,7 +129,7 @@ object Utils extends Logging {
    * @param string The string to write
    * @param encoding The encoding in which to write the string
    */
-  def writeShortString(buffer: ByteBuffer, string: String, encoding: String): Unit = {
+  def writeShortString(buffer: ByteBuffer, string: String, encoding: String = "UTF-8"): Unit = {
     if(string == null) {
       buffer.putShort(-1)
     } else if(string.length > Short.MaxValue) {
@@ -142,6 +141,24 @@ object Utils extends Logging {
   }
   
   /**
+   * Return the size of a size-prefixed string, where the size is stored as a 2-byte short
+   * @param string The string whose encoded length is measured
+   * @param encoding The encoding used to compute the string's byte length
+   */
+  def shortStringLength(string: String, encoding: String = "UTF-8"): Int = {
+    if(string == null) {
+      2
+    } else {
+      val encodedString = string.getBytes(encoding)
+      if(encodedString.length > Short.MaxValue) {
+        throw new IllegalArgumentException("String exceeds the maximum size of " + Short.MaxValue + ".")
+      } else {
+        2 + encodedString.length
+      }
+    }
+  }
+
+  /**
    * Read a properties file from the given path
    * @param filename The path of the file to read
    */
@@ -193,7 +210,28 @@ object Utils extends Logging {
     else
       v
   }
-  
+
+  def getIntInRange(buffer: ByteBuffer, name: String, range: (Int, Int)): Int = {
+    val value = buffer.getInt
+    if(value < range._1 || value > range._2)
+      throw new IllegalArgumentException(name + " has value " + value + " which is not in the range " + range + ".")
+    else value
+  }
+
+  def getShortInRange(buffer: ByteBuffer, name: String, range: (Short, Short)): Short = {
+    val value = buffer.getShort
+    if(value < range._1 || value > range._2)
+      throw new IllegalArgumentException(name + " has value " + value + " which is not in the range " + range + ".")
+    else value
+  }
+
+  def getLongInRange(buffer: ByteBuffer, name: String, range: (Long, Long)): Long = {
+    val value = buffer.getLong
+    if(value < range._1 || value > range._2)
+      throw new IllegalArgumentException(name + " has value " + value + " which is not in the range " + range + ".")
+    else value
+  }
+
   /**
    * Read a boolean value from the properties instance
    * @param props The properties to read from
@@ -618,7 +656,7 @@ object Utils extends Logging {
   def tryCleanupZookeeper(zkUrl: String, groupId: String) {
     try {
       val dir = "/consumers/" + groupId
-      logger.info("Cleaning up temporary zookeeper data under " + dir + ".")
+      info("Cleaning up temporary zookeeper data under " + dir + ".")
       val zk = new ZkClient(zkUrl, 30*1000, 30*1000, ZKStringSerializer)
       zk.deleteRecursive(dir)
       zk.close()

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/ZkUtils.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/ZkUtils.scala?rev=1235492&r1=1235491&r2=1235492&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/ZkUtils.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/ZkUtils.scala Tue Jan 24 20:54:11 2012
@@ -33,34 +33,49 @@ object ZkUtils extends Logging {
     BrokerTopicsPath + "/" + topic
   }
 
-  def getTopicPartsPath(topic: String): String ={
+  def getTopicPartitionsPath(topic: String): String ={
     getTopicPath(topic) + "/" + "partitions"
   }
 
-  def getTopicPartPath(topic: String, partitionId: String): String ={
-    getTopicPartsPath(topic) + "/" + partitionId
+  def getTopicPartitionPath(topic: String, partitionId: String): String ={
+    getTopicPartitionsPath(topic) + "/" + partitionId
   }
 
   def getTopicVersion(zkClient: ZkClient, topic: String): String ={
     readDataMaybeNull(zkClient, getTopicPath(topic))
   }
 
-  def getTopicPartReplicasPath(topic: String, partitionId: String): String ={
-    getTopicPartPath(topic, partitionId) + "/" + "replicas"
+  def getTopicPartitionReplicasPath(topic: String, partitionId: String): String ={
+    getTopicPartitionPath(topic, partitionId) + "/" + "replicas"
   }
 
-  def getTopicPartInSyncPath(topic: String, partitionId: String): String ={
-    getTopicPartPath(topic, partitionId) + "/" + "isr"
+  def getTopicPartitionInSyncPath(topic: String, partitionId: String): String ={
+    getTopicPartitionPath(topic, partitionId) + "/" + "isr"
   }
 
-  def getTopicPartLeaderPath(topic: String, partitionId: String): String ={
-    getTopicPartPath(topic, partitionId) + "/" + "leader"
+  def getTopicPartitionLeaderPath(topic: String, partitionId: String): String ={
+    getTopicPartitionPath(topic, partitionId) + "/" + "leader"
   }
 
   def getSortedBrokerList(zkClient: ZkClient): Seq[String] ={
       ZkUtils.getChildren(zkClient, ZkUtils.BrokerIdsPath).sorted
   }
 
+  def registerBrokerInZk(zkClient: ZkClient, id: Int, host: String, creator: String, port: Int) {
+    val brokerIdPath = ZkUtils.BrokerIdsPath + "/" + id
+    val broker = new Broker(id, creator, host, port)
+    try {
+      createEphemeralPathExpectConflict(zkClient, brokerIdPath, broker.getZKString)
+    } catch {
+      case e: ZkNodeExistsException =>
+        throw new RuntimeException("A broker is already registered on the path " + brokerIdPath + ". This probably " +
+                                   "indicates that you either have configured a brokerid that is already in use, or " +
+                                   "else you have shutdown this broker and restarted it faster than the zookeeper " +
+                                   "timeout so it appears to be re-registering.")
+    }
+    info("Registering broker " + brokerIdPath + " succeeded with " + broker)
+  }
+
   /**
    *  make sure a persistent path exists in ZK. Create the path if not exist.
    */
@@ -283,6 +298,17 @@ object ZkUtils extends Logging {
     val brokerPartTopicPath = BrokerTopicsPath + "/" + topic + "/" + brokerId
     zkClient.delete(brokerPartTopicPath)
   }
+
+  /**
+   * For a given topic, this returns the sorted list of partition ids registered for this topic
+   */
+  def getSortedPartitionIdsForTopic(zkClient: ZkClient, topic: String): Seq[Int] = {
+    val topicPartitionsPath = ZkUtils.getTopicPartitionsPath(topic)
+    ZkUtils.getChildrenParentMayNotExist(zkClient, topicPartitionsPath).map(pid => pid.toInt).sortWith((s,t) => s < t)
+  }
+
+  def getBrokerInfoFromIds(zkClient: ZkClient, brokerIds: Seq[Int]): Seq[Broker] =
+    brokerIds.map( bid => Broker.createBroker(bid, ZkUtils.readData(zkClient, ZkUtils.BrokerIdsPath + "/" + bid)) )
 }
 
 object ZKStringSerializer extends ZkSerializer {

Modified: incubator/kafka/branches/0.8/core/src/test/resources/log4j.properties
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/resources/log4j.properties?rev=1235492&r1=1235491&r2=1235492&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/resources/log4j.properties (original)
+++ incubator/kafka/branches/0.8/core/src/test/resources/log4j.properties Tue Jan 24 20:54:11 2012
@@ -12,13 +12,13 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
-log4j.rootLogger=WARN, stdout
+log4j.rootLogger=OFF, stdout
 
 log4j.appender.stdout=org.apache.log4j.ConsoleAppender
 log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
 log4j.appender.stdout.layout.ConversionPattern=[%d] %p %m (%c:%L)%n
 
-log4j.logger.kafka=WARN
+log4j.logger.kafka=OFF
 
 # zkclient can be verbose, during debugging it is common to adjust is separately
-log4j.logger.org.I0Itec.zkclient.ZkClient=WARN
\ No newline at end of file
+log4j.logger.org.I0Itec.zkclient.ZkClient=OFF

Modified: incubator/kafka/branches/0.8/core/src/test/scala/other/kafka/TestKafkaAppender.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/other/kafka/TestKafkaAppender.scala?rev=1235492&r1=1235491&r2=1235492&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/other/kafka/TestKafkaAppender.scala (original)
+++ incubator/kafka/branches/0.8/core/src/test/scala/other/kafka/TestKafkaAppender.scala Tue Jan 24 20:54:11 2012
@@ -18,7 +18,7 @@
 package kafka
 
 import message.Message
-import org.apache.log4j.{Logger, PropertyConfigurator}
+import org.apache.log4j.PropertyConfigurator
 import kafka.utils.Logging
 import serializer.Encoder
 

Modified: incubator/kafka/branches/0.8/core/src/test/scala/other/kafka/TestLinearWriteSpeed.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/other/kafka/TestLinearWriteSpeed.scala?rev=1235492&r1=1235491&r2=1235492&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/other/kafka/TestLinearWriteSpeed.scala (original)
+++ incubator/kafka/branches/0.8/core/src/test/scala/other/kafka/TestLinearWriteSpeed.scala Tue Jan 24 20:54:11 2012
@@ -17,8 +17,6 @@
 
 package kafka
 
-import java.net.URI
-import java.util.Arrays.asList
 import java.io._
 import java.nio._
 import java.nio.channels._

Modified: incubator/kafka/branches/0.8/core/src/test/scala/other/kafka/TestLogPerformance.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/other/kafka/TestLogPerformance.scala?rev=1235492&r1=1235491&r2=1235492&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/other/kafka/TestLogPerformance.scala (original)
+++ incubator/kafka/branches/0.8/core/src/test/scala/other/kafka/TestLogPerformance.scala Tue Jan 24 20:54:11 2012
@@ -17,7 +17,6 @@
 
 package kafka.log
 
-import kafka.log._
 import kafka.message._
 import kafka.utils.{TestUtils, Utils}
 

Modified: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/admin/AdminTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/admin/AdminTest.scala?rev=1235492&r1=1235491&r2=1235492&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/admin/AdminTest.scala (original)
+++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/admin/AdminTest.scala Tue Jan 24 20:54:11 2012
@@ -20,18 +20,17 @@ import junit.framework.Assert._
 import org.junit.Test
 import org.scalatest.junit.JUnit3Suite
 import kafka.zk.ZooKeeperTestHarness
-import kafka.utils.TestZKUtils
+import kafka.utils.TestUtils
 
 class AdminTest extends JUnit3Suite with ZooKeeperTestHarness {
-  val zkConnect = TestZKUtils.zookeeperConnect
- 
+
   @Test
   def testReplicaAssignment() {
     val brokerList = List("0", "1", "2", "3", "4")
 
     // test 0 replication factor
     try {
-      AdminUtils.assginReplicasToBrokers(brokerList, 10, 0)
+      AdminUtils.assignReplicasToBrokers(brokerList, 10, 0)
       fail("shouldn't allow replication factor 0")
     }
     catch {
@@ -41,7 +40,7 @@ class AdminTest extends JUnit3Suite with
 
     // test wrong replication factor
     try {
-      AdminUtils.assginReplicasToBrokers(brokerList, 10, 6)
+      AdminUtils.assignReplicasToBrokers(brokerList, 10, 6)
       fail("shouldn't allow replication factor larger than # of brokers")
     }
     catch {
@@ -64,7 +63,7 @@ class AdminTest extends JUnit3Suite with
         List("4", "1", "2")
         )
 
-      val actualAssignment = AdminUtils.assginReplicasToBrokers(brokerList, 10, 3, 0)
+      val actualAssignment = AdminUtils.assignReplicasToBrokers(brokerList, 10, 3, 0)
       val e = (expectedAssignment.toList == actualAssignment.toList)
       assertTrue(expectedAssignment.toList == actualAssignment.toList)
     }
@@ -121,7 +120,7 @@ class AdminTest extends JUnit3Suite with
 
   @Test
   def testTopicCreationInZK() {
-    val expectedReplicationAssignment = Array(
+    val expectedReplicaAssignment = Array(
       List("0", "1", "2"),
       List("1", "2", "3"),
       List("2", "3", "4"),
@@ -135,18 +134,46 @@ class AdminTest extends JUnit3Suite with
       List("1", "2", "3"),
       List("1", "3", "4")      
       )
+    TestUtils.createBrokersInZk(zookeeper.client, List(0, 1, 2, 3, 4))
+
     val topic = "test"
-    AdminUtils.createReplicaAssignmentPathInZK(topic, expectedReplicationAssignment, zookeeper.client)
-    val actualReplicationAssignment = AdminUtils.getTopicMetaDataFromZK(topic, zookeeper.client).get.map(p => p.replicaList)
-    assertTrue(expectedReplicationAssignment.toList == actualReplicationAssignment.toList)
+    // create the topic
+    AdminUtils.createReplicaAssignmentPathInZK(topic, expectedReplicaAssignment, zookeeper.client)
+    val actualReplicaAssignment = AdminUtils.getTopicMetaDataFromZK(List(topic), zookeeper.client).head
+                                  .get.partitionsMetadata.map(p => p.replicas)
+    val actualReplicaList = actualReplicaAssignment.map(r => r.map(b => b.id.toString).toList).toList
+    expectedReplicaAssignment.toList.zip(actualReplicaList).foreach(l => assertEquals(l._1, l._2))
 
     try {
-      AdminUtils.createReplicaAssignmentPathInZK(topic, expectedReplicationAssignment, zookeeper.client)
-      fail("shouldn't be able to create a topic already exist")
+      AdminUtils.createReplicaAssignmentPathInZK(topic, expectedReplicaAssignment, zookeeper.client)
+      fail("shouldn't be able to create a topic that already exists")
     }
     catch {
       case e: AdministrationException => // this is good
       case e2 => throw e2
     }
   }
+
+  @Test
+  def testGetTopicMetadata() {
+    val expectedReplicaAssignment = Array(
+      List("0", "1", "2"),
+      List("1", "2", "3")
+    )
+    val topic = "auto-topic"
+    TestUtils.createBrokersInZk(zookeeper.client, List(0, 1, 2, 3))
+    AdminUtils.createReplicaAssignmentPathInZK(topic, expectedReplicaAssignment, zookeeper.client)
+
+    val newTopicMetadata = AdminUtils.getTopicMetaDataFromZK(List(topic), zookeeper.client).head
+    newTopicMetadata match {
+      case Some(metadata) => assertEquals(topic, metadata.topic)
+        assertNotNull("partition metadata list cannot be null", metadata.partitionsMetadata)
+        assertEquals("partition metadata list length should be 2", 2, metadata.partitionsMetadata.size)
+        assertNull("leader should not be assigned for now", metadata.partitionsMetadata.head.leader.getOrElse(null))
+        val actualReplicaAssignment = metadata.partitionsMetadata.map(p => p.replicas)
+        val actualReplicaList = actualReplicaAssignment.map(r => r.map(b => b.id.toString).toList).toList
+        assertEquals(expectedReplicaAssignment.toList, actualReplicaList)
+      case None => fail("Topic " + topic + " should've been automatically created")
+    }
+  }
 }

Modified: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala?rev=1235492&r1=1235491&r2=1235492&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala (original)
+++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala Tue Jan 24 20:54:11 2012
@@ -19,7 +19,6 @@
 package kafka.consumer
 
 import junit.framework.Assert._
-import kafka.zk.ZooKeeperTestHarness
 import kafka.integration.KafkaServerTestHarness
 import kafka.server._
 import scala.collection._
@@ -30,19 +29,17 @@ import org.apache.log4j.{Level, Logger}
 import kafka.message._
 import kafka.serializer.StringDecoder
 
-class ZookeeperConsumerConnectorTest extends JUnit3Suite with KafkaServerTestHarness with ZooKeeperTestHarness with Logging {
+class ZookeeperConsumerConnectorTest extends JUnit3Suite with KafkaServerTestHarness with Logging {
 
   val zookeeperConnect = TestZKUtils.zookeeperConnect
-  val zkConnect = zookeeperConnect
   val numNodes = 2
   val numParts = 2
   val topic = "topic1"
   val configs =
     for(props <- TestUtils.createBrokerConfigs(numNodes))
     yield new KafkaConfig(props) {
-      override val enableZookeeper = true
-      override val numPartitions = numParts
       override val zkConnect = zookeeperConnect
+      override val numPartitions = numParts
     }
   val group = "group1"
   val consumer0 = "consumer0"

Modified: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/integration/AutoOffsetResetTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/integration/AutoOffsetResetTest.scala?rev=1235492&r1=1235491&r2=1235492&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/integration/AutoOffsetResetTest.scala (original)
+++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/integration/AutoOffsetResetTest.scala Tue Jan 24 20:54:11 2012
@@ -26,11 +26,10 @@ import kafka.consumer.{ConsumerTimeoutEx
 import kafka.server._
 import org.apache.log4j.{Level, Logger}
 import org.scalatest.junit.JUnit3Suite
-import kafka.utils.{TestUtils, TestZKUtils}
+import kafka.utils.TestUtils
 
 class AutoOffsetResetTest extends JUnit3Suite with ZooKeeperTestHarness with Logging {
 
-  val zkConnect = TestZKUtils.zookeeperConnect
   val topic = "test_topic"
   val group = "default_group"
   val testConsumer = "consumer"

Modified: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/integration/BackwardsCompatibilityTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/integration/BackwardsCompatibilityTest.scala?rev=1235492&r1=1235491&r2=1235492&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/integration/BackwardsCompatibilityTest.scala (original)
+++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/integration/BackwardsCompatibilityTest.scala Tue Jan 24 20:54:11 2012
@@ -17,17 +17,16 @@
 
 package kafka.integration
 
-import kafka.server.{KafkaServer, KafkaConfig}
+import kafka.server.KafkaConfig
 import org.scalatest.junit.JUnit3Suite
 import org.apache.log4j.Logger
 import java.util.Properties
 import kafka.consumer.SimpleConsumer
-import kafka.utils.{Utils, TestUtils}
 import kafka.api.{OffsetRequest, FetchRequest}
 import junit.framework.Assert._
-import java.io.File
+import kafka.utils.TestUtils
 
-class BackwardsCompatibilityTest extends JUnit3Suite {
+class BackwardsCompatibilityTest extends JUnit3Suite with KafkaServerTestHarness {
 
   val topic = "MagicByte0"
   val group = "default_group"
@@ -40,24 +39,19 @@ class BackwardsCompatibilityTest extends
   kafkaProps.put("brokerid", "12")
   kafkaProps.put("port", port.toString)
   kafkaProps.put("log.dir", kafkaLogDir.getPath)
-  val kafkaConfig =
-    new KafkaConfig(kafkaProps) {
-      override val enableZookeeper = false
-    }
-  var kafkaServer : KafkaServer = null
+  kafkaProps.put("zk.connect", zkConnect.toString)
+  val configs = List(new KafkaConfig(kafkaProps))
   var simpleConsumer: SimpleConsumer = null
 
   private val logger = Logger.getLogger(getClass())
 
   override def setUp() {
     super.setUp()
-    kafkaServer = TestUtils.createServer(kafkaConfig)
     simpleConsumer = new SimpleConsumer(host, port, 1000000, 64*1024)
   }
 
   override def tearDown() {
     simpleConsumer.close
-    kafkaServer.shutdown
     super.tearDown
   }
 

Modified: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/integration/FetcherTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/integration/FetcherTest.scala?rev=1235492&r1=1235491&r2=1235492&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/integration/FetcherTest.scala (original)
+++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/integration/FetcherTest.scala Tue Jan 24 20:54:11 2012
@@ -34,9 +34,7 @@ class FetcherTest extends JUnit3Suite wi
   val numNodes = 2
   val configs = 
     for(props <- TestUtils.createBrokerConfigs(numNodes))
-      yield new KafkaConfig(props) {
-        override val enableZookeeper = false
-      }
+      yield new KafkaConfig(props)
   val messages = new mutable.HashMap[Int, ByteBufferMessageSet]
   val topic = "topic"
   val cluster = new Cluster(configs.map(c => new Broker(c.brokerId, c.brokerId.toString, "localhost", c.port)))

Modified: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala?rev=1235492&r1=1235491&r2=1235492&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala (original)
+++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala Tue Jan 24 20:54:11 2012
@@ -17,34 +17,29 @@
 
 package kafka.integration
 
-import java.util.Properties
-import junit.framework.Assert._
-import kafka.producer._
-import kafka.consumer._
-import kafka.message._
 import kafka.server._
 import kafka.utils.{Utils, TestUtils}
 import org.scalatest.junit.JUnit3Suite
+import kafka.zk.ZooKeeperTestHarness
 
 /**
  * A test harness that brings up some number of broker nodes
  */
-trait KafkaServerTestHarness extends JUnit3Suite {
+trait KafkaServerTestHarness extends JUnit3Suite with ZooKeeperTestHarness {
 
   val configs: List[KafkaConfig]
   var servers: List[KafkaServer] = null
 
   override def setUp() {
+    super.setUp
     if(configs.size <= 0)
       throw new IllegalArgumentException("Must supply at least one server config.")
     servers = configs.map(TestUtils.createServer(_))
-    super.setUp
   }
 
   override def tearDown() {
-    super.tearDown
     servers.map(server => server.shutdown())
     servers.map(server => Utils.rm(server.config.logDir))
+    super.tearDown
   }
-
 }

Modified: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/integration/LazyInitProducerTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/integration/LazyInitProducerTest.scala?rev=1235492&r1=1235491&r2=1235492&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/integration/LazyInitProducerTest.scala (original)
+++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/integration/LazyInitProducerTest.scala Tue Jan 24 20:54:11 2012
@@ -18,25 +18,23 @@
 package kafka.integration
 
 import scala.collection._
-import junit.framework.Assert._
 import kafka.common.OffsetOutOfRangeException
 import kafka.api.{ProducerRequest, FetchRequest}
 import kafka.server.{KafkaRequestHandler, KafkaServer, KafkaConfig}
 import org.apache.log4j.{Level, Logger}
 import org.scalatest.junit.JUnit3Suite
-import kafka.utils.{TestUtils, Utils}
 import kafka.message.{NoCompressionCodec, Message, ByteBufferMessageSet}
+import kafka.zk.ZooKeeperTestHarness
+import kafka.utils.{TestUtils, Utils}
 
 /**
  * End to end tests of the primitive apis against a local server
  */
-class LazyInitProducerTest extends JUnit3Suite with ProducerConsumerTestHarness   {
+class LazyInitProducerTest extends JUnit3Suite with ProducerConsumerTestHarness with ZooKeeperTestHarness  {
 
   val port = TestUtils.choosePort
   val props = TestUtils.createBrokerConfig(0, port)
-  val config = new KafkaConfig(props) {
-                 override val enableZookeeper = false
-               }
+  val config = new KafkaConfig(props)
   val configs = List(config)
   var servers: List[KafkaServer] = null
   val requestHandlerLogger = Logger.getLogger(classOf[KafkaRequestHandler])

Modified: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/integration/LogCorruptionTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/integration/LogCorruptionTest.scala?rev=1235492&r1=1235491&r2=1235492&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/integration/LogCorruptionTest.scala (original)
+++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/integration/LogCorruptionTest.scala Tue Jan 24 20:54:11 2012
@@ -23,8 +23,7 @@ import java.nio.ByteBuffer
 import kafka.utils.Utils
 import kafka.api.FetchRequest
 import kafka.common.InvalidMessageSizeException
-import kafka.zk.ZooKeeperTestHarness
-import kafka.utils.{TestZKUtils, TestUtils}
+import kafka.utils.TestUtils
 import kafka.consumer.{ZookeeperConsumerConnector, ConsumerConfig}
 import org.scalatest.junit.JUnit3Suite
 import kafka.integration.ProducerConsumerTestHarness
@@ -32,13 +31,11 @@ import kafka.integration.KafkaServerTest
 import org.apache.log4j.{Logger, Level}
 import kafka.message.{NoCompressionCodec, Message, ByteBufferMessageSet}
 
-class LogCorruptionTest extends JUnit3Suite with ProducerConsumerTestHarness with KafkaServerTestHarness with ZooKeeperTestHarness {
-  val zkConnect = TestZKUtils.zookeeperConnect  
+class LogCorruptionTest extends JUnit3Suite with ProducerConsumerTestHarness with KafkaServerTestHarness {
   val port = TestUtils.choosePort
   val props = TestUtils.createBrokerConfig(0, port)
   val config = new KafkaConfig(props) {
                  override val hostName = "localhost"
-                 override val enableZookeeper = true
                }
   val configs = List(config)
   val topic = "test"

Modified: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala?rev=1235492&r1=1235491&r2=1235492&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala (original)
+++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala Tue Jan 24 20:54:11 2012
@@ -27,9 +27,9 @@ import org.scalatest.junit.JUnit3Suite
 import java.util.Properties
 import kafka.producer.{ProducerData, Producer, ProducerConfig}
 import kafka.serializer.StringDecoder
-import kafka.utils.TestUtils
 import kafka.message.{DefaultCompressionCodec, NoCompressionCodec, Message, ByteBufferMessageSet}
 import java.io.File
+import kafka.utils.TestUtils
 
 /**
  * End to end tests of the primitive apis against a local server
@@ -39,7 +39,6 @@ class PrimitiveApiTest extends JUnit3Sui
   val port = TestUtils.choosePort
   val props = TestUtils.createBrokerConfig(0, port)
   val config = new KafkaConfig(props) {
-                 override val enableZookeeper = false
                  override val flushInterval = 1
                }
   val configs = List(config)

Added: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/integration/TopicMetadataTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/integration/TopicMetadataTest.scala?rev=1235492&view=auto
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/integration/TopicMetadataTest.scala (added)
+++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/integration/TopicMetadataTest.scala Tue Jan 24 20:54:11 2012
@@ -0,0 +1,128 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package kafka.integration
+
+import org.scalatest.junit.JUnit3Suite
+import kafka.zk.ZooKeeperTestHarness
+import kafka.admin.CreateTopicCommand
+import java.nio.ByteBuffer
+import kafka.log.LogManager
+import kafka.utils.TestUtils
+import kafka.server.{KafkaApis, KafkaConfig}
+import junit.framework.Assert._
+import org.I0Itec.zkclient.ZkClient
+import TestUtils._
+import org.easymock.EasyMock
+import kafka.network.BoundedByteBufferReceive
+import kafka.api.{TopicMetadataSend, TopicMetadataRequest}
+import kafka.cluster.Broker
+
+class TopicMetadataTest extends JUnit3Suite with ZooKeeperTestHarness {
+  val props = createBrokerConfigs(1)
+  val configs = props.map(p => new KafkaConfig(p) { override val flushInterval = 1})
+  var zkClient: ZkClient = null
+  var brokers: Seq[Broker] = null
+
+  override def setUp() {
+    super.setUp()
+    zkClient = zookeeper.client
+    // create brokers in zookeeper
+    brokers = TestUtils.createBrokersInZk(zkClient, configs.map(config => config.brokerId))
+  }
+
+  override def tearDown() {
+    super.tearDown()
+  }
+
+  def testTopicMetadataRequest {
+    // create topic
+    val topic = "test"
+    CreateTopicCommand.createTopic(zkClient, topic, 1)
+
+    // create a topic metadata request
+    val topicMetadataRequest = new TopicMetadataRequest(List(topic))
+
+    val serializedMetadataRequest = ByteBuffer.allocate(topicMetadataRequest.sizeInBytes + 2)
+    topicMetadataRequest.writeTo(serializedMetadataRequest)
+    serializedMetadataRequest.rewind()
+    val deserializedMetadataRequest = TopicMetadataRequest.readFrom(serializedMetadataRequest)
+
+    assertEquals(topicMetadataRequest, deserializedMetadataRequest)
+  }
+
+  def testBasicTopicMetadata {
+    // create topic
+    val topic = "test"
+    CreateTopicCommand.createTopic(zkClient, topic, 1)
+
+    mockLogManagerAndTestTopic(topic)
+  }
+
+  def testAutoCreateTopic {
+    // auto create topic
+    val topic = "test"
+
+    mockLogManagerAndTestTopic(topic)
+  }
+
+  private def mockLogManagerAndTestTopic(topic: String) = {
+    // topic metadata request only requires 2 APIs from the log manager
+    val logManager = EasyMock.createMock(classOf[LogManager])
+    EasyMock.expect(logManager.getServerConfig).andReturn(configs.head)
+    EasyMock.expect(logManager.getZookeeperClient).andReturn(zkClient)
+    EasyMock.replay(logManager)
+
+    // create a topic metadata request
+    val topicMetadataRequest = new TopicMetadataRequest(List(topic))
+
+    val serializedMetadataRequest = ByteBuffer.allocate(topicMetadataRequest.sizeInBytes + 2)
+    topicMetadataRequest.writeTo(serializedMetadataRequest)
+    serializedMetadataRequest.rewind()
+
+    // create the kafka request handler
+    val kafkaRequestHandler = new KafkaApis(logManager)
+
+    // mock the receive API to return the request buffer as created above
+    val receivedRequest = EasyMock.createMock(classOf[BoundedByteBufferReceive])
+    EasyMock.expect(receivedRequest.buffer).andReturn(serializedMetadataRequest)
+    EasyMock.replay(receivedRequest)
+
+    // call the API (to be tested) to get metadata
+    val metadataResponse = kafkaRequestHandler.handleTopicMetadataRequest(receivedRequest)
+
+    // verify the topic metadata returned
+    metadataResponse match {
+      case Some(metadata) =>
+        val responseBuffer = metadata.asInstanceOf[TopicMetadataSend].metadata
+        val topicMetadata = TopicMetadataRequest.deserializeTopicsMetadataResponse(responseBuffer)
+        assertEquals("Expecting metadata only for 1 topic", 1, topicMetadata.size)
+        assertEquals("Expecting metadata for the test topic", "test", topicMetadata.head.topic)
+        val partitionMetadata = topicMetadata.head.partitionsMetadata
+        assertEquals("Expecting metadata for 1 partition", 1, partitionMetadata.size)
+        assertEquals("Expecting partition id to be 0", 0, partitionMetadata.head.partitionId)
+        assertEquals(brokers, partitionMetadata.head.replicas)
+        assertNull("Not expecting log metadata", partitionMetadata.head.logMetadata.getOrElse(null))
+      case None =>
+        fail("Metadata response expected")
+    }
+
+    // verify the expected calls to log manager occurred in the right order
+    EasyMock.verify(logManager)
+    EasyMock.verify(receivedRequest)
+  }
+}
\ No newline at end of file

Modified: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/javaapi/consumer/ZookeeperConsumerConnectorTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/javaapi/consumer/ZookeeperConsumerConnectorTest.scala?rev=1235492&r1=1235491&r2=1235492&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/javaapi/consumer/ZookeeperConsumerConnectorTest.scala (original)
+++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/javaapi/consumer/ZookeeperConsumerConnectorTest.scala Tue Jan 24 20:54:11 2012
@@ -18,11 +18,10 @@
 package kafka.javaapi.consumer
 
 import junit.framework.Assert._
-import kafka.zk.ZooKeeperTestHarness
 import kafka.integration.KafkaServerTestHarness
 import kafka.server._
 import kafka.utils.{Utils, Logging}
-import kafka.utils.{TestZKUtils, TestUtils}
+import kafka.utils.TestUtils
 import org.scalatest.junit.JUnit3Suite
 import scala.collection.JavaConversions._
 import kafka.javaapi.message.ByteBufferMessageSet
@@ -30,17 +29,15 @@ import kafka.consumer.{ConsumerConfig, K
 import org.apache.log4j.{Level, Logger}
 import kafka.message._
 
-class ZookeeperConsumerConnectorTest extends JUnit3Suite with KafkaServerTestHarness with ZooKeeperTestHarness with Logging {
+class ZookeeperConsumerConnectorTest extends JUnit3Suite with KafkaServerTestHarness with Logging {
 
-  val zookeeperConnect = TestZKUtils.zookeeperConnect
-  val zkConnect = zookeeperConnect
+  val zookeeperConnect = zkConnect
   val numNodes = 2
   val numParts = 2
   val topic = "topic1"
   val configs =
     for(props <- TestUtils.createBrokerConfigs(numNodes))
     yield new KafkaConfig(props) {
-      override val enableZookeeper = true
       override val numPartitions = numParts
       override val zkConnect = zookeeperConnect
     }
@@ -57,7 +54,7 @@ class ZookeeperConsumerConnectorTest ext
     val sentMessages1 = sendMessages(nMessages, "batch1")
     // create a consumer
     val consumerConfig1 = new ConsumerConfig(
-      TestUtils.createConsumerProperties(zkConnect, group, consumer1))
+      TestUtils.createConsumerProperties(zookeeperConnect, group, consumer1))
     val zkConsumerConnector1 = new ZookeeperConsumerConnector(consumerConfig1, true)
     val topicMessageStreams1 = zkConsumerConnector1.createMessageStreams(toJavaMap(Predef.Map(topic -> numNodes*numParts/2)))
     val receivedMessages1 = getMessages(nMessages*2, topicMessageStreams1)

Modified: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/javaapi/integration/PrimitiveApiTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/javaapi/integration/PrimitiveApiTest.scala?rev=1235492&r1=1235491&r2=1235492&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/javaapi/integration/PrimitiveApiTest.scala (original)
+++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/javaapi/integration/PrimitiveApiTest.scala Tue Jan 24 20:54:11 2012
@@ -25,8 +25,8 @@ import org.apache.log4j.{Level, Logger}
 import org.scalatest.junit.JUnit3Suite
 import kafka.javaapi.message.ByteBufferMessageSet
 import kafka.javaapi.ProducerRequest
-import kafka.utils.TestUtils
 import kafka.message.{DefaultCompressionCodec, NoCompressionCodec, Message}
+import kafka.utils.TestUtils
 
 /**
  * End to end tests of the primitive apis against a local server
@@ -35,9 +35,7 @@ class PrimitiveApiTest extends JUnit3Sui
   
   val port = 9999
   val props = TestUtils.createBrokerConfig(0, port)
-  val config = new KafkaConfig(props) {
-                 override val enableZookeeper = false
-               }
+  val config = new KafkaConfig(props)
   val configs = List(config)
   val requestHandlerLogger = Logger.getLogger(classOf[KafkaRequestHandler])
 

Modified: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/javaapi/message/BaseMessageSetTestCases.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/javaapi/message/BaseMessageSetTestCases.scala?rev=1235492&r1=1235491&r2=1235492&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/javaapi/message/BaseMessageSetTestCases.scala (original)
+++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/javaapi/message/BaseMessageSetTestCases.scala Tue Jan 24 20:54:11 2012
@@ -35,7 +35,6 @@ trait BaseMessageSetTestCases extends JU
 
   @Test
   def testWrittenEqualsRead {
-    import scala.collection.JavaConversions._
     val messageSet = createMessageSet(messages)
     TestUtils.checkEquals(messages.iterator, toMessageIterator(messageSet))
   }

Modified: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/javaapi/message/ByteBufferMessageSetTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/javaapi/message/ByteBufferMessageSetTest.scala?rev=1235492&r1=1235491&r2=1235492&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/javaapi/message/ByteBufferMessageSetTest.scala (original)
+++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/javaapi/message/ByteBufferMessageSetTest.scala Tue Jan 24 20:54:11 2012
@@ -18,7 +18,6 @@
 package kafka.javaapi.message
 
 import java.nio._
-import junit.framework.TestCase
 import junit.framework.Assert._
 import org.junit.Test
 import kafka.message.{DefaultCompressionCodec, CompressionCodec, NoCompressionCodec, Message}

Modified: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/javaapi/producer/ProducerTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/javaapi/producer/ProducerTest.scala?rev=1235492&r1=1235491&r2=1235492&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/javaapi/producer/ProducerTest.scala (original)
+++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/javaapi/producer/ProducerTest.scala Tue Jan 24 20:54:11 2012
@@ -21,10 +21,8 @@ import java.util.Properties
 import org.apache.log4j.{Logger, Level}
 import kafka.server.{KafkaRequestHandler, KafkaServer, KafkaConfig}
 import kafka.zk.EmbeddedZookeeper
-import kafka.utils.{TestZKUtils, TestUtils}
 import org.junit.{After, Before, Test}
 import junit.framework.Assert
-import collection.mutable.HashMap
 import org.easymock.EasyMock
 import kafka.utils._
 import java.util.concurrent.ConcurrentHashMap
@@ -34,7 +32,7 @@ import org.scalatest.junit.JUnitSuite
 import kafka.producer.{SyncProducerConfig, Partitioner, ProducerConfig, DefaultPartitioner}
 import kafka.producer.ProducerPool
 import kafka.javaapi.message.ByteBufferMessageSet
-import kafka.producer.async.{AsyncProducer, AsyncProducerConfig}
+import kafka.producer.async.AsyncProducer
 import kafka.javaapi.Implicits._
 import kafka.serializer.{StringEncoder, Encoder}
 import kafka.javaapi.consumer.SimpleConsumer

Modified: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/javaapi/producer/SyncProducerTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/javaapi/producer/SyncProducerTest.scala?rev=1235492&r1=1235491&r2=1235492&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/javaapi/producer/SyncProducerTest.scala (original)
+++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/javaapi/producer/SyncProducerTest.scala Tue Jan 24 20:54:11 2012
@@ -17,42 +17,28 @@
 
 package kafka.javaapi.producer
 
-import junit.framework.{Assert, TestCase}
-import kafka.utils.SystemTime
-import kafka.utils.TestUtils
-import kafka.server.{KafkaServer, KafkaConfig}
-import org.apache.log4j.{Logger, Level}
-import org.scalatest.junit.JUnitSuite
-import org.junit.{After, Before, Test}
+import junit.framework.Assert
+import kafka.server.KafkaConfig
+import org.apache.log4j.Logger
 import java.util.Properties
 import kafka.producer.SyncProducerConfig
 import kafka.javaapi.message.ByteBufferMessageSet
 import kafka.javaapi.ProducerRequest
 import kafka.message.{NoCompressionCodec, Message}
+import kafka.integration.KafkaServerTestHarness
+import org.scalatest.junit.JUnit3Suite
+import kafka.utils.{SystemTime, TestUtils}
 
-class SyncProducerTest extends JUnitSuite {
+class SyncProducerTest extends JUnit3Suite with KafkaServerTestHarness {
   private var messageBytes =  new Array[Byte](2);
-  private var server: KafkaServer = null
   val simpleProducerLogger = Logger.getLogger(classOf[kafka.producer.SyncProducer])
+  val configs = List(new KafkaConfig(TestUtils.createBrokerConfigs(1).head))
+  val zookeeperConnect = zkConnect
 
-  @Before
-  def setUp() {
-    server = TestUtils.createServer(new KafkaConfig(TestUtils.createBrokerConfig(0, 9092))
-    {
-      override val enableZookeeper = false
-    })
-  }
-
-  @After
-  def tearDown() {
-    server.shutdown
-  }
-
-  @Test
   def testReachableServer() {
     val props = new Properties()
     props.put("host", "localhost")
-    props.put("port", "9092")
+    props.put("port", servers.head.socketServer.port.toString)
     props.put("buffer.size", "102400")
     props.put("connect.timeout.ms", "500")
     props.put("reconnect.interval", "1000")

Modified: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/log/LogManagerTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/log/LogManagerTest.scala?rev=1235492&r1=1235491&r2=1235492&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/log/LogManagerTest.scala (original)
+++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/log/LogManagerTest.scala Tue Jan 24 20:54:11 2012
@@ -20,36 +20,37 @@ package kafka.log
 import java.io._
 import junit.framework.Assert._
 import kafka.server.KafkaConfig
-import org.scalatest.junit.JUnitSuite
-import org.junit.{After, Before, Test}
-import kafka.utils.{Utils, MockTime, TestUtils}
+import org.junit.Test
 import kafka.common.OffsetOutOfRangeException
+import kafka.zk.ZooKeeperTestHarness
+import kafka.utils.{TestZKUtils, Utils, MockTime, TestUtils}
+import org.scalatest.junit.JUnit3Suite
 
-class LogManagerTest extends JUnitSuite {
+class LogManagerTest extends JUnit3Suite with ZooKeeperTestHarness {
 
   val time: MockTime = new MockTime()
   val maxLogAge = 1000
   var logDir: File = null
   var logManager: LogManager = null
   var config:KafkaConfig = null
+  val zookeeperConnect = TestZKUtils.zookeeperConnect
 
-  @Before
-  def setUp() {
+  override def setUp() {
+    super.setUp()
     val props = TestUtils.createBrokerConfig(0, -1)
     config = new KafkaConfig(props) {
                    override val logFileSize = 1024
-                   override val enableZookeeper = false
                  }
     logManager = new LogManager(config, null, time, -1, maxLogAge, false)
     logManager.startup
     logDir = logManager.logDir
   }
 
-  @After
-  def tearDown() {
+  override def tearDown() {
     if(logManager != null)
       logManager.close()
     Utils.rm(logDir)
+    super.tearDown()
   }
   
   @Test
@@ -107,7 +108,6 @@ class LogManagerTest extends JUnitSuite 
     Thread.sleep(100)
     config = new KafkaConfig(props) {
       override val logFileSize = (10 * (setSize - 1)).asInstanceOf[Int] // each segment will be 10 messages
-      override val enableZookeeper = false
       override val logRetentionSize = (5 * 10 * setSize + 10).asInstanceOf[Int] // keep exactly 6 segments + 1 roll over
       override val logRetentionHours = retentionHours
     }
@@ -152,7 +152,6 @@ class LogManagerTest extends JUnitSuite 
     Thread.sleep(100)
     config = new KafkaConfig(props) {
                    override val logFileSize = 1024 *1024 *1024 
-                   override val enableZookeeper = false
                    override val flushSchedulerThreadRate = 50
                    override val flushInterval = Int.MaxValue
                    override val flushIntervalMap = Utils.getTopicFlushIntervals("timebasedflush:100")
@@ -176,7 +175,6 @@ class LogManagerTest extends JUnitSuite 
     Thread.sleep(100)
     config = new KafkaConfig(props) {
                    override val logFileSize = 256
-                   override val enableZookeeper = false
                    override val topicPartitionsMap = Utils.getTopicPartitions("testPartition:2")
                  }
     

Modified: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/log/LogOffsetTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/log/LogOffsetTest.scala?rev=1235492&r1=1235491&r2=1235492&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/log/LogOffsetTest.scala (original)
+++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/log/LogOffsetTest.scala Tue Jan 24 20:54:11 2012
@@ -25,16 +25,17 @@ import java.util.{Random, Properties}
 import kafka.api.{FetchRequest, OffsetRequest}
 import collection.mutable.WrappedArray
 import kafka.consumer.SimpleConsumer
-import org.scalatest.junit.JUnitSuite
 import org.junit.{After, Before, Test}
 import kafka.message.{NoCompressionCodec, ByteBufferMessageSet, Message}
 import org.apache.log4j._
+import kafka.zk.ZooKeeperTestHarness
+import org.scalatest.junit.JUnit3Suite
 
 object LogOffsetTest {
   val random = new Random()  
 }
 
-class LogOffsetTest extends JUnitSuite {
+class LogOffsetTest extends JUnit3Suite with ZooKeeperTestHarness {
   var logDir: File = null
   var topicLogDir: File = null
   var server: KafkaServer = null
@@ -45,7 +46,8 @@ class LogOffsetTest extends JUnitSuite {
   private val logger = Logger.getLogger(classOf[LogOffsetTest])
   
   @Before
-  def setUp() {
+  override def setUp() {
+    super.setUp()
     val config: Properties = createBrokerConfig(1, brokerPort)
     val logDirPath = config.getProperty("log.dir")
     logDir = new File(logDirPath)
@@ -55,10 +57,11 @@ class LogOffsetTest extends JUnitSuite {
   }
 
   @After
-  def tearDown() {
+  override def tearDown() {
     simpleConsumer.close
     server.shutdown
     Utils.rm(logDir)
+    super.tearDown()
   }
 
   @Test
@@ -206,6 +209,7 @@ class LogOffsetTest extends JUnitSuite {
     props.put("log.retention.hours", "10")
     props.put("log.cleanup.interval.mins", "5")
     props.put("log.file.size", logSize.toString)
+    props.put("zk.connect", zkConnect.toString)
     props
   }
 

Modified: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/log/LogTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/log/LogTest.scala?rev=1235492&r1=1235491&r2=1235492&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/log/LogTest.scala (original)
+++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/log/LogTest.scala Tue Jan 24 20:54:11 2012
@@ -18,14 +18,13 @@
 package kafka.log
 
 import java.io._
-import java.nio._
 import java.util.ArrayList
 import junit.framework.Assert._
 import org.scalatest.junit.JUnitSuite
 import org.junit.{After, Before, Test}
 import kafka.utils.{Utils, TestUtils, Range}
 import kafka.common.OffsetOutOfRangeException
-import kafka.message.{NoCompressionCodec, MessageSet, ByteBufferMessageSet, Message}
+import kafka.message.{NoCompressionCodec, ByteBufferMessageSet, Message}
 
 class LogTest extends JUnitSuite {
   

Modified: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/log4j/KafkaLog4jAppenderTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/log4j/KafkaLog4jAppenderTest.scala?rev=1235492&r1=1235491&r2=1235492&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/log4j/KafkaLog4jAppenderTest.scala (original)
+++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/log4j/KafkaLog4jAppenderTest.scala Tue Jan 24 20:54:11 2012
@@ -23,27 +23,27 @@ import java.util.Properties
 import java.io.File
 import kafka.consumer.SimpleConsumer
 import kafka.server.{KafkaConfig, KafkaServer}
-import kafka.utils.TestUtils
-import kafka.utils.{Utils, Logging}
 import junit.framework.Assert._
 import kafka.api.FetchRequest
 import kafka.serializer.Encoder
-import kafka.message.{MessageSet, Message}
+import kafka.message.Message
 import kafka.producer.async.MissingConfigException
-import org.scalatest.junit.JUnitSuite
 import org.junit.{After, Before, Test}
+import org.scalatest.junit.JUnit3Suite
+import kafka.zk.ZooKeeperTestHarness
+import kafka.utils.{TestUtils, Utils, Logging}
 
-class KafkaLog4jAppenderTest extends JUnitSuite with Logging {
+class KafkaLog4jAppenderTest extends JUnit3Suite with ZooKeeperTestHarness with Logging {
 
   var logDir: File = null
-  //  var topicLogDir: File = null
   var server: KafkaServer = null
   val brokerPort: Int = 9092
   var simpleConsumer: SimpleConsumer = null
   val tLogger = Logger.getLogger(getClass())
 
   @Before
-  def setUp() {
+  override def setUp() {
+    super.setUp()
     val config: Properties = createBrokerConfig(1, brokerPort)
     val logDirPath = config.getProperty("log.dir")
     logDir = new File(logDirPath)
@@ -54,11 +54,12 @@ class KafkaLog4jAppenderTest extends JUn
   }
 
   @After
-  def tearDown() {
+  override def tearDown() {
     simpleConsumer.close
     server.shutdown
     Thread.sleep(100)
     Utils.rm(logDir)
+    super.tearDown()
   }
 
   @Test
@@ -171,6 +172,7 @@ class KafkaLog4jAppenderTest extends JUn
     props.put("log.retention.hours", "10")
     props.put("log.cleanup.interval.mins", "5")
     props.put("log.file.size", "1000")
+    props.put("zk.connect", zkConnect.toString)
     props
   }
 

Modified: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/message/BaseMessageSetTestCases.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/message/BaseMessageSetTestCases.scala?rev=1235492&r1=1235491&r2=1235492&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/message/BaseMessageSetTestCases.scala (original)
+++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/message/BaseMessageSetTestCases.scala Tue Jan 24 20:54:11 2012
@@ -17,7 +17,6 @@
 
 package kafka.message
 
-import java.util.Arrays
 import junit.framework.Assert._
 import kafka.utils.TestUtils._
 import org.scalatest.junit.JUnitSuite

Modified: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/message/FileMessageSetTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/message/FileMessageSetTest.scala?rev=1235492&r1=1235491&r2=1235492&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/message/FileMessageSetTest.scala (original)
+++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/message/FileMessageSetTest.scala Tue Jan 24 20:54:11 2012
@@ -18,12 +18,9 @@
 package kafka.message
 
 import java.nio._
-import java.util.Arrays
-import junit.framework.TestCase
 import junit.framework.Assert._
 import kafka.utils.TestUtils._
 import org.junit.Test
-import kafka.message._
 
 class FileMessageSetTest extends BaseMessageSetTestCases {
   

Modified: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/message/MessageTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/message/MessageTest.scala?rev=1235492&r1=1235491&r2=1235492&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/message/MessageTest.scala (original)
+++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/message/MessageTest.scala Tue Jan 24 20:54:11 2012
@@ -19,9 +19,6 @@ package kafka.message
 
 import java.util._
 import java.nio._
-import java.nio.channels._
-import java.io._
-import junit.framework.TestCase
 import junit.framework.Assert._
 import org.scalatest.junit.JUnitSuite
 import org.junit.{Before, Test}

Modified: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/network/SocketServerTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/network/SocketServerTest.scala?rev=1235492&r1=1235491&r2=1235492&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/network/SocketServerTest.scala (original)
+++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/network/SocketServerTest.scala Tue Jan 24 20:54:11 2012
@@ -19,15 +19,11 @@ package kafka.network;
 
 import java.net._
 import java.io._
-import java.nio._
-import java.nio.channels._
 import org.junit._
 import junit.framework.Assert._
 import org.scalatest.junit.JUnitSuite
 import kafka.utils.TestUtils
-import kafka.network._
 import java.util.Random
-import org.apache.log4j._
 
 class SocketServerTest extends JUnitSuite {
 

Modified: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala?rev=1235492&r1=1235491&r2=1235492&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala (original)
+++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala Tue Jan 24 20:54:11 2012
@@ -17,7 +17,7 @@
 
 package kafka.producer
 
-import junit.framework.{Assert, TestCase}
+import junit.framework.Assert
 import java.util.Properties
 import org.easymock.EasyMock
 import kafka.api.ProducerRequest

Modified: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/producer/ProducerTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/producer/ProducerTest.scala?rev=1235492&r1=1235491&r2=1235492&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/producer/ProducerTest.scala (original)
+++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/producer/ProducerTest.scala Tue Jan 24 20:54:11 2012
@@ -17,14 +17,13 @@
 
 package kafka.producer
 
-import async.{AsyncProducerConfig, AsyncProducer}
+import async.AsyncProducer
 import java.util.Properties
 import org.apache.log4j.{Logger, Level}
 import kafka.server.{KafkaRequestHandler, KafkaServer, KafkaConfig}
 import kafka.zk.EmbeddedZookeeper
 import org.junit.{After, Before, Test}
 import junit.framework.Assert
-import collection.mutable.HashMap
 import org.easymock.EasyMock
 import java.util.concurrent.ConcurrentHashMap
 import kafka.cluster.Partition

Modified: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala?rev=1235492&r1=1235491&r2=1235492&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala (original)
+++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala Tue Jan 24 20:54:11 2012
@@ -17,38 +17,23 @@
 
 package kafka.producer
 
-import junit.framework.{Assert, TestCase}
-import kafka.utils.SystemTime
-import kafka.utils.TestUtils
-import kafka.server.{KafkaServer, KafkaConfig}
-import org.apache.log4j.{Logger, Level}
-import org.scalatest.junit.JUnitSuite
-import org.junit.{After, Before, Test}
+import junit.framework.Assert
+import kafka.server.KafkaConfig
 import kafka.common.MessageSizeTooLargeException
 import java.util.Properties
 import kafka.api.ProducerRequest
 import kafka.message.{NoCompressionCodec, Message, ByteBufferMessageSet}
+import kafka.integration.KafkaServerTestHarness
+import kafka.utils.{TestZKUtils, SystemTime, TestUtils}
+import org.scalatest.junit.JUnit3Suite
 
-class SyncProducerTest extends JUnitSuite {
+class SyncProducerTest extends JUnit3Suite with KafkaServerTestHarness {
   private var messageBytes =  new Array[Byte](2);
-  private var server: KafkaServer = null
+  val configs = List(new KafkaConfig(TestUtils.createBrokerConfigs(1).head))
+  val zookeeperConnect = TestZKUtils.zookeeperConnect
 
-  @Before
-  def setUp() {
-    server = TestUtils.createServer(new KafkaConfig(TestUtils.createBrokerConfig(0, TestUtils.choosePort))
-    {
-      override val enableZookeeper = false
-    })
-  }
-
-  @After
-  def tearDown() {
-    if(server != null)
-      server.shutdown
-  }
-
-  @Test
   def testReachableServer() {
+    val server = servers.head
     val props = new Properties()
     props.put("host", "localhost")
     props.put("port", server.socketServer.port.toString)
@@ -85,8 +70,8 @@ class SyncProducerTest extends JUnitSuit
     Assert.assertFalse(failed)
   }
 
-  @Test
   def testMessageSizeTooLarge() {
+    val server = servers.head
     val props = new Properties()
     props.put("host", "localhost")
     props.put("port", server.socketServer.port.toString)

Modified: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala?rev=1235492&r1=1235491&r2=1235492&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala (original)
+++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala Tue Jan 24 20:54:11 2012
@@ -16,29 +16,25 @@
  */
 package kafka.server
 
-import kafka.utils.TestUtils
 import java.io.File
-import kafka.utils.Utils
 import kafka.api.FetchRequest
-import kafka.integration.ProducerConsumerTestHarness
 import kafka.producer.{SyncProducer, SyncProducerConfig}
 import kafka.consumer.SimpleConsumer
 import java.util.Properties
-import org.scalatest.junit.JUnitSuite
-import junit.framework.{Assert, TestCase}
-import org.junit.{After, Before, Test}
+import org.junit.Test
 import junit.framework.Assert._
 import kafka.message.{NoCompressionCodec, Message, ByteBufferMessageSet}
+import org.scalatest.junit.JUnit3Suite
+import kafka.zk.ZooKeeperTestHarness
+import kafka.utils.{TestUtils, Utils}
 
-class ServerShutdownTest extends JUnitSuite {
+class ServerShutdownTest extends JUnit3Suite with ZooKeeperTestHarness {
   val port = TestUtils.choosePort
 
   @Test
   def testCleanShutdown() {
     val props = TestUtils.createBrokerConfig(0, port)
-    val config = new KafkaConfig(props) {
-      override val enableZookeeper = false
-    }
+    val config = new KafkaConfig(props)
 
     val host = "localhost"
     val topic = "test"

Modified: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/utils/TestUtils.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/utils/TestUtils.scala?rev=1235492&r1=1235491&r2=1235492&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/utils/TestUtils.scala (original)
+++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/utils/TestUtils.scala Tue Jan 24 20:54:11 2012
@@ -29,6 +29,7 @@ import kafka.producer._
 import kafka.message._
 import org.I0Itec.zkclient.ZkClient
 import kafka.consumer.ConsumerConfig
+import kafka.cluster.Broker
 
 /**
  * Utility functions to help with testing
@@ -301,6 +302,12 @@ object TestUtils {
     }
   }
 
+  def createBrokersInZk(zkClient: ZkClient, ids: Seq[Int]): Seq[Broker] = {
+    val brokers = ids.map(id => new Broker(id, "localhost" + System.currentTimeMillis(), "localhost", 6667))
+    brokers.foreach(b => ZkUtils.registerBrokerInZk(zkClient, b.id, b.host, b.creatorId, b.port))
+    brokers
+  }
+
 }
 
 object TestZKUtils {

Modified: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/zk/ZKEphemeralTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/zk/ZKEphemeralTest.scala?rev=1235492&r1=1235491&r2=1235492&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/zk/ZKEphemeralTest.scala (original)
+++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/zk/ZKEphemeralTest.scala Tue Jan 24 20:54:11 2012
@@ -20,12 +20,11 @@ package kafka.zk
 import kafka.consumer.ConsumerConfig
 import org.I0Itec.zkclient.ZkClient
 import kafka.utils.{ZkUtils, ZKStringSerializer}
-import kafka.utils.{TestZKUtils, TestUtils}
+import kafka.utils.TestUtils
 import org.junit.Assert
 import org.scalatest.junit.JUnit3Suite
 
 class ZKEphemeralTest extends JUnit3Suite with ZooKeeperTestHarness {
-  val zkConnect = TestZKUtils.zookeeperConnect
   var zkSessionTimeoutMs = 1000
 
   def testEphemeralNodeCleanup = {

Modified: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/zk/ZKLoadBalanceTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/zk/ZKLoadBalanceTest.scala?rev=1235492&r1=1235491&r2=1235492&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/zk/ZKLoadBalanceTest.scala (original)
+++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/zk/ZKLoadBalanceTest.scala Tue Jan 24 20:54:11 2012
@@ -22,10 +22,9 @@ import java.util.Collections
 import kafka.consumer.{ConsumerConfig, ZookeeperConsumerConnector}
 import java.lang.Thread
 import org.scalatest.junit.JUnit3Suite
-import kafka.utils.{TestUtils, ZkUtils, ZKGroupTopicDirs, TestZKUtils}
+import kafka.utils.{TestUtils, ZkUtils, ZKGroupTopicDirs}
 
 class ZKLoadBalanceTest extends JUnit3Suite with ZooKeeperTestHarness {
-  val zkConnect = TestZKUtils.zookeeperConnect
   var dirs : ZKGroupTopicDirs = null
   val topic = "topic1"
   val group = "group1"

Modified: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/zk/ZooKeeperTestHarness.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/zk/ZooKeeperTestHarness.scala?rev=1235492&r1=1235491&r2=1235492&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/zk/ZooKeeperTestHarness.scala (original)
+++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/zk/ZooKeeperTestHarness.scala Tue Jan 24 20:54:11 2012
@@ -18,9 +18,10 @@
 package kafka.zk
 
 import org.scalatest.junit.JUnit3Suite
+import kafka.utils.TestZKUtils
 
 trait ZooKeeperTestHarness extends JUnit3Suite {
-  val zkConnect: String
+  val zkConnect: String = TestZKUtils.zookeeperConnect
   var zookeeper: EmbeddedZookeeper = null
 
   override def setUp() {

Modified: incubator/kafka/branches/0.8/project/build.properties
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/project/build.properties?rev=1235492&r1=1235491&r2=1235492&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/project/build.properties (original)
+++ incubator/kafka/branches/0.8/project/build.properties Tue Jan 24 20:54:11 2012
@@ -16,7 +16,7 @@
 #Mon Feb 28 11:55:49 PST 2011
 project.name=Kafka
 sbt.version=0.7.5
-project.version=0.7.0
+project.version=0.8.0
 build.scala.versions=2.8.0
 contrib.root.dir=contrib
 lib.dir=lib



Mime
View raw message