apex-commits mailing list archives

From rama...@apache.org
Subject [1/2] apex-malhar git commit: APEXMALHAR-2445 During recovery no need to write to WindowDataManager if window id <= LargestCompleted Window ID
Date Sun, 19 Mar 2017 21:15:36 GMT
Repository: apex-malhar
Updated Branches:
  refs/heads/master c46398f11 -> 89b29378e


APEXMALHAR-2445 During recovery no need to write to WindowDataManager if window id <= LargestCompleted Window ID


Project: http://git-wip-us.apache.org/repos/asf/apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/apex-malhar/commit/8c538a00
Tree: http://git-wip-us.apache.org/repos/asf/apex-malhar/tree/8c538a00
Diff: http://git-wip-us.apache.org/repos/asf/apex-malhar/diff/8c538a00

Branch: refs/heads/master
Commit: 8c538a003e9d26388df633f07f3e9bb9a639736c
Parents: 7f1abca
Author: Sandesh Hegde <sandesh.hegde@gmail.com>
Authored: Wed Mar 15 11:24:08 2017 -0700
Committer: Sandesh Hegde <sandesh.hegde@gmail.com>
Committed: Wed Mar 15 12:37:32 2017 -0700

----------------------------------------------------------------------
 .../malhar/kafka/KafkaSinglePortExactlyOnceOutputOperator.java | 6 +++++-
 1 file changed, 5 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/8c538a00/kafka/src/main/java/org/apache/apex/malhar/kafka/KafkaSinglePortExactlyOnceOutputOperator.java
----------------------------------------------------------------------
diff --git a/kafka/src/main/java/org/apache/apex/malhar/kafka/KafkaSinglePortExactlyOnceOutputOperator.java b/kafka/src/main/java/org/apache/apex/malhar/kafka/KafkaSinglePortExactlyOnceOutputOperator.java
index a8e333f..75af448 100644
--- a/kafka/src/main/java/org/apache/apex/malhar/kafka/KafkaSinglePortExactlyOnceOutputOperator.java
+++ b/kafka/src/main/java/org/apache/apex/malhar/kafka/KafkaSinglePortExactlyOnceOutputOperator.java
@@ -182,7 +182,11 @@ public class KafkaSinglePortExactlyOnceOutputOperator<T> extends AbstractKafkaOu
   @Override
   public void endWindow()
   {
-    if (!partialWindowTuples.isEmpty() && windowId > windowDataManager.getLargestCompletedWindow()) {
+    if (windowId <= windowDataManager.getLargestCompletedWindow()) {
+      return;
+    }
+
+    if (!partialWindowTuples.isEmpty()) {
       throw new RuntimeException("Violates Exactly once. Not all the tuples received after operator reset.");
     }
 


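The effect of the new guard can be sketched in isolation. The class below is a simplified, hypothetical model and not the Malhar operator: `EndWindowGuardSketch`, `savedWindows`, `beginWindow` and `addPartialTuple` are illustrative names, and the write that follows the check is an assumption taken from the commit subject, since the hunk does not show the rest of `endWindow()`.

```java
import java.util.ArrayList;
import java.util.List;

/** Simplified model of the endWindow() guard in this commit; not the real operator. */
class EndWindowGuardSketch
{
  // Stand-in for windowDataManager.getLargestCompletedWindow().
  private final long largestCompletedWindow;
  // Stand-in for the operator's partialWindowTuples collection.
  private final List<String> partialWindowTuples = new ArrayList<>();
  // Stand-in for writes to the WindowDataManager (assumed from the commit subject).
  private final List<Long> savedWindows = new ArrayList<>();
  private long windowId;

  EndWindowGuardSketch(long largestCompletedWindow)
  {
    this.largestCompletedWindow = largestCompletedWindow;
  }

  void beginWindow(long windowId)
  {
    this.windowId = windowId;
  }

  void addPartialTuple(String tuple)
  {
    partialWindowTuples.add(tuple);
  }

  /** Returns true if state was written for the current window. */
  boolean endWindow()
  {
    // Replayed window during recovery: already recorded, so skip everything.
    if (windowId <= largestCompletedWindow) {
      return false;
    }

    // Past the recovery point, leftover partial tuples break exactly-once.
    if (!partialWindowTuples.isEmpty()) {
      throw new RuntimeException("Violates Exactly once. Not all the tuples received after operator reset.");
    }

    savedWindows.add(windowId);
    return true;
  }

  List<Long> getSavedWindows()
  {
    return savedWindows;
  }

  public static void main(String[] args)
  {
    EndWindowGuardSketch op = new EndWindowGuardSketch(5);
    for (long id = 4; id <= 6; id++) {
      op.beginWindow(id);
      System.out.println("window " + id + " written: " + op.endWindow());
    }
  }
}
```

In this model, the removed condition would have skipped the exception for a replayed window but still fallen through to the write; the early return avoids both.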