kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jkr...@apache.org
Subject [25/30] KAFKA-622 Create mbeans per client; patched by Swapnil; reviewed by Neha Narkhede
Date Tue, 18 Dec 2012 17:44:12 GMT
http://git-wip-us.apache.org/repos/asf/kafka/blob/d7c71c09/core/src/test/scala/unit/kafka/consumer/ConsumerIteratorTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/consumer/ConsumerIteratorTest.scala b/core/src/test/scala/unit/kafka/consumer/ConsumerIteratorTest.scala
index 962d5f9..bb39e09 100644
--- a/core/src/test/scala/unit/kafka/consumer/ConsumerIteratorTest.scala
+++ b/core/src/test/scala/unit/kafka/consumer/ConsumerIteratorTest.scala
@@ -55,7 +55,8 @@ class ConsumerIteratorTest extends JUnit3Suite with KafkaServerTestHarness {
                                                            queue,
                                                            new AtomicLong(consumedOffset),
                                                            new AtomicLong(0),
-                                                           new AtomicInteger(0)))
+                                                           new AtomicInteger(0),
+                                                           new ConsumerTopicStats("")))
   val consumerConfig = new ConsumerConfig(TestUtils.createConsumerProperties(zkConnect, group, consumer0))
 
   override def setUp() {
@@ -78,7 +79,8 @@ class ConsumerIteratorTest extends JUnit3Suite with KafkaServerTestHarness {
                                                     consumerConfig.consumerTimeoutMs,
                                                     new StringDecoder(), 
                                                     new StringDecoder(),
-                                                    enableShallowIterator = false)
+                                                    enableShallowIterator = false,
+                                                    consumerTopicStats = new ConsumerTopicStats(""))
     var receivedMessages = (0 until 5).map(i => iter.next.message).toList
 
     assertFalse(iter.hasNext)

http://git-wip-us.apache.org/repos/asf/kafka/blob/d7c71c09/core/src/test/scala/unit/kafka/integration/AutoOffsetResetTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/integration/AutoOffsetResetTest.scala b/core/src/test/scala/unit/kafka/integration/AutoOffsetResetTest.scala
index 8c7f774..d7945a5 100644
--- a/core/src/test/scala/unit/kafka/integration/AutoOffsetResetTest.scala
+++ b/core/src/test/scala/unit/kafka/integration/AutoOffsetResetTest.scala
@@ -24,7 +24,6 @@ import kafka.server._
 import org.apache.log4j.{Level, Logger}
 import org.scalatest.junit.JUnit3Suite
 import kafka.utils.TestUtils
-import kafka.message.Message
 import kafka.serializer._
 import kafka.producer.{Producer, KeyedMessage}
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/d7c71c09/core/src/test/scala/unit/kafka/integration/FetcherTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/integration/FetcherTest.scala b/core/src/test/scala/unit/kafka/integration/FetcherTest.scala
index df754cc..dec0453 100644
--- a/core/src/test/scala/unit/kafka/integration/FetcherTest.scala
+++ b/core/src/test/scala/unit/kafka/integration/FetcherTest.scala
@@ -23,7 +23,6 @@ import scala.collection._
 import junit.framework.Assert._
 
 import kafka.cluster._
-import kafka.message._
 import kafka.server._
 import org.scalatest.junit.JUnit3Suite
 import kafka.consumer._
@@ -50,7 +49,8 @@ class FetcherTest extends JUnit3Suite with KafkaServerTestHarness {
                                                            queue,
                                                            new AtomicLong(0),
                                                            new AtomicLong(0),
-                                                           new AtomicInteger(0)))
+                                                           new AtomicInteger(0),
+                                                           new ConsumerTopicStats("")))
 
   var fetcher: ConsumerFetcherManager = null
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/d7c71c09/core/src/test/scala/unit/kafka/integration/LazyInitProducerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/integration/LazyInitProducerTest.scala b/core/src/test/scala/unit/kafka/integration/LazyInitProducerTest.scala
index 4411f45..c4866eb 100644
--- a/core/src/test/scala/unit/kafka/integration/LazyInitProducerTest.scala
+++ b/core/src/test/scala/unit/kafka/integration/LazyInitProducerTest.scala
@@ -18,7 +18,7 @@
 package kafka.integration
 
 import kafka.api.FetchRequestBuilder
-import kafka.message.{Message, ByteBufferMessageSet}
+import kafka.message.ByteBufferMessageSet
 import kafka.server.{KafkaRequestHandler, KafkaConfig}
 import org.apache.log4j.{Level, Logger}
 import org.junit.Assert._

http://git-wip-us.apache.org/repos/asf/kafka/blob/d7c71c09/core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala b/core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala
index 72902ba..402fced 100644
--- a/core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala
+++ b/core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala
@@ -25,7 +25,6 @@ import java.util.Properties
 import kafka.utils.Utils
 import kafka.producer.{KeyedMessage, Producer, ProducerConfig}
 import kafka.serializer._
-import kafka.message.Message
 import kafka.utils.TestUtils
 import org.apache.log4j.{Level, Logger}
 import org.I0Itec.zkclient.ZkClient

http://git-wip-us.apache.org/repos/asf/kafka/blob/d7c71c09/core/src/test/scala/unit/kafka/integration/ProducerConsumerTestHarness.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/integration/ProducerConsumerTestHarness.scala b/core/src/test/scala/unit/kafka/integration/ProducerConsumerTestHarness.scala
index fd9fae5..caea858 100644
--- a/core/src/test/scala/unit/kafka/integration/ProducerConsumerTestHarness.scala
+++ b/core/src/test/scala/unit/kafka/integration/ProducerConsumerTestHarness.scala
@@ -21,7 +21,6 @@ import kafka.consumer.SimpleConsumer
 import org.scalatest.junit.JUnit3Suite
 import java.util.Properties
 import kafka.producer.{ProducerConfig, Producer}
-import kafka.message.Message
 import kafka.utils.TestUtils
 import kafka.serializer._
 
@@ -44,10 +43,7 @@ trait ProducerConsumerTestHarness extends JUnit3Suite with KafkaServerTestHarnes
       props.put("producer.request.required.acks", "-1")
       props.put("serializer.class", classOf[StringEncoder].getName.toString)
       producer = new Producer(new ProducerConfig(props))
-      consumer = new SimpleConsumer(host,
-                                   port,
-                                   1000000,
-                                   64*1024)
+      consumer = new SimpleConsumer(host, port, 1000000, 64*1024, "")
     }
 
    override def tearDown() {

http://git-wip-us.apache.org/repos/asf/kafka/blob/d7c71c09/core/src/test/scala/unit/kafka/javaapi/consumer/ZookeeperConsumerConnectorTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/javaapi/consumer/ZookeeperConsumerConnectorTest.scala b/core/src/test/scala/unit/kafka/javaapi/consumer/ZookeeperConsumerConnectorTest.scala
index 9664876..9f243f0 100644
--- a/core/src/test/scala/unit/kafka/javaapi/consumer/ZookeeperConsumerConnectorTest.scala
+++ b/core/src/test/scala/unit/kafka/javaapi/consumer/ZookeeperConsumerConnectorTest.scala
@@ -29,7 +29,7 @@ import kafka.producer.KeyedMessage
 import kafka.javaapi.producer.Producer
 import kafka.utils.IntEncoder
 import kafka.utils.TestUtils._
-import kafka.utils.{Utils, Logging, TestUtils}
+import kafka.utils.{Logging, TestUtils}
 import kafka.consumer.{KafkaStream, ConsumerConfig}
 import kafka.zk.ZooKeeperTestHarness
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/d7c71c09/core/src/test/scala/unit/kafka/log/LogOffsetTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/log/LogOffsetTest.scala b/core/src/test/scala/unit/kafka/log/LogOffsetTest.scala
index f3a272e..b6bab2d 100644
--- a/core/src/test/scala/unit/kafka/log/LogOffsetTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogOffsetTest.scala
@@ -53,7 +53,7 @@ class LogOffsetTest extends JUnit3Suite with ZooKeeperTestHarness {
     logDir = new File(logDirPath)
     time = new MockTime()
     server = TestUtils.createServer(new KafkaConfig(config), time)
-    simpleConsumer = new SimpleConsumer("localhost", brokerPort, 1000000, 64*1024)
+    simpleConsumer = new SimpleConsumer("localhost", brokerPort, 1000000, 64*1024, "")
   }
 
   @After

http://git-wip-us.apache.org/repos/asf/kafka/blob/d7c71c09/core/src/test/scala/unit/kafka/log4j/KafkaLog4jAppenderTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/log4j/KafkaLog4jAppenderTest.scala b/core/src/test/scala/unit/kafka/log4j/KafkaLog4jAppenderTest.scala
index da3c704..c25255f 100644
--- a/core/src/test/scala/unit/kafka/log4j/KafkaLog4jAppenderTest.scala
+++ b/core/src/test/scala/unit/kafka/log4j/KafkaLog4jAppenderTest.scala
@@ -24,7 +24,6 @@ import kafka.server.{KafkaConfig, KafkaServer}
 import kafka.utils.{TestUtils, Utils, Logging}
 import junit.framework.Assert._
 import kafka.api.FetchRequestBuilder
-import kafka.message.Message
 import kafka.producer.async.MissingConfigException
 import kafka.serializer.Encoder
 import kafka.zk.ZooKeeperTestHarness
@@ -57,7 +56,7 @@ class KafkaLog4jAppenderTest extends JUnit3Suite with ZooKeeperTestHarness with
     logDirZk = new File(logDirZkPath)
     config = new KafkaConfig(propsZk)
     serverZk = TestUtils.createServer(config);
-    simpleConsumerZk = new SimpleConsumer("localhost", portZk, 1000000, 64*1024)
+    simpleConsumerZk = new SimpleConsumer("localhost", portZk, 1000000, 64*1024, "")
   }
 
   @After

http://git-wip-us.apache.org/repos/asf/kafka/blob/d7c71c09/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala b/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala
index 19f4c3b..d67abe9 100644
--- a/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala
+++ b/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala
@@ -17,7 +17,7 @@
 
 package kafka.producer
 
-import java.util.{LinkedList, Properties}
+import java.util.Properties
 import java.util.concurrent.LinkedBlockingQueue
 import junit.framework.Assert._
 import org.easymock.EasyMock
@@ -68,7 +68,10 @@ class AsyncProducerTest extends JUnit3Suite {
 
     val config = new ProducerConfig(props)
     val produceData = getProduceData(12)
-    val producer = new Producer[String, String](config, mockEventHandler)
+    val producer = new Producer[String, String](config,
+                                                mockEventHandler,
+                                                new ProducerStats(""),
+                                                new ProducerTopicStats(""))
     try {
       // send all 10 messages, should hit the batch size and then reach broker
       producer.send(produceData: _*)
@@ -118,7 +121,7 @@ class AsyncProducerTest extends JUnit3Suite {
 
     val queue = new LinkedBlockingQueue[KeyedMessage[String,String]](10)
     val producerSendThread =
-      new ProducerSendThread[String,String]("thread1", queue, mockHandler, Integer.MAX_VALUE, 5)
+      new ProducerSendThread[String,String]("thread1", queue, mockHandler, Integer.MAX_VALUE, 5, "")
     producerSendThread.start()
 
     for (producerData <- producerDataList)
@@ -143,7 +146,7 @@ class AsyncProducerTest extends JUnit3Suite {
     val queueExpirationTime = 200
     val queue = new LinkedBlockingQueue[KeyedMessage[String,String]](10)
     val producerSendThread =
-      new ProducerSendThread[String,String]("thread1", queue, mockHandler, queueExpirationTime, 5)
+      new ProducerSendThread[String,String]("thread1", queue, mockHandler, queueExpirationTime, 5, "")
     producerSendThread.start()
 
     for (producerData <- producerDataList)
@@ -185,11 +188,13 @@ class AsyncProducerTest extends JUnit3Suite {
 
     val producerPool = new ProducerPool(config)
     val handler = new DefaultEventHandler[Int,String](config,
-                                                         partitioner = intPartitioner,
-                                                         encoder = null.asInstanceOf[Encoder[String]],
-                                                         keyEncoder = new IntEncoder(),
-                                                         producerPool = producerPool,
-                                                         topicPartitionInfos)
+                                                      partitioner = intPartitioner,
+                                                      encoder = null.asInstanceOf[Encoder[String]],
+                                                      keyEncoder = new IntEncoder(),
+                                                      producerPool = producerPool,
+                                                      topicPartitionInfos = topicPartitionInfos,
+                                                      producerStats = new ProducerStats(""),
+                                                      producerTopicStats = new ProducerTopicStats(""))
 
     val topic1Broker1Data = 
      ArrayBuffer[KeyedMessage[Int,Message]](new KeyedMessage[Int,Message]("topic1", 0, new Message("msg1".getBytes)),
@@ -228,8 +233,9 @@ class AsyncProducerTest extends JUnit3Suite {
                                                          encoder = new StringEncoder,
                                                          keyEncoder = new StringEncoder,
                                                          producerPool = producerPool,
-                                                         topicPartitionInfos
-    )
+                                                         topicPartitionInfos = topicPartitionInfos,
+                                                         producerStats = new ProducerStats(""),
+                                                         producerTopicStats = new ProducerTopicStats(""))
 
     val serializedData = handler.serialize(produceData)
     val deserializedData = serializedData.map(d => new KeyedMessage[String,String](d.topic, Utils.readString(d.message.payload)))
@@ -257,7 +263,9 @@ class AsyncProducerTest extends JUnit3Suite {
                                                          encoder = null.asInstanceOf[Encoder[String]],
                                                          keyEncoder = null.asInstanceOf[Encoder[String]],
                                                          producerPool = producerPool,
-                                                         topicPartitionInfos)
+                                                         topicPartitionInfos = topicPartitionInfos,
+                                                         producerStats = new ProducerStats(""),
+                                                         producerTopicStats = new ProducerTopicStats(""))
     try {
       handler.partitionAndCollate(producerDataList)
       fail("Should fail with UnknownTopicOrPartitionException")
@@ -288,7 +296,9 @@ class AsyncProducerTest extends JUnit3Suite {
                                                          encoder = new StringEncoder,
                                                          keyEncoder = new StringEncoder,
                                                          producerPool = producerPool,
-                                                         topicPartitionInfos)
+                                                         topicPartitionInfos = topicPartitionInfos,
+                                                         producerStats = new ProducerStats(""),
+                                                         producerTopicStats = new ProducerTopicStats(""))
     try {
       handler.handle(producerDataList)
       fail("Should fail with NoBrokersForPartitionException")
@@ -335,7 +345,9 @@ class AsyncProducerTest extends JUnit3Suite {
                                                          encoder = null.asInstanceOf[Encoder[String]],
                                                          keyEncoder = null.asInstanceOf[Encoder[String]],
                                                          producerPool = producerPool,
-                                                         topicPartitionInfos)
+                                                         topicPartitionInfos = topicPartitionInfos,
+                                                         producerStats = new ProducerStats(""),
+                                                         producerTopicStats = new ProducerTopicStats(""))
     val producerDataList = new ArrayBuffer[KeyedMessage[String,Message]]
     producerDataList.append(new KeyedMessage[String,Message]("topic1", new Message("msg1".getBytes)))
     producerDataList.append(new KeyedMessage[String,Message]("topic2", new Message("msg2".getBytes)))
@@ -373,14 +385,19 @@ class AsyncProducerTest extends JUnit3Suite {
 
     val msgs = TestUtils.getMsgStrings(10)
 
-    val handler = new DefaultEventHandler[String,String]( config,
-                                                          partitioner = null.asInstanceOf[Partitioner[String]],
-                                                          encoder = new StringEncoder,
-                                                          keyEncoder = new StringEncoder,
-                                                          producerPool = producerPool,
-                                                          topicPartitionInfos)
-
-    val producer = new Producer[String, String](config, handler)
+    val handler = new DefaultEventHandler[String,String](config,
+                                                         partitioner = null.asInstanceOf[Partitioner[String]],
+                                                         encoder = new StringEncoder,
+                                                         keyEncoder = new StringEncoder,
+                                                         producerPool = producerPool,
+                                                         topicPartitionInfos = topicPartitionInfos,
+                                                         producerStats = new ProducerStats(""),
+                                                         producerTopicStats = new ProducerTopicStats(""))
+
+    val producer = new Producer[String, String](config,
+                                                handler,
+                                                new ProducerStats(""),
+                                                new ProducerTopicStats(""))
     try {
       // send all 10 messages, should create 2 batches and 2 syncproducer calls
       producer.send(msgs.map(m => new KeyedMessage[String,String](topic, m)): _*)
@@ -435,7 +452,9 @@ class AsyncProducerTest extends JUnit3Suite {
                                                       encoder = new StringEncoder(),
                                                       keyEncoder = new NullEncoder[Int](),
                                                       producerPool = producerPool,
-                                                      topicPartitionInfos)
+                                                      topicPartitionInfos = topicPartitionInfos,
+                                                      producerStats = new ProducerStats(""),
+                                                      producerTopicStats = new ProducerTopicStats(""))
     val data = msgs.map(m => new KeyedMessage[Int,String](topic1, 0, m)) ++ msgs.map(m => new KeyedMessage[Int,String](topic1, 1, m))
     handler.handle(data)
     handler.close()

http://git-wip-us.apache.org/repos/asf/kafka/blob/d7c71c09/core/src/test/scala/unit/kafka/producer/ProducerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/producer/ProducerTest.scala b/core/src/test/scala/unit/kafka/producer/ProducerTest.scala
index 0b86777..48842eb 100644
--- a/core/src/test/scala/unit/kafka/producer/ProducerTest.scala
+++ b/core/src/test/scala/unit/kafka/producer/ProducerTest.scala
@@ -65,8 +65,9 @@ class ProducerTest extends JUnit3Suite with ZooKeeperTestHarness with Logging{
     props.put("host", "localhost")
     props.put("port", port1.toString)
 
-    consumer1 = new SimpleConsumer("localhost", port1, 1000000, 64*1024)
-    consumer2 = new SimpleConsumer("localhost", port2, 100, 64*1024)
+    consumer1 = new SimpleConsumer("localhost", port1, 1000000, 64*1024, "")
+    consumer2 = new SimpleConsumer("localhost", port2, 100, 64*1024, "")
+
 
     // temporarily set request handler logger to a higher level
     requestHandlerLogger.setLevel(Level.FATAL)

http://git-wip-us.apache.org/repos/asf/kafka/blob/d7c71c09/core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala b/core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala
index ad2158c..a3afa2d 100644
--- a/core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala
+++ b/core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala
@@ -23,8 +23,6 @@ import kafka.utils.TestUtils._
 import kafka.utils.IntEncoder
 import kafka.utils.{Utils, TestUtils}
 import kafka.zk.ZooKeeperTestHarness
-import kafka.serializer._
-import kafka.message.Message
 import kafka.producer.{ProducerConfig, KeyedMessage, Producer}
 
 class LogRecoveryTest extends JUnit3Suite with ZooKeeperTestHarness {

http://git-wip-us.apache.org/repos/asf/kafka/blob/d7c71c09/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala b/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala
index fa6a64e..7afbe54 100644
--- a/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala
@@ -20,7 +20,7 @@ import java.io.File
 import kafka.consumer.SimpleConsumer
 import org.junit.Test
 import junit.framework.Assert._
-import kafka.message.{Message, ByteBufferMessageSet}
+import kafka.message.ByteBufferMessageSet
 import org.scalatest.junit.JUnit3Suite
 import kafka.zk.ZooKeeperTestHarness
 import kafka.producer._
@@ -66,10 +66,7 @@ class ServerShutdownTest extends JUnit3Suite with ZooKeeperTestHarness {
     server.startup()
 
     producer = new Producer[Int, String](new ProducerConfig(producerConfig))
-    val consumer = new SimpleConsumer(host,
-                                      port,
-                                      1000000,
-                                      64*1024)
+    val consumer = new SimpleConsumer(host, port, 1000000, 64*1024, "")
 
     waitUntilLeaderIsElectedOrChanged(zkClient, topic, 0, 1000)
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/d7c71c09/core/src/test/scala/unit/kafka/utils/ClientIdTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/utils/ClientIdTest.scala b/core/src/test/scala/unit/kafka/utils/ClientIdTest.scala
new file mode 100644
index 0000000..6b9315e
--- /dev/null
+++ b/core/src/test/scala/unit/kafka/utils/ClientIdTest.scala
@@ -0,0 +1,61 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.utils
+
+import junit.framework.Assert._
+import collection.mutable.ArrayBuffer
+import kafka.common.InvalidClientIdException
+import org.junit.Test
+
+class ClientIdTest {
+
+  @Test
+  def testInvalidClientIds() {
+    val invalidclientIds = new ArrayBuffer[String]()
+    invalidclientIds += (".", "..")
+    var longName = "ATCG"
+    for (i <- 1 to 6)
+      longName += longName
+    invalidclientIds += longName
+    val badChars = Array('/', '\\', ',', '\0', ':', "\"", '\'', ';', '*', '?', '.', ' ', '\t', '\r', '\n', '=')
+    for (weirdChar <- badChars) {
+      invalidclientIds += "Is" + weirdChar + "funny"
+    }
+
+    for (i <- 0 until invalidclientIds.size) {
+      try {
+        ClientId.validate(invalidclientIds(i))
+        fail("Should throw InvalidClientIdException.")
+      }
+      catch {
+        case e: InvalidClientIdException => "This is good."
+      }
+    }
+
+    val validClientIds = new ArrayBuffer[String]()
+    validClientIds += ("valid", "CLIENT", "iDs", "ar6", "VaL1d", "_0-9_", "")
+    for (i <- 0 until validClientIds.size) {
+      try {
+        ClientId.validate(validClientIds(i))
+      }
+      catch {
+        case e: Exception => fail("Should not throw exception.")
+      }
+    }
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kafka/blob/d7c71c09/examples/src/main/java/kafka/examples/SimpleConsumerDemo.java
----------------------------------------------------------------------
diff --git a/examples/src/main/java/kafka/examples/SimpleConsumerDemo.java b/examples/src/main/java/kafka/examples/SimpleConsumerDemo.java
index b87d50c..c79192c 100644
--- a/examples/src/main/java/kafka/examples/SimpleConsumerDemo.java
+++ b/examples/src/main/java/kafka/examples/SimpleConsumerDemo.java
@@ -59,7 +59,8 @@ public class SimpleConsumerDemo {
     SimpleConsumer simpleConsumer = new SimpleConsumer(KafkaProperties.kafkaServerURL,
                                                        KafkaProperties.kafkaServerPort,
                                                        KafkaProperties.connectionTimeOut,
-                                                       KafkaProperties.kafkaProducerBufferSize);
+                                                       KafkaProperties.kafkaProducerBufferSize,
+                                                       KafkaProperties.clientId);
 
     System.out.println("Testing single fetch");
     FetchRequest req = new FetchRequestBuilder()

http://git-wip-us.apache.org/repos/asf/kafka/blob/d7c71c09/perf/src/main/scala/kafka/perf/SimpleConsumerPerformance.scala
----------------------------------------------------------------------
diff --git a/perf/src/main/scala/kafka/perf/SimpleConsumerPerformance.scala b/perf/src/main/scala/kafka/perf/SimpleConsumerPerformance.scala
index ed1e0e8..9c9eead 100644
--- a/perf/src/main/scala/kafka/perf/SimpleConsumerPerformance.scala
+++ b/perf/src/main/scala/kafka/perf/SimpleConsumerPerformance.scala
@@ -42,7 +42,7 @@ object SimpleConsumerPerformance {
         println("time, fetch.size, data.consumed.in.MB, MB.sec, data.consumed.in.nMsg, nMsg.sec")
     }
 
-    val consumer = new SimpleConsumer(config.url.getHost, config.url.getPort, 30*1000, 2*config.fetchSize)
+    val consumer = new SimpleConsumer(config.url.getHost, config.url.getPort, 30*1000, 2*config.fetchSize, config.clientId)
 
     // reset to latest or smallest offset
     val topicAndPartition = TopicAndPartition(config.topic, config.partition)


Mime
View raw message