kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jun...@apache.org
Subject svn commit: r1205535 - /incubator/kafka/trunk/core/src/test/scala/unit/kafka/javaapi/consumer/ZookeeperConsumerConnectorTest.scala
Date Wed, 23 Nov 2011 18:43:48 GMT
Author: junrao
Date: Wed Nov 23 18:43:48 2011
New Revision: 1205535

URL: http://svn.apache.org/viewvc?rev=1205535&view=rev
Log:
javaapi ZookeeperConsumerConnectorTest duplicates many tests in the scala version; patched
by Jun Rao; reviewed by Jay Kreps; KAFKA-210

Modified:
    incubator/kafka/trunk/core/src/test/scala/unit/kafka/javaapi/consumer/ZookeeperConsumerConnectorTest.scala

Modified: incubator/kafka/trunk/core/src/test/scala/unit/kafka/javaapi/consumer/ZookeeperConsumerConnectorTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/test/scala/unit/kafka/javaapi/consumer/ZookeeperConsumerConnectorTest.scala?rev=1205535&r1=1205534&r2=1205535&view=diff
==============================================================================
--- incubator/kafka/trunk/core/src/test/scala/unit/kafka/javaapi/consumer/ZookeeperConsumerConnectorTest.scala
(original)
+++ incubator/kafka/trunk/core/src/test/scala/unit/kafka/javaapi/consumer/ZookeeperConsumerConnectorTest.scala
Wed Nov 23 18:43:48 2011
@@ -21,17 +21,14 @@ import junit.framework.Assert._
 import kafka.zk.ZooKeeperTestHarness
 import kafka.integration.KafkaServerTestHarness
 import kafka.server.KafkaConfig
-import scala.collection._
 import kafka.utils.{Utils, Logging}
 import kafka.utils.{TestZKUtils, TestUtils}
 import org.scalatest.junit.JUnit3Suite
 import scala.collection.JavaConversions._
 import kafka.javaapi.message.ByteBufferMessageSet
-import kafka.consumer.{Consumer, ConsumerConfig, KafkaMessageStream, ConsumerTimeoutException}
-import javax.management.NotCompliantMBeanException
+import kafka.consumer.{ConsumerConfig, KafkaMessageStream}
 import org.apache.log4j.{Level, Logger}
-import kafka.message.{NoCompressionCodec, DefaultCompressionCodec, CompressionCodec, Message}
-import kafka.serializer.StringDecoder
+import kafka.message.{NoCompressionCodec, CompressionCodec, Message}
 
 class ZookeeperConsumerConnectorTest extends JUnit3Suite with KafkaServerTestHarness with
ZooKeeperTestHarness with Logging {
 
@@ -48,10 +45,7 @@ class ZookeeperConsumerConnectorTest ext
       override val zkConnect = zookeeperConnect
     }
   val group = "group1"
-  val consumer0 = "consumer0"
   val consumer1 = "consumer1"
-  val consumer2 = "consumer2"
-  val consumer3 = "consumer3"
   val nMessages = 2
 
   def testBasic() {
@@ -59,23 +53,6 @@ class ZookeeperConsumerConnectorTest ext
     requestHandlerLogger.setLevel(Level.FATAL)
     var actualMessages: List[Message] = Nil
 
-    // test consumer timeout logic
-    val consumerConfig0 = new ConsumerConfig(
-      TestUtils.createConsumerProperties(zkConnect, group, consumer0)) {
-      override val consumerTimeoutMs = 200
-    }
-    val zkConsumerConnector0 = new ZookeeperConsumerConnector(consumerConfig0, true)
-    val topicMessageStreams0 = zkConsumerConnector0.createMessageStreams(toJavaMap(Predef.Map(topic
-> numNodes*numParts/2)))
-    try {
-      getMessages(nMessages*2, topicMessageStreams0)
-      fail("should get an exception")
-    }
-    catch {
-      case e: ConsumerTimeoutException => // this is ok
-      case e => throw e
-    }
-    zkConsumerConnector0.shutdown
-
     // send some messages to each broker
     val sentMessages1 = sendMessages(nMessages, "batch1")
     // create a consumer
@@ -85,162 +62,12 @@ class ZookeeperConsumerConnectorTest ext
     val topicMessageStreams1 = zkConsumerConnector1.createMessageStreams(toJavaMap(Predef.Map(topic
-> numNodes*numParts/2)))
     val receivedMessages1 = getMessages(nMessages*2, topicMessageStreams1)
     assertEquals(sentMessages1, receivedMessages1)
-    // commit consumed offsets
-    zkConsumerConnector1.commitOffsets
-
-    // create a consumer
-    val consumerConfig2 = new ConsumerConfig(
-      TestUtils.createConsumerProperties(zkConnect, group, consumer2))
-    val zkConsumerConnector2 = new ZookeeperConsumerConnector(consumerConfig2, true)
-    val topicMessageStreams2 = zkConsumerConnector2.createMessageStreams(toJavaMap(Predef.Map(topic
-> numNodes*numParts/2)))
-    // send some messages to each broker
-    val sentMessages2 = sendMessages(nMessages, "batch2")
-    Thread.sleep(200)
-    val receivedMessages2_1 = getMessages(nMessages, topicMessageStreams1)
-    val receivedMessages2_2 = getMessages(nMessages, topicMessageStreams2)
-    val receivedMessages2 = (receivedMessages2_1 ::: receivedMessages2_2).sortWith((s,t)
=> s.checksum < t.checksum)
-    assertEquals(sentMessages2, receivedMessages2)
-
-    // create a consumer with empty map
-    val consumerConfig3 = new ConsumerConfig(
-      TestUtils.createConsumerProperties(zkConnect, group, consumer3))
-    val zkConsumerConnector3 = new ZookeeperConsumerConnector(consumerConfig3, true)
-    val topicMessageStreams3 = zkConsumerConnector3.createMessageStreams(toJavaMap(new mutable.HashMap[String,
Int]()))
-    // send some messages to each broker
-    Thread.sleep(200)
-    val sentMessages3 = sendMessages(nMessages, "batch3")
-    Thread.sleep(200)
-    val receivedMessages3_1 = getMessages(nMessages, topicMessageStreams1)
-    val receivedMessages3_2 = getMessages(nMessages, topicMessageStreams2)
-    val receivedMessages3 = (receivedMessages3_1 ::: receivedMessages3_2).sortWith((s,t)
=> s.checksum < t.checksum)
-    assertEquals(sentMessages3, receivedMessages3)
 
     zkConsumerConnector1.shutdown
-    zkConsumerConnector2.shutdown
-    zkConsumerConnector3.shutdown
     info("all consumer connectors stopped")
     requestHandlerLogger.setLevel(Level.ERROR)
   }
 
-  def testCompression() {
-    val requestHandlerLogger = Logger.getLogger(classOf[kafka.server.KafkaRequestHandlers])
-    requestHandlerLogger.setLevel(Level.FATAL)
-
-    // send some messages to each broker
-    val sentMessages1 = sendMessages(nMessages, "batch1", DefaultCompressionCodec)
-    // create a consumer
-    val consumerConfig1 = new ConsumerConfig(
-      TestUtils.createConsumerProperties(zkConnect, group, consumer1))
-    val zkConsumerConnector1 = new ZookeeperConsumerConnector(consumerConfig1, true)
-    val topicMessageStreams1 = zkConsumerConnector1.createMessageStreams(toJavaMap(Predef.Map(topic
-> numNodes*numParts/2)))
-    val receivedMessages1 = getMessages(nMessages*2, topicMessageStreams1)
-    assertEquals(sentMessages1, receivedMessages1)
-    // commit consumed offsets
-    zkConsumerConnector1.commitOffsets
-
-    // create a consumer
-    val consumerConfig2 = new ConsumerConfig(
-      TestUtils.createConsumerProperties(zkConnect, group, consumer2))
-    val zkConsumerConnector2 = new ZookeeperConsumerConnector(consumerConfig2, true)
-    val topicMessageStreams2 = zkConsumerConnector2.createMessageStreams(toJavaMap(Predef.Map(topic
-> numNodes*numParts/2)))
-    // send some messages to each broker
-    val sentMessages2 = sendMessages(nMessages, "batch2", DefaultCompressionCodec)
-    Thread.sleep(200)
-    val receivedMessages2_1 = getMessages(nMessages, topicMessageStreams1)
-    val receivedMessages2_2 = getMessages(nMessages, topicMessageStreams2)
-    val receivedMessages2 = (receivedMessages2_1 ::: receivedMessages2_2).sortWith((s,t)
=> s.checksum < t.checksum)
-    assertEquals(sentMessages2, receivedMessages2)
-
-    // create a consumer with empty map
-    val consumerConfig3 = new ConsumerConfig(
-      TestUtils.createConsumerProperties(zkConnect, group, consumer3))
-    val zkConsumerConnector3 = new ZookeeperConsumerConnector(consumerConfig3, true)
-    val topicMessageStreams3 = zkConsumerConnector3.createMessageStreams(toJavaMap(new mutable.HashMap[String,
Int]()))
-    // send some messages to each broker
-    Thread.sleep(200)
-    val sentMessages3 = sendMessages(nMessages, "batch3", DefaultCompressionCodec)
-    Thread.sleep(200)
-    val receivedMessages3_1 = getMessages(nMessages, topicMessageStreams1)
-    val receivedMessages3_2 = getMessages(nMessages, topicMessageStreams2)
-    val receivedMessages3 = (receivedMessages3_1 ::: receivedMessages3_2).sortWith((s,t)
=> s.checksum < t.checksum)
-    assertEquals(sentMessages3, receivedMessages3)
-
-    zkConsumerConnector1.shutdown
-    zkConsumerConnector2.shutdown
-    zkConsumerConnector3.shutdown
-    info("all consumer connectors stopped")
-    requestHandlerLogger.setLevel(Level.ERROR)
-  }
-
-  def testCompressionSetConsumption() {
-    val requestHandlerLogger = Logger.getLogger(classOf[kafka.server.KafkaRequestHandlers])
-    requestHandlerLogger.setLevel(Level.FATAL)
-
-    var actualMessages: List[Message] = Nil
-
-    // shutdown one server
-    servers.last.shutdown
-    Thread.sleep(500)
-
-    // send some messages to each broker
-    val sentMessages = sendMessages(configs.head, 200, "batch1", DefaultCompressionCodec)
-    // test consumer timeout logic
-    val consumerConfig0 = new ConsumerConfig(
-      TestUtils.createConsumerProperties(zkConnect, group, consumer0)) {
-      override val consumerTimeoutMs = 5000
-    }
-    val zkConsumerConnector0 = new ZookeeperConsumerConnector(consumerConfig0, true)
-    val topicMessageStreams0 = zkConsumerConnector0.createMessageStreams(toJavaMap(Predef.Map(topic
-> 1)))
-    getMessages(100, topicMessageStreams0)
-    zkConsumerConnector0.shutdown
-    // at this point, only some part of the message set was consumed. So consumed offset
should still be 0
-    // also fetched offset should be 0
-    val zkConsumerConnector1 = new ZookeeperConsumerConnector(consumerConfig0, true)
-    val topicMessageStreams1 = zkConsumerConnector1.createMessageStreams(toJavaMap(Predef.Map(topic
-> 1)))
-    val receivedMessages = getMessages(400, topicMessageStreams1)
-    val sortedReceivedMessages = receivedMessages.sortWith((s,t) => s.checksum < t.checksum)
-    val sortedSentMessages = sentMessages.sortWith((s,t) => s.checksum < t.checksum)
-    assertEquals(sortedSentMessages, sortedReceivedMessages)
-    zkConsumerConnector1.shutdown
-
-    requestHandlerLogger.setLevel(Level.ERROR)
-  }
-
-  def testConsumerDecoder() {
-    val requestHandlerLogger = Logger.getLogger(classOf[kafka.server.KafkaRequestHandlers])
-    requestHandlerLogger.setLevel(Level.FATAL)
-
-    val sentMessages = sendMessages(nMessages, "batch1", NoCompressionCodec).
-      map(m => Utils.toString(m.payload, "UTF-8")).
-      sortWith((s, t) => s.compare(t) == -1)
-    val consumerConfig = new ConsumerConfig(
-      TestUtils.createConsumerProperties(zkConnect, group, consumer1))
-
-    val zkConsumerConnector =
-      new ZookeeperConsumerConnector(consumerConfig, true)
-    val topicMessageStreams = zkConsumerConnector.createMessageStreams(
-      Predef.Map(topic -> new java.lang.Integer(numNodes * numParts / 2)), new StringDecoder)
-
-    var receivedMessages: List[String] = Nil
-    for ((topic, messageStreams) <- topicMessageStreams) {
-      for (messageStream <- messageStreams) {
-        val iterator = messageStream.iterator
-        for (i <- 0 until nMessages * 2) {
-          assertTrue(iterator.hasNext())
-          val message = iterator.next()
-          receivedMessages ::= message
-          debug("received message: " + message)
-        }
-      }
-    }
-    receivedMessages = receivedMessages.sortWith((s, t) => s.compare(t) == -1)
-    assertEquals(sentMessages, receivedMessages)
-
-    zkConsumerConnector.shutdown()
-    requestHandlerLogger.setLevel(Level.ERROR)
-  }
-
-
   def sendMessages(conf: KafkaConfig, messagesPerNode: Int, header: String, compressed: CompressionCodec):
List[Message]= {
     var messages: List[Message] = Nil
     val producer = kafka.javaapi.Implicits.toJavaSyncProducer(TestUtils.createProducer("localhost",
conf.port))
@@ -282,15 +109,6 @@ class ZookeeperConsumerConnectorTest ext
     messages.sortWith((s,t) => s.checksum < t.checksum)
   }
 
-  def testJMX() {
-    val consumerConfig = new ConsumerConfig(TestUtils.createConsumerProperties(zkConnect,
group, consumer0))
-    try {
-      val consumer = Consumer.createJavaConsumerConnector(consumerConfig)
-    }catch {
-      case e: NotCompliantMBeanException => fail("Should not fail with NotCompliantMBeanException")
-    }
-  }
-
   private def getMessageList(messages: Message*): java.util.List[Message] = {
     val messageList = new java.util.ArrayList[Message]()
     messages.foreach(m => messageList.add(m))
@@ -301,5 +119,5 @@ class ZookeeperConsumerConnectorTest ext
     val javaMap = new java.util.HashMap[String, java.lang.Integer]()
     scalaMap.foreach(m => javaMap.put(m._1, m._2.asInstanceOf[java.lang.Integer]))
     javaMap
-  }
+  }  
 }



Mime
View raw message