apex-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From csi...@apache.org
Subject [1/2] incubator-apex-core git commit: APEXCORE-421 #comment #resolve Fixing double checkpoint bug on InputNode shutdown
Date Fri, 08 Apr 2016 00:29:21 GMT
Repository: incubator-apex-core
Updated Branches:
  refs/heads/master 47d66b000 -> 381de3a29


APEXCORE-421 #comment #resolve Fixing double checkpoint bug on InputNode shutdown


Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/commit/f1229d47
Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/tree/f1229d47
Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/diff/f1229d47

Branch: refs/heads/master
Commit: f1229d47f39a14dff32742add5ad85ee8872c6b5
Parents: 47d66b0
Author: Timothy Farkas <tim@datatorrent.com>
Authored: Tue Apr 5 21:52:10 2016 -0700
Committer: Timothy Farkas <tim@datatorrent.com>
Committed: Thu Apr 7 16:06:00 2016 -0700

----------------------------------------------------------------------
 .../com/datatorrent/stram/engine/InputNode.java     | 16 +++++++++++-----
 1 file changed, 11 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/f1229d47/engine/src/main/java/com/datatorrent/stram/engine/InputNode.java
----------------------------------------------------------------------
diff --git a/engine/src/main/java/com/datatorrent/stram/engine/InputNode.java b/engine/src/main/java/com/datatorrent/stram/engine/InputNode.java
index d52d133..c851d24 100644
--- a/engine/src/main/java/com/datatorrent/stram/engine/InputNode.java
+++ b/engine/src/main/java/com/datatorrent/stram/engine/InputNode.java
@@ -241,11 +241,17 @@ public class InputNode extends Node<InputOperator>
       if (++applicationWindowCount == APPLICATION_WINDOW_COUNT) {
         applicationWindowCount = 0;
       }
-      if (++checkpointWindowCount == CHECKPOINT_WINDOW_COUNT) {
-        checkpointWindowCount = 0;
-        if (doCheckpoint || PROCESSING_MODE == ProcessingMode.EXACTLY_ONCE) {
-          checkpoint(currentWindowId);
-          lastCheckpointWindowId = currentWindowId;
+
+      if (lastCheckpointWindowId < currentWindowId) {
+        //This check is here because if the node is shutdown after a checkpoint is completed
for a window,
+        //but before the next window begins a double checkpoint could happen if the CheckpointWindowCount
+        //is 1
+        if (++checkpointWindowCount == CHECKPOINT_WINDOW_COUNT) {
+          checkpointWindowCount = 0;
+          if (doCheckpoint || PROCESSING_MODE == ProcessingMode.EXACTLY_ONCE) {
+            checkpoint(currentWindowId);
+            lastCheckpointWindowId = currentWindowId;
+          }
         }
       }
 


Mime
View raw message