apex-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From hsy...@apache.org
Subject [1/4] apex-malhar git commit: APEXMALHAR-2230 simplify the kafka input operator test
Date Tue, 20 Sep 2016 15:45:44 GMT
Repository: apex-malhar
Updated Branches:
  refs/heads/master 0c4b3fce2 -> 7b2d7e3d9


APEXMALHAR-2230 simplify the kafka input operator test


Project: http://git-wip-us.apache.org/repos/asf/apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/apex-malhar/commit/5909dfdc
Tree: http://git-wip-us.apache.org/repos/asf/apex-malhar/tree/5909dfdc
Diff: http://git-wip-us.apache.org/repos/asf/apex-malhar/diff/5909dfdc

Branch: refs/heads/master
Commit: 5909dfdc491fdca0cea7eca56fe72b8e1d32bcc3
Parents: 9f9da0e
Author: Siyuan Hua <hsy541@apache.org>
Authored: Thu Sep 15 08:31:34 2016 -0700
Committer: Siyuan Hua <hsy541@apache.org>
Committed: Thu Sep 15 08:31:34 2016 -0700

----------------------------------------------------------------------
 .../malhar/kafka/KafkaInputOperatorTest.java    | 32 ++------------------
 1 file changed, 3 insertions(+), 29 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/5909dfdc/kafka/src/test/java/org/apache/apex/malhar/kafka/KafkaInputOperatorTest.java
----------------------------------------------------------------------
diff --git a/kafka/src/test/java/org/apache/apex/malhar/kafka/KafkaInputOperatorTest.java b/kafka/src/test/java/org/apache/apex/malhar/kafka/KafkaInputOperatorTest.java
index 8440615..47a374b 100644
--- a/kafka/src/test/java/org/apache/apex/malhar/kafka/KafkaInputOperatorTest.java
+++ b/kafka/src/test/java/org/apache/apex/malhar/kafka/KafkaInputOperatorTest.java
@@ -137,10 +137,6 @@ public class KafkaInputOperatorTest extends KafkaOperatorTestBase
   private static final org.slf4j.Logger logger = LoggerFactory.getLogger(KafkaInputOperatorTest.class);
   private static List<String> tupleCollection = new LinkedList<>();
 
-  /**
-   * whether countDown latch count all tuples or just END_TUPLE
-   */
-  private static final boolean countDownAll = false;
   private static final int scale = 2;
   private static final int totalCount = 10 * scale;
   private static final int failureTrigger = 3 * scale;
@@ -232,33 +228,11 @@ public class KafkaInputOperatorTest extends KafkaOperatorTestBase
       int tupleSize = windowTupleCollector.size();
       tupleCollection.addAll(windowTupleCollector);
       
-      int countDownTupleSize = countDownAll ? tupleSize : endTuples;
+      int countDownTupleSize = endTuples;
 
       if (latch != null) {
-        Assert.assertTrue("received END_TUPLES more than expected.", latch.getCount() >= countDownTupleSize);
-        while (countDownTupleSize > 0) {
+        while (countDownTupleSize-- > 0) {
             latch.countDown();
-            --countDownTupleSize;
-        }
-        if (latch.getCount() == 0) {
-          /**
-           * The time before countDown() and the shutdown() of the application
-           * will cause fatal error:
-           * "Catastrophic Error: Invalid State - the operator blocked forever!"
-           * as the activeQueues could be cleared but alive haven't changed yet.
-           * throw the ShutdownException to let the engine shutdown;
-           */
-          try {
-            throw new ShutdownException();
-            //lc.shutdown();
-          } finally {
-            /**
-             * interrupt the engine thread, let it wake from sleep and handle
-             * the shutdown at this time, all payload should be handled. so it
-             * should be ok to interrupt
-             */
-            monitorThread.interrupt();
-          }
         }
       }
     }
@@ -301,7 +275,7 @@ public class KafkaInputOperatorTest extends KafkaOperatorTestBase
   public void testInputOperator(boolean hasFailure, boolean idempotent) throws Exception
   {
     // each broker should get a END_TUPLE message
-    latch = new CountDownLatch(countDownAll ? totalCount + totalBrokers : totalBrokers);
+    latch = new CountDownLatch(totalBrokers);
 
     logger.info("Test Case: name: {}; totalBrokers: {}; hasFailure: {}; hasMultiCluster: {}; hasMultiPartition: {}, partition: {}", 
         testName, totalBrokers, hasFailure, hasMultiCluster, hasMultiPartition, partition);



Mime
View raw message