apex-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From hsy...@apache.org
Subject apex-malhar git commit: APEXMALHAR-2158 Fixed the duplication of messages emitted issue when the Kafka Input operator redeployed
Date Mon, 25 Jul 2016 17:37:26 GMT
Repository: apex-malhar
Updated Branches:
  refs/heads/master aaa4464f0 -> 3cb30acda


APEXMALHAR-2158 Fixed the duplication of messages emitted issue when the Kafka Input operator
redeployed


Project: http://git-wip-us.apache.org/repos/asf/apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/apex-malhar/commit/3cb30acd
Tree: http://git-wip-us.apache.org/repos/asf/apex-malhar/tree/3cb30acd
Diff: http://git-wip-us.apache.org/repos/asf/apex-malhar/diff/3cb30acd

Branch: refs/heads/master
Commit: 3cb30acda78c1993acb4458414d71013e8a097c9
Parents: aaa4464
Author: Chaitanya <chaitanya@datatorrent.com>
Authored: Fri Jul 22 15:43:41 2016 +0530
Committer: Chaitanya <chaitanya@datatorrent.com>
Committed: Fri Jul 22 15:43:41 2016 +0530

----------------------------------------------------------------------
 .../kafka/AbstractKafkaInputOperator.java       |  7 +-
 .../contrib/kafka/KafkaInputOperatorTest.java   | 69 ++++++++++++++++++--
 2 files changed, 69 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/3cb30acd/contrib/src/main/java/com/datatorrent/contrib/kafka/AbstractKafkaInputOperator.java
----------------------------------------------------------------------
diff --git a/contrib/src/main/java/com/datatorrent/contrib/kafka/AbstractKafkaInputOperator.java b/contrib/src/main/java/com/datatorrent/contrib/kafka/AbstractKafkaInputOperator.java
index 9a5917b..d4945ec 100644
--- a/contrib/src/main/java/com/datatorrent/contrib/kafka/AbstractKafkaInputOperator.java
+++ b/contrib/src/main/java/com/datatorrent/contrib/kafka/AbstractKafkaInputOperator.java
@@ -250,7 +250,12 @@ public abstract class AbstractKafkaInputOperator<K extends KafkaConsumer> implem
     consumer.create();
     // reset the offsets to checkpointed one
     if (consumer instanceof SimpleKafkaConsumer && !offsetStats.isEmpty()) {
-      ((SimpleKafkaConsumer)consumer).resetOffset(offsetStats);
+      Map<KafkaPartition, Long> currentOffsets = new HashMap<>();
+      // Increment the offsets and set it to consumer
+      for (Map.Entry<KafkaPartition, Long> e: offsetStats.entrySet()) {
+        currentOffsets.put(e.getKey(), e.getValue() + 1);
+      }
+      ((SimpleKafkaConsumer)consumer).resetOffset(currentOffsets);
     }
     this.context = context;
     operatorId = context.getId();

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/3cb30acd/contrib/src/test/java/com/datatorrent/contrib/kafka/KafkaInputOperatorTest.java
----------------------------------------------------------------------
diff --git a/contrib/src/test/java/com/datatorrent/contrib/kafka/KafkaInputOperatorTest.java b/contrib/src/test/java/com/datatorrent/contrib/kafka/KafkaInputOperatorTest.java
index 27235f5..e4a4dec 100644
--- a/contrib/src/test/java/com/datatorrent/contrib/kafka/KafkaInputOperatorTest.java
+++ b/contrib/src/test/java/com/datatorrent/contrib/kafka/KafkaInputOperatorTest.java
@@ -23,6 +23,7 @@ import java.io.IOException;
 import java.util.Collection;
 import java.util.LinkedList;
 import java.util.List;
+import java.util.Map;
 import java.util.Properties;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
@@ -289,7 +290,7 @@ public class KafkaInputOperatorTest extends KafkaOperatorTestBase
     new Thread(p).start();
 
 
-    KafkaSinglePortStringInputOperator operator = createAndDeployOperator();
+    KafkaSinglePortStringInputOperator operator = createAndDeployOperator(true);
     latch.await(4000, TimeUnit.MILLISECONDS);
     operator.beginWindow(1);
     operator.emitTuples();
@@ -303,7 +304,7 @@ public class KafkaInputOperatorTest extends KafkaOperatorTestBase
     operator.teardown();
     operator.deactivate();
 
-    operator = createAndDeployOperator();
+    operator = createAndDeployOperator(true);
     Assert.assertEquals("largest recovery window", 2, operator.getWindowDataManager().getLargestRecoveryWindow());
 
     operator.beginWindow(1);
@@ -324,9 +325,57 @@ public class KafkaInputOperatorTest extends KafkaOperatorTestBase
     operator.deactivate();
   }
 
-  private KafkaSinglePortStringInputOperator createAndDeployOperator()
+  @Test
+  public void testRecoveryAndExactlyOnce() throws Exception
   {
+    int totalCount = 1500;
+
+    // initialize the latch for this test
+    latch = new CountDownLatch(50);
 
+    // Start producer
+    KafkaTestProducer p = new KafkaTestProducer(TEST_TOPIC);
+    p.setSendCount(totalCount);
+    new Thread(p).start();
+
+    KafkaSinglePortStringInputOperator operator = createAndDeployOperator(false);
+    latch.await(4000, TimeUnit.MILLISECONDS);
+    operator.beginWindow(1);
+    operator.emitTuples();
+    operator.endWindow();
+    operator.beginWindow(2);
+    operator.emitTuples();
+    operator.endWindow();
+    operator.checkpointed(2);
+    operator.committed(2);
+    Map<KafkaPartition, Long> offsetStats = operator.offsetStats;
+    int collectedTuplesAfterCheckpoint = testMeta.sink.collectedTuples.size();
+    //failure and then re-deployment of operator
+    testMeta.sink.collectedTuples.clear();
+    operator.teardown();
+    operator.deactivate();
+    operator = createOperator(false);
+    operator.offsetStats = offsetStats;
+    operator.setup(testMeta.context);
+    operator.activate(testMeta.context);
+    latch.await(4000, TimeUnit.MILLISECONDS);
+    // Emitting data after all recovery windows are replayed
+    operator.beginWindow(3);
+    operator.emitTuples();
+    operator.endWindow();
+    operator.beginWindow(4);
+    operator.emitTuples();
+    operator.endWindow();
+    latch.await(3000, TimeUnit.MILLISECONDS);
+
+    Assert.assertEquals("Total messages collected ", totalCount - collectedTuplesAfterCheckpoint + 1, testMeta.sink.collectedTuples.size());
+    testMeta.sink.collectedTuples.clear();
+    operator.teardown();
+    operator.deactivate();
+  }
+
+  private KafkaSinglePortStringInputOperator createOperator(boolean isIdempotency)
+  {
     Attribute.AttributeMap attributeMap = new Attribute.AttributeMap.DefaultAttributeMap();
     attributeMap.put(Context.OperatorContext.SPIN_MILLIS, 500);
     attributeMap.put(Context.DAGContext.APPLICATION_PATH, testMeta.baseDir);
@@ -339,9 +388,12 @@ public class KafkaInputOperatorTest extends KafkaOperatorTestBase
     consumer.setTopic(TEST_TOPIC);
     consumer.setInitialOffset("earliest");
 
-    FSWindowDataManager storageManager = new FSWindowDataManager();
-    storageManager.setRecoveryPath(testMeta.recoveryDir);
-    testMeta.operator.setWindowDataManager(storageManager);
+    if (isIdempotency) {
+      FSWindowDataManager storageManager = new FSWindowDataManager();
+      storageManager.setRecoveryPath(testMeta.recoveryDir);
+      testMeta.operator.setWindowDataManager(storageManager);
+    }
+
     testMeta.operator.setConsumer(consumer);
     testMeta.operator.setZookeeper("localhost:" + KafkaOperatorTestBase.TEST_ZOOKEEPER_PORT[0]);
     testMeta.operator.setMaxTuplesPerWindow(500);
@@ -356,7 +408,12 @@ public class KafkaInputOperatorTest extends KafkaOperatorTestBase
     testMeta.sink = new CollectorTestSink<Object>();
     testMeta.operator.outputPort.setSink(testMeta.sink);
     operator.outputPort.setSink(testMeta.sink);
+    return operator;
+  }
 
+  private KafkaSinglePortStringInputOperator createAndDeployOperator(boolean isIdempotency)
+  {
+    KafkaSinglePortStringInputOperator operator = createOperator(isIdempotency);
     operator.setup(testMeta.context);
     operator.activate(testMeta.context);
 


Mime
View raw message