kafka-commits mailing list archives

From nehanarkh...@apache.org
Subject git commit: KAFKA-1233 Follow up changes to new producer integration test; reviewed by Jay Kreps
Date Wed, 12 Feb 2014 21:00:08 GMT
Updated Branches:
  refs/heads/trunk b494cd9de -> 84a5803a7


KAFKA-1233 Follow up changes to new producer integration test; reviewed by Jay Kreps


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/84a5803a
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/84a5803a
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/84a5803a

Branch: refs/heads/trunk
Commit: 84a5803a7e5235e3fa0af7de8e40bdf77ff9affe
Parents: b494cd9
Author: Guozhang Wang <guwang@linkedin.com>
Authored: Wed Feb 12 12:50:26 2014 -0800
Committer: Neha Narkhede <neha.narkhede@gmail.com>
Committed: Wed Feb 12 12:50:33 2014 -0800

----------------------------------------------------------------------
 .../kafka/clients/producer/PartitionerTest.java | 12 +++-
 .../kafka/api/ProducerSendTest.scala            | 75 ++++++--------------
 2 files changed, 33 insertions(+), 54 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/84a5803a/clients/src/test/java/org/apache/kafka/clients/producer/PartitionerTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/PartitionerTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/PartitionerTest.java
index c78da64..f06e28c 100644
--- a/clients/src/test/java/org/apache/kafka/clients/producer/PartitionerTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/producer/PartitionerTest.java
@@ -18,6 +18,7 @@ package org.apache.kafka.clients.producer;
 
 import static java.util.Arrays.asList;
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotSame;
 import static org.junit.Assert.assertTrue;
 
 import java.util.List;
@@ -61,11 +62,20 @@ public class PartitionerTest {
     }
 
     @Test
+    public void testRoundRobinIsStable() {
+        int startPart = partitioner.partition(new ProducerRecord("test", value), cluster);
+        for (int i = 1; i <= 100; i++) {
+            int partition = partitioner.partition(new ProducerRecord("test", value), cluster);
+            assertEquals("Should yield a different partition each call with round-robin partitioner",
+                partition, (startPart + i) % 2);
+        }
+    }
+
+    @Test
     public void testRoundRobinWithDownNode() {
         for (int i = 0; i < partitions.size(); i++) {
             int part = partitioner.partition(new ProducerRecord("test", value), cluster);
             assertTrue("We should never choose a leader-less node in round robin", part >=
0 && part < 2);
-
         }
     }
 }
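
For readers following along: the new testRoundRobinIsStable test relies on the round-robin partitioner advancing an internal counter by one per send, so with two available partitions call i must land on (startPart + i) % 2. A minimal standalone sketch of that invariant, using a hypothetical AtomicInteger-based partitioner in place of the real client class:

import java.util.concurrent.atomic.AtomicInteger;

public class RoundRobinSketch {
    // Hypothetical stand-in for the client's round-robin assignment: a shared
    // counter incremented once per record, reduced modulo the partition count.
    private static final AtomicInteger counter = new AtomicInteger(0);

    static int partition(int numPartitions) {
        return Math.abs(counter.getAndIncrement()) % numPartitions;
    }

    public static void main(String[] args) {
        int numPartitions = 2; // the test topic has two partitions
        int startPart = partition(numPartitions);
        for (int i = 1; i <= 100; i++) {
            int part = partition(numPartitions);
            // Each call advances the counter by exactly one, so call i lands on
            // (startPart + i) % numPartitions -- the invariant the JUnit test asserts.
            if (part != (startPart + i) % numPartitions)
                throw new AssertionError("round-robin invariant violated at i=" + i);
        }
        System.out.println("round-robin alternation holds for 100 calls");
    }
}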

http://git-wip-us.apache.org/repos/asf/kafka/blob/84a5803a/core/src/test/scala/integration/kafka/api/ProducerSendTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/ProducerSendTest.scala b/core/src/test/scala/integration/kafka/api/ProducerSendTest.scala
index f8ba361..34baa8c 100644
--- a/core/src/test/scala/integration/kafka/api/ProducerSendTest.scala
+++ b/core/src/test/scala/integration/kafka/api/ProducerSendTest.scala
@@ -195,8 +195,7 @@ class ProducerSendTest extends JUnit3Suite with ZooKeeperTestHarness {
   /**
    * testSendToPartition checks the partitioning behavior
    *
-   * 1. The default partitioner should have the correct round-robin behavior in assigning partitions
-   * 2. The specified partition-id should be respected
+   * 1. The specified partition-id should be respected
    */
   @Test
   def testSendToPartition() {
@@ -210,68 +209,38 @@ class ProducerSendTest extends JUnit3Suite with ZooKeeperTestHarness {
       val leaders = TestUtils.createTopic(zkClient, topic, 2, 2, servers)
 
       // make sure leaders exist
-      val leader0 = leaders.get(0)
       val leader1 = leaders.get(1)
-      assertTrue("Leader for topic new-topic partition 0 should exist", leader0.isDefined)
-      assertTrue("Leader for topic new-topic partition 1 should exist", leader1.isDefined)
-
-      // case 1: use default partitioner, send 2*numRecords+2 messages with no partition-id/keys,
-      //         they should be assigned to two partitions evenly as (1,3,5,7..) and (2,4,6,8..)
-      for (i <- 1 to 2 * numRecords) {
-        val record = new ProducerRecord(topic, null, null, ("value" + i).getBytes)
-        producer.send(record)
+      assertTrue("Leader for topic \"topic\" partition 1 should exist", leader1.isDefined)
+
+      val partition = 1
+      val responses =
+        for (i <- 0 until numRecords)
+        yield producer.send(new ProducerRecord(topic, partition, null, ("value" + i).getBytes))
+      val futures = responses.toList
+      futures.foreach(_.get)  // block until every send completes; Object.wait would throw IllegalMonitorStateException
+      for (future <- futures)
+        assertTrue("Request should have completed", future.isDone)
+
+      // make sure all of them end up in the same partition with increasing offset values
+      for ((future, offset) <- futures zip (0 until numRecords)) {
+        assertEquals(offset, future.get.offset)
+        assertEquals(topic, future.get.topic)
+        assertEquals(1, future.get.partition)
       }
 
-      // make sure both partitions have acked back
-      val record0 = new ProducerRecord(topic, null, null, ("value" + (2 * numRecords + 1)).getBytes)
-      val response0 = producer.send(record0);
-      assertEquals("Should have offset " + numRecords, numRecords.toLong, response0.get.offset)
-      val record1 = new ProducerRecord(topic, null, null, ("value" + (2 * numRecords + 2)).getBytes)
-      val response1 = producer.send(record1);
-      assertEquals("Should have offset " + numRecords, numRecords.toLong, response1.get.offset)
-
-      // get messages from partition 0, and check they has numRecords+1 messages
-      val fetchResponse0 = if(leader0.get == server1.config.brokerId) {
-        consumer1.fetch(new FetchRequestBuilder().addFetch(topic, 0, 0, Int.MaxValue).build())
-      } else {
-        consumer2.fetch(new FetchRequestBuilder().addFetch(topic, 0, 0, Int.MaxValue).build())
-      }
-      val messageSet0 = fetchResponse0.messageSet(topic, 0).iterator.toBuffer
-      assertEquals("Should have fetched " + (numRecords + 1) + " messages", numRecords +
1, messageSet0.size)
-
-      // if the first message gets 1, make sure the rest are (3,5,7..);
-      // if the first message gets 2, make sure the rest are (4,6,8..)
-      val startWithOne = messageSet0(0).message.equals(new Message(bytes = "value1".getBytes))
-      for (i <- 1 to numRecords) {
-        if(startWithOne) {
-          assertEquals(new Message(bytes = ("value" + (i * 2 + 1)).getBytes), messageSet0(i).message)
-        } else {
-          assertEquals(new Message(bytes = ("value" + (i * 2 + 2)).getBytes), messageSet0(i).message)
-        }
-      }
-
-      // case 2: check the specified partition id is respected by sending numRecords with partition-id 1
-      //         and make sure all of them end up in partition 1
-      for (i <- 1 to numRecords - 1) {
-        val record = new ProducerRecord(topic, new Integer(1), null, ("value" + i).getBytes)
-        producer.send(record)
-      }
-      val record2 = new ProducerRecord(topic, new Integer(1), null, ("value" + numRecords).getBytes)
-      val response2 = producer.send(record2);
-      assertEquals("Should have offset " + 2 * numRecords, (2 * numRecords).toLong, response2.get.offset)
-
-      // start fetching from offset numRecords+1
+      // make sure the fetched messages also respect the partitioning and ordering
       val fetchResponse1 = if(leader1.get == server1.config.brokerId) {
-        consumer1.fetch(new FetchRequestBuilder().addFetch(topic, 1, numRecords+1, Int.MaxValue).build())
+        consumer1.fetch(new FetchRequestBuilder().addFetch(topic, 1, 0, Int.MaxValue).build())
       }else {
-        consumer2.fetch(new FetchRequestBuilder().addFetch(topic, 1, numRecords+1, Int.MaxValue).build())
+        consumer2.fetch(new FetchRequestBuilder().addFetch(topic, 1, 0, Int.MaxValue).build())
       }
       val messageSet1 = fetchResponse1.messageSet(topic, 1).iterator.toBuffer
-
       assertEquals("Should have fetched " + numRecords + " messages", numRecords, messageSet1.size)
 
+      // TODO: also check topic and partition after they are added in the return messageSet
       for (i <- 0 to numRecords - 1) {
         assertEquals(new Message(bytes = ("value" + (i + 1)).getBytes), messageSet1(i).message)
+        assertEquals(i, messageSet1(i).offset)
       }
     } finally {
       if (producer != null) {
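
The rewritten case boils down to one pattern: pin every record to a single partition, block on the returned futures, and check that the offsets come back dense and increasing. A rough standalone sketch of the same pattern against today's Java producer API (the trunk API at the time of this commit differed in detail; the broker address and topic name are placeholders):

import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.Future;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

public class SendToPartitionSketch {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // placeholder broker
        props.put("key.serializer",
            "org.apache.kafka.common.serialization.ByteArraySerializer");
        props.put("value.serializer",
            "org.apache.kafka.common.serialization.ByteArraySerializer");

        int numRecords = 100;
        int partition = 1;
        try (KafkaProducer<byte[], byte[]> producer = new KafkaProducer<>(props)) {
            List<Future<RecordMetadata>> futures = new ArrayList<>();
            for (int i = 0; i < numRecords; i++)
                futures.add(producer.send(new ProducerRecord<>(
                    "test-topic", partition, null, ("value" + i).getBytes())));

            // Block on each future; within a single partition the broker assigns
            // offsets in send order, so they should be 0..numRecords-1
            // (assuming the partition starts empty).
            for (int i = 0; i < numRecords; i++) {
                RecordMetadata md = futures.get(i).get();
                if (md.partition() != partition || md.offset() != i)
                    throw new AssertionError("unexpected metadata at i=" + i);
            }
        }
    }
}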

