apex-commits mailing list archives

From t..@apache.org
Subject incubator-apex-malhar git commit: MLHR-1934 #comment reset offset to earliest/latest if current offset is out of range
Date Thu, 10 Dec 2015 21:13:27 GMT
Repository: incubator-apex-malhar
Updated Branches:
  refs/heads/devel-3 b98bae305 -> 365237d5c


MLHR-1934 #comment reset offset to earliest/latest if current offset is out of range


Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/commit/365237d5
Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/tree/365237d5
Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/diff/365237d5

Branch: refs/heads/devel-3
Commit: 365237d5c5c8a4af36a26df997b8ab710d794b5f
Parents: b98bae3
Author: Siyuan Hua <hsy541@apache.org>
Authored: Thu Dec 10 00:56:36 2015 -0800
Committer: Thomas Weise <thomas@datatorrent.com>
Committed: Thu Dec 10 13:11:09 2015 -0800

----------------------------------------------------------------------
 .../contrib/kafka/SimpleKafkaConsumer.java      | 14 ++++++-
 .../contrib/kafka/OffsetManagerTest.java        | 41 ++++++++++++++++----
 2 files changed, 45 insertions(+), 10 deletions(-)
----------------------------------------------------------------------
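
In plain terms, the change below stops the consumer from evicting a partition when the broker reports an out-of-range offset; it re-seeks instead. The configured initialOffset policy ("earliest" or "latest") picks the reset target, and the tracked offset is overwritten so the next fetch succeeds. A minimal sketch of just that policy decision, assuming the Kafka 0.8 client on the classpath (OffsetResetPolicy and resetTarget are illustrative names, not Malhar code):

    import kafka.common.ErrorMapping;

    public class OffsetResetPolicy
    {
      // Map a fetch error code plus the configured policy to a reset target,
      // or null when the error is not an out-of-range offset.
      static Long resetTarget(short errorCode, String initialOffset)
      {
        if (errorCode != ErrorMapping.OffsetOutOfRangeCode()) {
          return null; // other errors: leave the partition to the broker monitor
        }
        return "earliest".equalsIgnoreCase(initialOffset)
            ? kafka.api.OffsetRequest.EarliestTime()   // -2: oldest retained message
            : kafka.api.OffsetRequest.LatestTime();    // -1: tail of the log
      }
    }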


http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/365237d5/contrib/src/main/java/com/datatorrent/contrib/kafka/SimpleKafkaConsumer.java
----------------------------------------------------------------------
diff --git a/contrib/src/main/java/com/datatorrent/contrib/kafka/SimpleKafkaConsumer.java b/contrib/src/main/java/com/datatorrent/contrib/kafka/SimpleKafkaConsumer.java
index 84d7a10..58ef95f 100644
--- a/contrib/src/main/java/com/datatorrent/contrib/kafka/SimpleKafkaConsumer.java
+++ b/contrib/src/main/java/com/datatorrent/contrib/kafka/SimpleKafkaConsumer.java
@@ -159,10 +159,20 @@ public class SimpleKafkaConsumer extends KafkaConsumer
             FetchResponse fetchResponse = ksc.fetch(req);
             for (Iterator<KafkaPartition> iterator = kpS.iterator(); iterator.hasNext();) {
               KafkaPartition kafkaPartition = iterator.next();
-              if (fetchResponse.hasError() && fetchResponse.errorCode(consumer.topic, kafkaPartition.getPartitionId()) != ErrorMapping.NoError()) {
+              short errorCode = fetchResponse.errorCode(consumer.topic, kafkaPartition.getPartitionId());
+              if (fetchResponse.hasError() && errorCode != ErrorMapping.NoError()) {
                 // Kick off partition(s) which has error when fetch from this broker temporarily
                 // Monitor will find out which broker it goes in monitor thread
-                logger.warn("Error when consuming topic {} from broker {} with error code {} ", kafkaPartition, broker,  fetchResponse.errorCode(consumer.topic, kafkaPartition.getPartitionId()));
+                logger.warn("Error when consuming topic {} from broker {} with error {} ", kafkaPartition, broker,
+                  ErrorMapping.exceptionFor(errorCode));
+                if (errorCode == ErrorMapping.OffsetOutOfRangeCode()) {
+                  long seekTo = consumer.initialOffset.toLowerCase().equals("earliest") ? OffsetRequest.EarliestTime()
+                    : OffsetRequest.LatestTime();
+                  seekTo = KafkaMetadataUtil.getLastOffset(ksc, consumer.topic, kafkaPartition.getPartitionId(), seekTo, clientName);
+                  logger.warn("Offset out of range error, reset offset to {}", seekTo);
+                  consumer.offsetTrack.put(kafkaPartition, seekTo);
+                  continue;
+                }
                 iterator.remove();
                 consumer.partitionToBroker.remove(kafkaPartition);
                 consumer.stats.updatePartitionStats(kafkaPartition, -1, "");
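
For readers unfamiliar with the 0.8 SimpleConsumer API: KafkaMetadataUtil.getLastOffset(...) above turns the EarliestTime()/LatestTime() sentinel into a concrete offset by asking the broker. A self-contained sketch of that style of lookup, assuming the Kafka 0.8 client (OffsetLookup and offsetBefore are illustrative names, not the Malhar utility itself):

    import java.util.HashMap;
    import java.util.Map;

    import kafka.api.PartitionOffsetRequestInfo;
    import kafka.common.TopicAndPartition;
    import kafka.javaapi.OffsetResponse;
    import kafka.javaapi.consumer.SimpleConsumer;

    public class OffsetLookup
    {
      // Ask the broker for one offset at/before whichTime; pass
      // kafka.api.OffsetRequest.EarliestTime() or LatestTime().
      public static long offsetBefore(SimpleConsumer consumer, String topic, int partition,
          long whichTime, String clientName)
      {
        TopicAndPartition tp = new TopicAndPartition(topic, partition);
        Map<TopicAndPartition, PartitionOffsetRequestInfo> requestInfo =
            new HashMap<TopicAndPartition, PartitionOffsetRequestInfo>();
        requestInfo.put(tp, new PartitionOffsetRequestInfo(whichTime, 1)); // one offset is enough
        kafka.javaapi.OffsetRequest request = new kafka.javaapi.OffsetRequest(
            requestInfo, kafka.api.OffsetRequest.CurrentVersion(), clientName);
        OffsetResponse response = consumer.getOffsetsBefore(request);
        if (response.hasError()) {
          throw new RuntimeException("Offset lookup failed, error code "
              + response.errorCode(topic, partition));
        }
        return response.offsets(topic, partition)[0];
      }
    }

The fix stores the offset returned by this kind of lookup in consumer.offsetTrack, so the next fetch request starts at a position the broker actually has.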

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/365237d5/contrib/src/test/java/com/datatorrent/contrib/kafka/OffsetManagerTest.java
----------------------------------------------------------------------
diff --git a/contrib/src/test/java/com/datatorrent/contrib/kafka/OffsetManagerTest.java b/contrib/src/test/java/com/datatorrent/contrib/kafka/OffsetManagerTest.java
index 1374a1e..7b36ea8 100644
--- a/contrib/src/test/java/com/datatorrent/contrib/kafka/OffsetManagerTest.java
+++ b/contrib/src/test/java/com/datatorrent/contrib/kafka/OffsetManagerTest.java
@@ -63,6 +63,7 @@ public class OffsetManagerTest extends KafkaOperatorTestBase
   static final int totalCount = 100;
   static CountDownLatch latch;
   static final String OFFSET_FILE = ".offset";
+  static long initialPos = 10l;
 
 
   public static class TestOffsetManager implements OffsetManager{
@@ -85,8 +86,8 @@ public class OffsetManagerTest extends KafkaOperatorTestBase
     {
       KafkaPartition kp0 = new KafkaPartition(TEST_TOPIC, 0);
       KafkaPartition kp1 = new KafkaPartition(TEST_TOPIC, 1);
-      offsets.put(kp0, 10l);
-      offsets.put(kp1, 10l);
+      offsets.put(kp0, initialPos);
+      offsets.put(kp1, initialPos);
       return offsets;
     }
 
@@ -120,7 +121,7 @@ public class OffsetManagerTest extends KafkaOperatorTestBase
         for (long entry : offsets.values()) {
           count += entry;
         }
-        if (count == totalCount + 2) {
+        if (count == totalCount) {
           // wait until all offsets add up to totalCount messages + 2 control END_TUPLE
           latch.countDown();
         }
@@ -188,10 +189,34 @@ public class OffsetManagerTest extends KafkaOperatorTestBase
   @Test
   public void testSimpleConsumerUpdateOffsets() throws Exception
   {
+    initialPos = 10l;
     // Create template simple consumer
     try{
       SimpleKafkaConsumer consumer = new SimpleKafkaConsumer();
-      testPartitionableInputOperator(consumer);
+      testPartitionableInputOperator(consumer, totalCount - (int)initialPos - (int)initialPos);
+    } finally {
+      // clean test offset file
+      cleanFile();
+    }
+  }
+
+  /**
+   * Test OffsetManager update offsets in Simple Consumer
+   *
+   * [Generate send 100 messages to Kafka] ==> [wait until the offsets has been updated to 102 or timeout after 30s which means offset has not been updated]
+   *
+   * Initial offsets are invalid, reset to earliest and get all messages
+   *
+   * @throws Exception
+   */
+  @Test
+  public void testSimpleConsumerInvalidInitialOffsets() throws Exception
+  {
+    initialPos = 1000l;
+    // Create template simple consumer
+    try{
+      SimpleKafkaConsumer consumer = new SimpleKafkaConsumer();
+      testPartitionableInputOperator(consumer, totalCount);
     } finally {
       // clean test offset file
       cleanFile();
@@ -207,7 +232,7 @@ public class OffsetManagerTest extends KafkaOperatorTestBase
     }
   }
 
-  public void testPartitionableInputOperator(KafkaConsumer consumer) throws Exception{
+  public void testPartitionableInputOperator(KafkaConsumer consumer, int expectedCount) throws Exception{
 
     // Set to 3 because we want to make sure END_TUPLE from both 2 partitions are received and offsets has been updated to 102
     latch = new CountDownLatch(3);
@@ -241,7 +266,7 @@ public class OffsetManagerTest extends KafkaOperatorTestBase
     consumer.setTopic(TEST_TOPIC);
     //set the zookeeper list used to initialize the partition
     SetMultimap<String, String> zookeeper = HashMultimap.create();
-    String zks = KafkaPartition.DEFAULT_CLUSTERID + "::localhost:" + KafkaOperatorTestBase.TEST_ZOOKEEPER_PORT[0];
+    String zks = "localhost:" + KafkaOperatorTestBase.TEST_ZOOKEEPER_PORT[0];
     consumer.setZookeeper(zks);
     consumer.setInitialOffset("earliest");
 
@@ -260,11 +285,11 @@ public class OffsetManagerTest extends KafkaOperatorTestBase
     lc.runAsync();
 
    // Wait 30s for consumer finish consuming all the messages and offsets has been updated to 100
-    assertTrue("TIMEOUT: 30s ", latch.await(30000, TimeUnit.MILLISECONDS));
+    assertTrue("TIMEOUT: 30s, collected " + collectedTuples + " tuples", latch.await(30000, TimeUnit.MILLISECONDS));
 
 
     // Check results
-    assertEquals("Tuple count", totalCount -10 -10, collectedTuples.size());
+    assertEquals("Tuple count", expectedCount, collectedTuples.size());
     logger.debug(String.format("Number of emitted tuples: %d", collectedTuples.size()));
 
     p.close();
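
The new expectedCount parameter makes the test arithmetic explicit. In testSimpleConsumerUpdateOffsets the OffsetManager starts both partitions at initialPos = 10, so the consumer skips 10 messages per partition and expects totalCount - 10 - 10 = 80 tuples. In testSimpleConsumerInvalidInitialOffsets, initialPos = 1000 is past the end of either partition's log; with this fix the offsets are reset to earliest (the test calls setInitialOffset("earliest")) and all 100 messages come through, so expectedCount equals totalCount.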

