apex-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From vro...@apache.org
Subject apex-core git commit: APEXCORE-627 : Unit test AtMostOnceTest intermittently fails
Date Thu, 02 Feb 2017 19:07:16 GMT
Repository: apex-core
Updated Branches:
  refs/heads/master 1e9896bb4 -> 527c70bf8


APEXCORE-627 : Unit test AtMostOnceTest intermittently fails

Fixed a race problem for calling checkpointed and committed in RecoverableInputOperator. The
original implementation of the class used the method checkpointed() of the interface CheckpointListener
to refresh a value of one of the criteria variables checkpointedWindowId. The fix update uses
the method beforeCheckpoint() of the interface CheckpointNotificationListener. It guaranties
that the update of the variable checkpointedWindowId will be done before the call of the method
committed().


Project: http://git-wip-us.apache.org/repos/asf/apex-core/repo
Commit: http://git-wip-us.apache.org/repos/asf/apex-core/commit/527c70bf
Tree: http://git-wip-us.apache.org/repos/asf/apex-core/tree/527c70bf
Diff: http://git-wip-us.apache.org/repos/asf/apex-core/diff/527c70bf

Branch: refs/heads/master
Commit: 527c70bf8b4060d04d713f2b6e0ffb113a17ece6
Parents: 1e9896b
Author: Sergey Golovko <sergey@datatorrent.com>
Authored: Fri Jan 27 15:15:09 2017 -0800
Committer: Sergey Golovko <sergey@datatorrent.com>
Committed: Mon Jan 30 16:20:14 2017 -0800

----------------------------------------------------------------------
 .../stram/engine/RecoverableInputOperator.java  | 44 +++++++++++++-------
 1 file changed, 30 insertions(+), 14 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/apex-core/blob/527c70bf/engine/src/test/java/com/datatorrent/stram/engine/RecoverableInputOperator.java
----------------------------------------------------------------------
diff --git a/engine/src/test/java/com/datatorrent/stram/engine/RecoverableInputOperator.java
b/engine/src/test/java/com/datatorrent/stram/engine/RecoverableInputOperator.java
index ed95874..6412da1 100644
--- a/engine/src/test/java/com/datatorrent/stram/engine/RecoverableInputOperator.java
+++ b/engine/src/test/java/com/datatorrent/stram/engine/RecoverableInputOperator.java
@@ -26,24 +26,28 @@ import java.util.Map;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import org.apache.commons.lang.builder.ToStringBuilder;
+import org.apache.commons.lang.builder.ToStringStyle;
+
 import com.datatorrent.api.Context.OperatorContext;
 import com.datatorrent.api.DefaultOutputPort;
 import com.datatorrent.api.InputOperator;
+import com.datatorrent.api.Operator;
 import com.datatorrent.bufferserver.util.Codec;
 import com.datatorrent.common.util.BaseOperator;
 
 /**
  *
  */
-public class RecoverableInputOperator implements InputOperator, com.datatorrent.api.Operator.CheckpointListener
+public class RecoverableInputOperator implements InputOperator, Operator.CheckpointNotificationListener
 {
   public final transient DefaultOutputPort<Long> output = new DefaultOutputPort<>();
   private long checkpointedWindowId;
-  boolean firstRun = true;
-  transient boolean first;
-  transient long windowId;
-  int maximumTuples = 20;
-  boolean simulateFailure;
+  private transient boolean firstRun = true;
+  private transient boolean first;
+  private transient long windowId;
+  private int maximumTuples = 20;
+  private boolean simulateFailure;
 
   private static final Map<Long, Long> idMap = new HashMap<>();
   private static long tuple = 0;
@@ -95,7 +99,7 @@ public class RecoverableInputOperator implements InputOperator, com.datatorrent.
   public void setup(OperatorContext context)
   {
     firstRun = (checkpointedWindowId == 0);
-    logger.debug("firstRun={} checkpointedWindowId={}", firstRun, Codec.getStringWindowId(checkpointedWindowId));
+    logger.debug("{}", this);
   }
 
   @Override
@@ -106,18 +110,12 @@ public class RecoverableInputOperator implements InputOperator, com.datatorrent.
   @Override
   public void checkpointed(long windowId)
   {
-    if (checkpointedWindowId == 0) {
-      checkpointedWindowId = windowId;
-      logger.debug("firstRun={} checkpointedWindowId={}", firstRun, Codec.getStringWindowId(checkpointedWindowId));
-    }
-
-    logger.debug("{} checkpointed at {}", this, Codec.getStringWindowId(windowId));
   }
 
   @Override
   public void committed(long windowId)
   {
-    logger.debug("{} committed at {} firstRun {}, checkpointedWindowId {}", this, Codec.getStringWindowId(windowId),
firstRun, Codec.getStringWindowId(checkpointedWindowId));
+    logger.debug("{}, windowId={}", this, Codec.getStringWindowId(windowId));
     if (simulateFailure && firstRun && checkpointedWindowId > 0 &&
windowId > checkpointedWindowId) {
       throw new RuntimeException("Failure Simulation from " + this);
     }
@@ -134,4 +132,22 @@ public class RecoverableInputOperator implements InputOperator, com.datatorrent.
   {
     simulateFailure = flag;
   }
+
+  @Override
+  public void beforeCheckpoint(long windowId)
+  {
+    if (checkpointedWindowId == 0) {
+      checkpointedWindowId = windowId;
+    }
+    logger.debug("{}, windowId={}", this, Codec.getStringWindowId(windowId));
+  }
+
+  @Override
+  public String toString()
+  {
+    return new ToStringBuilder(this, ToStringStyle.SHORT_PREFIX_STYLE)
+            .append("firstRun", this.firstRun)
+            .append("checkpointedWindowId", Codec.getStringWindowId(checkpointedWindowId))
+            .toString();
+  }
 }


Mime
View raw message