apex-commits mailing list archives

From t..@apache.org
Subject [1/3] incubator-apex-core git commit: APEXCORE-60 Iteration support in Apex Core
Date Sat, 23 Jan 2016 03:53:02 GMT
Repository: incubator-apex-core
Updated Branches:
  refs/heads/devel-3 d0908e4bc -> b3402be5a


APEXCORE-60 Iteration support in Apex Core


Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/commit/f7e1ccf1
Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/tree/f7e1ccf1
Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/diff/f7e1ccf1

Branch: refs/heads/devel-3
Commit: f7e1ccf14154eca92b24c5f6b5387fe56c516829
Parents: d0908e4
Author: David Yan <david@datatorrent.com>
Authored: Wed Dec 9 15:52:26 2015 -0800
Committer: David Yan <david@datatorrent.com>
Committed: Fri Jan 22 19:03:46 2016 -0800

----------------------------------------------------------------------
 .../main/java/com/datatorrent/api/Context.java  |   7 +
 .../main/java/com/datatorrent/api/Operator.java |  19 +
 .../common/util/DefaultDelayOperator.java       |  75 ++++
 .../datatorrent/stram/StramLocalCluster.java    |  15 +
 .../stram/StreamingContainerManager.java        |  56 ++-
 .../datatorrent/stram/engine/GenericNode.java   | 190 +++++++---
 .../java/com/datatorrent/stram/engine/Node.java |   6 +-
 .../stram/engine/StreamingContainer.java        |   2 +
 .../stram/engine/WindowGenerator.java           |  14 +-
 .../stram/plan/logical/LogicalPlan.java         |  53 +++
 .../stram/plan/physical/PTOperator.java         |   4 +-
 .../stram/plan/physical/PhysicalPlan.java       |  19 +-
 .../stram/plan/physical/StreamMapping.java      |   4 +-
 .../java/com/datatorrent/stram/tuple/Tuple.java |   5 +
 .../stram/debug/TupleRecorderTest.java          | 208 +++++-----
 .../stram/engine/GenericNodeTest.java           |  18 +-
 .../stram/engine/GenericTestOperator.java       |   3 +
 .../stram/plan/logical/DelayOperatorTest.java   | 377 +++++++++++++++++++
 18 files changed, 888 insertions(+), 187 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/f7e1ccf1/api/src/main/java/com/datatorrent/api/Context.java
----------------------------------------------------------------------
diff --git a/api/src/main/java/com/datatorrent/api/Context.java b/api/src/main/java/com/datatorrent/api/Context.java
index ceed8a2..58bc552 100644
--- a/api/src/main/java/com/datatorrent/api/Context.java
+++ b/api/src/main/java/com/datatorrent/api/Context.java
@@ -166,6 +166,13 @@ public interface Context
      */
     Attribute<Class<?>> TUPLE_CLASS = new Attribute<>(new Class2String<>());
 
+    /**
+     * Attribute of an input port.
+     * This read-only attribute indicates whether the input port is connected to a DelayOperator.
+     * It is used for iterative processing.
+     */
+    Attribute<Boolean> IS_CONNECTED_TO_DELAY_OPERATOR = new Attribute<>(false);
+
     @SuppressWarnings("FieldNameHidesFieldInSuperclass")
     long serialVersionUID = AttributeMap.AttributeInitializer.initialize(PortContext.class);
   }

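For illustration, the new attribute can be read from the port's context once the logical plan has been validated. Below is a minimal sketch, assuming the standard Component<PortContext> setup callback that DefaultInputPort exposes; the port body and the way the flag is used are hypothetical:

  // Hypothetical input port that checks at setup time whether it is fed by a DelayOperator.
  public transient DefaultInputPort<Object> input = new DefaultInputPort<Object>()
  {
    private boolean fedByDelay;

    @Override
    public void setup(Context.PortContext context)
    {
      // read-only flag populated by the engine while validating the logical plan
      fedByDelay = context.getValue(Context.PortContext.IS_CONNECTED_TO_DELAY_OPERATOR);
    }

    @Override
    public void process(Object tuple)
    {
      // e.g. tuples arriving on the feedback (iteration) edge could be merged differently here
    }
  };
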
http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/f7e1ccf1/api/src/main/java/com/datatorrent/api/Operator.java
----------------------------------------------------------------------
diff --git a/api/src/main/java/com/datatorrent/api/Operator.java b/api/src/main/java/com/datatorrent/api/Operator.java
index 785c60b..d4a6a90 100644
--- a/api/src/main/java/com/datatorrent/api/Operator.java
+++ b/api/src/main/java/com/datatorrent/api/Operator.java
@@ -99,6 +99,25 @@ public interface Operator extends Component<OperatorContext>
   }
 
   /**
+   * A DelayOperator is an operator whose outgoing streaming window id is incremented by *one* by the
+   * engine, thus allowing loops in the "DAG". The output ports of a DelayOperator, if connected, *must*
+   * connect directly to an operator that is upstream of the DelayOperator in the data flow path. Note that
+   * at least one output port of a DelayOperator must be connected for the DelayOperator to serve its purpose.
+   *
+   * This interface is meant for iterative algorithms in the topology. A larger window increment can be
+   * simulated by an implementation of this interface.
+   */
+  interface DelayOperator extends Operator
+  {
+    /**
+     * This method gets called at the first window of the execution.
+     * The implementation is expected to emit tuples for initialization and/or
+     * recovery.
+     */
+    void firstWindow();
+  }
+
+  /**
    * A operator provides ports as a means to consume and produce data tuples.
    * Concrete ports implement derived interfaces.
    */

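For context, this is how a DelayOperator typically closes an iteration loop when a DAG is wired up. A minimal sketch follows; NumberGenerator, Adder and their ports (output, input, carry) are hypothetical operators, while DefaultDelayOperator is the pass-through implementation added below:

import org.apache.hadoop.conf.Configuration;

import com.datatorrent.api.DAG;
import com.datatorrent.api.StreamingApplication;
import com.datatorrent.common.util.DefaultDelayOperator;

public class IterationApp implements StreamingApplication
{
  @Override
  public void populateDAG(DAG dag, Configuration conf)
  {
    NumberGenerator gen = dag.addOperator("gen", new NumberGenerator());
    Adder adder = dag.addOperator("adder", new Adder());
    DefaultDelayOperator<Long> delay = dag.addOperator("delay", new DefaultDelayOperator<Long>());

    dag.addStream("fresh", gen.output, adder.input);
    dag.addStream("toDelay", adder.output, delay.input);
    // Loop edge: the delay's output feeds an operator upstream of it (the adder), and the engine
    // advances the window id of everything the delay emits by one, so the apparent cycle is legal.
    dag.addStream("feedback", delay.output, adder.carry);
  }
}
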
http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/f7e1ccf1/common/src/main/java/com/datatorrent/common/util/DefaultDelayOperator.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/com/datatorrent/common/util/DefaultDelayOperator.java b/common/src/main/java/com/datatorrent/common/util/DefaultDelayOperator.java
new file mode 100644
index 0000000..ff676d4
--- /dev/null
+++ b/common/src/main/java/com/datatorrent/common/util/DefaultDelayOperator.java
@@ -0,0 +1,75 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.datatorrent.common.util;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import com.datatorrent.api.DefaultInputPort;
+import com.datatorrent.api.DefaultOutputPort;
+import com.datatorrent.api.Operator;
+
+/**
+ * DefaultDelayOperator. This is an implementation of DelayOperator that has one input port and one output
+ * port, and does a simple pass-through from the input port to the output port while recording the tuples in
+ * memory as checkpoint state.  A subclass of this operator can override this behavior by overriding
+ * processTuple(T tuple).
+ *
+ * Note that the engine automatically increments the output window ID by one since this is a DelayOperator.
+ *
+ * This DelayOperator guarantees no data loss upon recovery, but it incurs a run-time cost per tuple, and all
+ * tuples of the checkpoint window become part of the checkpoint state.
+ */
+public class DefaultDelayOperator<T> extends BaseOperator implements Operator.DelayOperator
+{
+  public transient DefaultInputPort<T> input = new DefaultInputPort<T>()
+  {
+    @Override
+    public void process(T tuple)
+    {
+      processTuple(tuple);
+    }
+  };
+
+  public transient DefaultOutputPort<T> output = new DefaultOutputPort<T>();
+
+  protected List<T> lastWindowTuples = new ArrayList<>();
+
+  protected void processTuple(T tuple)
+  {
+    lastWindowTuples.add(tuple);
+    output.emit(tuple);
+  }
+
+  @Override
+  public void beginWindow(long windowId)
+  {
+    lastWindowTuples.clear();
+  }
+
+  @Override
+  public void firstWindow()
+  {
+    for (T tuple : lastWindowTuples) {
+      output.emit(tuple);
+    }
+  }
+
+}
+
+

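As the class comment notes, the pass-through behavior can be customized by overriding processTuple. A hedged sketch (the subclass name and the transformation are illustrative only):

// Illustrative subclass: transforms each tuple on the feedback edge instead of passing it through verbatim.
public class IncrementingDelay extends DefaultDelayOperator<Long>
{
  @Override
  protected void processTuple(Long tuple)
  {
    Long incremented = tuple + 1;
    // still recorded so that firstWindow() can replay the last window's output after recovery
    lastWindowTuples.add(incremented);
    output.emit(incremented);
  }
}
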
http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/f7e1ccf1/engine/src/main/java/com/datatorrent/stram/StramLocalCluster.java
----------------------------------------------------------------------
diff --git a/engine/src/main/java/com/datatorrent/stram/StramLocalCluster.java b/engine/src/main/java/com/datatorrent/stram/StramLocalCluster.java
index 29e8e03..cda2a38 100644
--- a/engine/src/main/java/com/datatorrent/stram/StramLocalCluster.java
+++ b/engine/src/main/java/com/datatorrent/stram/StramLocalCluster.java
@@ -27,6 +27,7 @@ import java.net.SocketAddress;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.Callable;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.atomic.AtomicInteger;
 
@@ -80,6 +81,7 @@ public class StramLocalCluster implements Runnable, Controller
   private boolean appDone = false;
   private final Map<String, StreamingContainer> injectShutdown = new ConcurrentHashMap<String, StreamingContainer>();
   private boolean heartbeatMonitoringEnabled = true;
+  private Callable<Boolean> exitCondition;
 
   public interface MockComponentFactory
   {
@@ -427,6 +429,11 @@ public class StramLocalCluster implements Runnable, Controller
     this.perContainerBufferServer = perContainerBufferServer;
   }
 
+  public void setExitCondition(Callable<Boolean> exitCondition)
+  {
+    this.exitCondition = exitCondition;
+  }
+
   @Override
   public void run()
   {
@@ -476,6 +483,14 @@ public class StramLocalCluster implements Runnable, Controller
         appDone = true;
       }
 
+      try {
+        if (exitCondition != null && exitCondition.call()) {
+          appDone = true;
+        }
+      } catch (Exception ex) {
+        break;
+      }
+
       if (Thread.interrupted()) {
         break;
       }

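The new exit condition is intended for tests that run an iterative DAG until some result shows up. A minimal usage sketch, where dag is a previously built LogicalPlan and the collector operator with its static collectedTuples list is hypothetical:

// Inside a test method that is declared to throw Exception.
StramLocalCluster lc = new StramLocalCluster(dag);
lc.setExitCondition(new Callable<Boolean>()
{
  @Override
  public Boolean call() throws Exception
  {
    // stop the local cluster once the (hypothetical) collector has seen enough tuples
    return CollectorOperator.collectedTuples.size() >= 10;
  }
});
lc.run(10000); // run with a timeout in milliseconds
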
http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/f7e1ccf1/engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java
----------------------------------------------------------------------
diff --git a/engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java b/engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java
index 4b79589..6233697 100644
--- a/engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java
+++ b/engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java
@@ -1008,7 +1008,7 @@ public class StreamingContainerManager implements PlanContext
       return operatorStatus.latencyMA.getAvg();
     }
     for (PTOperator.PTInput input : maxOperator.getInputs()) {
-      if (null != input.source.source) {
+      if (null != input.source.source && !input.delay) {
         operators.add(input.source.source);
       }
     }
@@ -1896,6 +1896,19 @@ public class StreamingContainerManager implements PlanContext
 
   }
 
+  private void addVisited(PTOperator operator, UpdateCheckpointsContext ctx)
+  {
+    ctx.visited.add(operator);
+    for (PTOperator.PTOutput out : operator.getOutputs()) {
+      for (PTOperator.PTInput sink : out.sinks) {
+        PTOperator sinkOperator = sink.target;
+        if (!ctx.visited.contains(sinkOperator)) {
+          addVisited(sinkOperator, ctx);
+        }
+      }
+    }
+  }
+
   /**
    * Compute checkpoints required for a given operator instance to be recovered.
    * This is done by looking at checkpoints available for downstream dependencies first,
@@ -1913,6 +1926,9 @@ public class StreamingContainerManager implements PlanContext
     if (operator.getState() == PTOperator.State.ACTIVE && (ctx.currentTms - operator.stats.lastWindowIdChangeTms) > operator.stats.windowProcessingTimeoutMillis) {
       // if the checkpoint is ahead, then it is not blocked but waiting for activation (state-less recovery, at-most-once)
       if (ctx.committedWindowId.longValue() >= operator.getRecoveryCheckpoint().windowId) {
+        LOG.debug("Marking operator {} blocked committed window {}, recovery window {}", operator,
+            Codec.getStringWindowId(ctx.committedWindowId.longValue()),
+            Codec.getStringWindowId(operator.getRecoveryCheckpoint().windowId));
         ctx.blocked.add(operator);
       }
     }
@@ -1922,25 +1938,30 @@ public class StreamingContainerManager implements PlanContext
       long currentWindowId = WindowGenerator.getWindowId(ctx.currentTms, this.vars.windowStartMillis, this.getLogicalPlan().getValue(LogicalPlan.STREAMING_WINDOW_SIZE_MILLIS));
       maxCheckpoint = currentWindowId;
     }
+    ctx.visited.add(operator);
 
     // DFS downstream operators
-    for (PTOperator.PTOutput out : operator.getOutputs()) {
-      for (PTOperator.PTInput sink : out.sinks) {
-        PTOperator sinkOperator = sink.target;
-        if (!ctx.visited.contains(sinkOperator)) {
-          // downstream traversal
-          updateRecoveryCheckpoints(sinkOperator, ctx);
-        }
-        // recovery window id cannot move backwards
-        // when dynamically adding new operators
-        if (sinkOperator.getRecoveryCheckpoint().windowId >= operator.getRecoveryCheckpoint().windowId) {
-          maxCheckpoint = Math.min(maxCheckpoint, sinkOperator.getRecoveryCheckpoint().windowId);
-        }
+    if (operator.getOperatorMeta().getOperator() instanceof Operator.DelayOperator) {
+      addVisited(operator, ctx);
+    } else {
+      for (PTOperator.PTOutput out : operator.getOutputs()) {
+        for (PTOperator.PTInput sink : out.sinks) {
+          PTOperator sinkOperator = sink.target;
+          if (!ctx.visited.contains(sinkOperator)) {
+            // downstream traversal
+            updateRecoveryCheckpoints(sinkOperator, ctx);
+          }
+          // recovery window id cannot move backwards
+          // when dynamically adding new operators
+          if (sinkOperator.getRecoveryCheckpoint().windowId >= operator.getRecoveryCheckpoint().windowId) {
+            maxCheckpoint = Math.min(maxCheckpoint, sinkOperator.getRecoveryCheckpoint().windowId);
+          }
 
-        if (ctx.blocked.contains(sinkOperator)) {
-          if (sinkOperator.stats.getCurrentWindowId() == operator.stats.getCurrentWindowId()) {
-            // downstream operator is blocked by this operator
-            ctx.blocked.remove(sinkOperator);
+          if (ctx.blocked.contains(sinkOperator)) {
+            if (sinkOperator.stats.getCurrentWindowId() == operator.stats.getCurrentWindowId()) {
+              // downstream operator is blocked by this operator
+              ctx.blocked.remove(sinkOperator);
+            }
           }
         }
       }
@@ -1975,7 +1996,6 @@ public class StreamingContainerManager implements PlanContext
       LOG.debug("Skipping checkpoint update {} during {}", operator, operator.getState());
     }
 
-    ctx.visited.add(operator);
   }
 
   public long windowIdToMillis(long windowId)

http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/f7e1ccf1/engine/src/main/java/com/datatorrent/stram/engine/GenericNode.java
----------------------------------------------------------------------
diff --git a/engine/src/main/java/com/datatorrent/stram/engine/GenericNode.java b/engine/src/main/java/com/datatorrent/stram/engine/GenericNode.java
index 93cee49..4777f93 100644
--- a/engine/src/main/java/com/datatorrent/stram/engine/GenericNode.java
+++ b/engine/src/main/java/com/datatorrent/stram/engine/GenericNode.java
@@ -34,11 +34,14 @@ import com.datatorrent.api.Operator.ShutdownException;
 import com.datatorrent.api.Sink;
 import com.datatorrent.api.annotation.Stateless;
 
+import com.datatorrent.bufferserver.packet.MessageType;
 import com.datatorrent.bufferserver.util.Codec;
 import com.datatorrent.netlet.util.DTThrowable;
 import com.datatorrent.netlet.util.CircularBuffer;
 import com.datatorrent.stram.api.StreamingContainerUmbilicalProtocol.ContainerStats;
 import com.datatorrent.stram.debug.TappedReservoir;
+import com.datatorrent.stram.plan.logical.Operators;
+import com.datatorrent.stram.tuple.ResetWindowTuple;
 import com.datatorrent.stram.tuple.Tuple;
 
 /**
@@ -198,6 +201,15 @@ public class GenericNode extends Node<Operator>
     insideWindow = applicationWindowCount != 0;
   }
 
+  private boolean isInputPortConnectedToDelayOperator(String portName)
+  {
+    Operators.PortContextPair<InputPort<?>> pcPair = descriptor.inputPorts.get(portName);
+    if (pcPair == null || pcPair.context == null) {
+      return false;
+    }
+    return pcPair.context.getValue(PortContext.IS_CONNECTED_TO_DELAY_OPERATOR);
+  }
+
   /**
    * Originally this method was defined in an attempt to implement the interface Runnable.
    *
@@ -212,30 +224,67 @@ public class GenericNode extends Node<Operator>
     long spinMillis = context.getValue(OperatorContext.SPIN_MILLIS);
     final boolean handleIdleTime = operator instanceof IdleTimeHandler;
     int totalQueues = inputs.size();
+    int regularQueues = totalQueues;
+    // regularQueues is the number of queues that are not connected to a DelayOperator
+    for (String portName : inputs.keySet()) {
+      if (isInputPortConnectedToDelayOperator(portName)) {
+        regularQueues--;
+      }
+    }
 
-    ArrayList<SweepableReservoir> activeQueues = new ArrayList<SweepableReservoir>();
-    activeQueues.addAll(inputs.values());
+    ArrayList<Map.Entry<String, SweepableReservoir>> activeQueues = new ArrayList<>();
+    activeQueues.addAll(inputs.entrySet());
 
     int expectingBeginWindow = activeQueues.size();
     int receivedEndWindow = 0;
+    long firstWindowId = -1;
 
     TupleTracker tracker;
     LinkedList<TupleTracker> resetTupleTracker = new LinkedList<TupleTracker>();
-
     try {
       do {
-        Iterator<SweepableReservoir> buffers = activeQueues.iterator();
+        Iterator<Map.Entry<String, SweepableReservoir>> buffers = activeQueues.iterator();
   activequeue:
         while (buffers.hasNext()) {
-          SweepableReservoir activePort = buffers.next();
+          Map.Entry<String, SweepableReservoir> activePortEntry = buffers.next();
+          SweepableReservoir activePort = activePortEntry.getValue();
           Tuple t = activePort.sweep();
           if (t != null) {
+            boolean delay = (operator instanceof Operator.DelayOperator);
+            long windowAhead = 0;
+            if (delay) {
+              windowAhead = WindowGenerator.getAheadWindowId(t.getWindowId(), firstWindowMillis, windowWidthMillis, 1);
+            }
             switch (t.getType()) {
               case BEGIN_WINDOW:
                 if (expectingBeginWindow == totalQueues) {
+                  // This is the first begin window tuple among all ports
+                  if (isInputPortConnectedToDelayOperator(activePortEntry.getKey())) {
+                    // We need to wait for the first BEGIN_WINDOW from a port not connected to DelayOperator before
+                    // we can do anything with it, because otherwise if a CHECKPOINT tuple arrives from
+                    // upstream after the BEGIN_WINDOW tuple for the next window from the delay operator, it would end
+                    // up checkpointing in the middle of the window.  This code is assuming we have at least one
+                    // input port that is not connected to a DelayOperator, and we might have to change this later.
+                    // In the future, this condition will not be needed if we get rid of the CHECKPOINT tuple.
+                    continue;
+                  }
                   activePort.remove();
                   expectingBeginWindow--;
+                  receivedEndWindow = 0;
                   currentWindowId = t.getWindowId();
+                  if (delay) {
+                    if (WindowGenerator.getBaseSecondsFromWindowId(windowAhead) > t.getBaseSeconds()) {
+                      // Buffer server code strips out the base seconds from BEGIN_WINDOW and END_WINDOW tuples for
+                      // serialization optimization.  That's why we need a reset window here to tell the buffer
+                      // server that we now have a new baseSeconds.
+                      Tuple resetWindowTuple = new ResetWindowTuple(windowAhead);
+                      for (int s = sinks.length; s-- > 0; ) {
+                        sinks[s].put(resetWindowTuple);
+                      }
+                      controlTupleCount++;
+                    }
+                    t.setWindowId(windowAhead);
+                  }
                   for (int s = sinks.length; s-- > 0; ) {
                     sinks[s].put(t);
                   }
@@ -245,7 +294,6 @@ public class GenericNode extends Node<Operator>
                     insideWindow = true;
                     operator.beginWindow(currentWindowId);
                   }
-                  receivedEndWindow = 0;
                 }
                 else if (t.getWindowId() == currentWindowId) {
                   activePort.remove();
@@ -253,17 +301,7 @@ public class GenericNode extends Node<Operator>
                 }
                 else {
                   buffers.remove();
-
-                  /* find the name of the port which got out of sequence tuple */
-                  String port = null;
-                  for (Entry<String, SweepableReservoir> e : inputs.entrySet()) {
-                    if (e.getValue() == activePort) {
-                      port = e.getKey();
-                    }
-                  }
-
-                  assert (port != null); /* we should always find the port */
-
+                  String port = activePortEntry.getKey();
                   if (PROCESSING_MODE == ProcessingMode.AT_MOST_ONCE) {
                     if (t.getWindowId() < currentWindowId) {
                       /*
@@ -279,21 +317,21 @@ public class GenericNode extends Node<Operator>
                       WindowIdActivatedReservoir wiar = new WindowIdActivatedReservoir(port, activePort, currentWindowId);
                       wiar.setSink(sink);
                       inputs.put(port, wiar);
-                      activeQueues.add(wiar);
+                      activeQueues.add(new AbstractMap.SimpleEntry<String, SweepableReservoir>(port, wiar));
                       break activequeue;
                     }
                     else {
                       expectingBeginWindow--;
                       if (++receivedEndWindow == totalQueues) {
                         processEndWindow(null);
-                        activeQueues.addAll(inputs.values());
+                        activeQueues.addAll(inputs.entrySet());
                         expectingBeginWindow = activeQueues.size();
                         break activequeue;
                       }
                     }
                   }
                   else {
-                    logger.error("Catastrophic Error: Out of sequence tuple {} on port {} while expecting {}", Codec.getStringWindowId(t.getWindowId()), port, Codec.getStringWindowId(currentWindowId));
+                    logger.error("Catastrophic Error: Out of sequence {} tuple {} on port {} while expecting {}", t.getType(), Codec.getStringWindowId(t.getWindowId()), port, Codec.getStringWindowId(currentWindowId));
                     System.exit(2);
                   }
                 }
@@ -306,8 +344,11 @@ public class GenericNode extends Node<Operator>
                   endWindowDequeueTimes.put(activePort, System.currentTimeMillis());
                   if (++receivedEndWindow == totalQueues) {
                     assert (activeQueues.isEmpty());
+                    if (delay) {
+                      t.setWindowId(windowAhead);
+                    }
                     processEndWindow(t);
-                    activeQueues.addAll(inputs.values());
+                    activeQueues.addAll(inputs.entrySet());
                     expectingBeginWindow = activeQueues.size();
                     break activequeue;
                   }
@@ -330,11 +371,12 @@ public class GenericNode extends Node<Operator>
                       doCheckpoint = true;
                     }
                   }
-
-                  for (int s = sinks.length; s-- > 0; ) {
-                    sinks[s].put(t);
+                  if (!delay) {
+                    for (int s = sinks.length; s-- > 0; ) {
+                      sinks[s].put(t);
+                    }
+                    controlTupleCount++;
                   }
-                  controlTupleCount++;
                 }
                 break;
 
@@ -343,12 +385,14 @@ public class GenericNode extends Node<Operator>
                  * we will receive tuples which are equal to the number of input streams.
                  */
                 activePort.remove();
-                buffers.remove();
+                if (isInputPortConnectedToDelayOperator(activePortEntry.getKey())) {
+                  break; // breaking out of the switch/case
+                }
 
+                buffers.remove();
                 int baseSeconds = t.getBaseSeconds();
                 tracker = null;
-                Iterator<TupleTracker> trackerIterator = resetTupleTracker.iterator();
-                while (trackerIterator.hasNext()) {
+                for (Iterator<TupleTracker> trackerIterator = resetTupleTracker.iterator(); trackerIterator.hasNext(); ) {
                   tracker = trackerIterator.next();
                   if (tracker.tuple.getBaseSeconds() == baseSeconds) {
                     break;
@@ -356,7 +400,7 @@ public class GenericNode extends Node<Operator>
                 }
 
                 if (tracker == null) {
-                  tracker = new TupleTracker(t, totalQueues);
+                  tracker = new TupleTracker(t, regularQueues);
                   resetTupleTracker.add(tracker);
                 }
                 int trackerIndex = 0;
@@ -364,29 +408,50 @@ public class GenericNode extends Node<Operator>
                   if (tracker.ports[trackerIndex] == null) {
                     tracker.ports[trackerIndex++] = activePort;
                     break;
-                  }
-                  else if (tracker.ports[trackerIndex] == activePort) {
+                  } else if (tracker.ports[trackerIndex] == activePort) {
                     break;
                   }
 
                   trackerIndex++;
                 }
 
-                if (trackerIndex == totalQueues) {
-                  trackerIterator = resetTupleTracker.iterator();
+                if (trackerIndex == regularQueues) {
+                  Iterator<TupleTracker> trackerIterator = resetTupleTracker.iterator();
                   while (trackerIterator.hasNext()) {
                     if (trackerIterator.next().tuple.getBaseSeconds() <= baseSeconds) {
                       trackerIterator.remove();
                     }
                   }
-                  for (int s = sinks.length; s-- > 0; ) {
-                    sinks[s].put(t);
+                  if (!delay) {
+                    for (int s = sinks.length; s-- > 0; ) {
+                      sinks[s].put(t);
+                    }
+                    controlTupleCount++;
                   }
-                  controlTupleCount++;
-
-                  assert (activeQueues.isEmpty());
-                  activeQueues.addAll(inputs.values());
+                  if (!activeQueues.isEmpty()) {
+                    // make sure they are all queues from DelayOperator
+                    for (Map.Entry<String, SweepableReservoir> entry : activeQueues) {
+                      if (!isInputPortConnectedToDelayOperator(entry.getKey())) {
+                        assert (false);
+                      }
+                    }
+                    activeQueues.clear();
+                  }
+                  activeQueues.addAll(inputs.entrySet());
                   expectingBeginWindow = activeQueues.size();
+
+                  if (firstWindowId == -1) {
+                    if (delay) {
+                      for (int s = sinks.length; s-- > 0; ) {
+                        sinks[s].put(t);
+                      }
+                      controlTupleCount++;
+                      // if it's a DelayOperator and this is the first RESET_WINDOW (start) or END_STREAM
+                      // (recovery), fabricate the first window
+                      fabricateFirstWindow((Operator.DelayOperator)operator, windowAhead);
+                    }
+                    firstWindowId = t.getWindowId();
+                  }
                   break activequeue;
                 }
                 break;
@@ -394,6 +459,15 @@ public class GenericNode extends Node<Operator>
               case END_STREAM:
                 activePort.remove();
                 buffers.remove();
+                if (firstWindowId == -1) {
+                  // this is for recovery from a checkpoint for DelayOperator
+                  if (delay) {
+                    // if it's a DelayOperator and this is the first RESET_WINDOW (start) or END_STREAM (recovery),
+                    // fabricate the first window
+                    fabricateFirstWindow((Operator.DelayOperator)operator, windowAhead);
+                  }
+                  firstWindowId = t.getWindowId();
+                }
                 for (Iterator<Entry<String, SweepableReservoir>> it = inputs.entrySet().iterator(); it.hasNext(); ) {
                   Entry<String, SweepableReservoir> e = it.next();
                   if (e.getValue() == activePort) {
@@ -409,7 +483,7 @@ public class GenericNode extends Node<Operator>
                       if (e.getKey().equals(dic.portname)) {
                         connectInputPort(dic.portname, dic.reservoir);
                         dici.remove();
-                        activeQueues.add(dic.reservoir);
+                        activeQueues.add(new AbstractMap.SimpleEntry<>(dic.portname, dic.reservoir));
                         break activequeue;
                       }
                     }
@@ -427,17 +501,18 @@ public class GenericNode extends Node<Operator>
                  * Since one of the operators we care about it gone, we should relook at our ports.
                  * We need to make sure that the END_STREAM comes outside of the window.
                  */
+                regularQueues--;
                 totalQueues--;
 
                 boolean break_activequeue = false;
-                if (totalQueues == 0) {
+                if (regularQueues == 0) {
                   alive = false;
                   break_activequeue = true;
                 }
                 else if (activeQueues.isEmpty()) {
                   assert (!inputs.isEmpty());
                   processEndWindow(null);
-                  activeQueues.addAll(inputs.values());
+                  activeQueues.addAll(inputs.entrySet());
                   expectingBeginWindow = activeQueues.size();
                   break_activequeue = true;
                 }
@@ -450,22 +525,22 @@ public class GenericNode extends Node<Operator>
                  * it's the only one which has not, then we consider it delivered and release the reset tuple downstream.
                  */
                 Tuple tuple = null;
-                for (trackerIterator = resetTupleTracker.iterator(); trackerIterator.hasNext(); ) {
+                for (Iterator<TupleTracker> trackerIterator = resetTupleTracker.iterator(); trackerIterator.hasNext(); ) {
                   tracker = trackerIterator.next();
 
                   trackerIndex = 0;
                   while (trackerIndex < tracker.ports.length) {
                     if (tracker.ports[trackerIndex] == activePort) {
-                      SweepableReservoir[] ports = new SweepableReservoir[totalQueues];
+                      SweepableReservoir[] ports = new SweepableReservoir[regularQueues];
                       System.arraycopy(tracker.ports, 0, ports, 0, trackerIndex);
-                      if (trackerIndex < totalQueues) {
+                      if (trackerIndex < regularQueues) {
                         System.arraycopy(tracker.ports, trackerIndex + 1, ports, trackerIndex, tracker.ports.length - trackerIndex - 1);
                       }
                       tracker.ports = ports;
                       break;
                     }
                     else if (tracker.ports[trackerIndex] == null) {
-                      if (trackerIndex == totalQueues) { /* totalQueues is already adjusted above */
+                      if (trackerIndex == regularQueues) { /* regularQueues is already adjusted above */
                         if (tuple == null || tuple.getBaseSeconds() < tracker.tuple.getBaseSeconds()) {
                           tuple = tracker.tuple;
                         }
@@ -475,7 +550,7 @@ public class GenericNode extends Node<Operator>
                       break;
                     }
                     else {
-                      tracker.ports = Arrays.copyOf(tracker.ports, totalQueues);
+                      tracker.ports = Arrays.copyOf(tracker.ports, regularQueues);
                     }
 
                     trackerIndex++;
@@ -485,7 +560,7 @@ public class GenericNode extends Node<Operator>
                 /*
                  * Since we were waiting for a reset tuple on this stream, we should not any longer.
                  */
-                if (tuple != null) {
+                if (tuple != null && !delay) {
                   for (int s = sinks.length; s-- > 0; ) {
                     sinks[s].put(tuple);
                   }
@@ -509,8 +584,8 @@ public class GenericNode extends Node<Operator>
         }
         else {
           boolean need2sleep = true;
-          for (SweepableReservoir cb : activeQueues) {
-            if (cb.size() > 0) {
+          for (Map.Entry<String, SweepableReservoir> cb : activeQueues) {
+            if (cb.getValue().size() > 0) {
               need2sleep = false;
               break;
             }
@@ -582,6 +657,21 @@ public class GenericNode extends Node<Operator>
 
   }
 
+  private void fabricateFirstWindow(Operator.DelayOperator delayOperator, long windowAhead)
+  {
+    Tuple beginWindowTuple = new Tuple(MessageType.BEGIN_WINDOW, windowAhead);
+    Tuple endWindowTuple = new Tuple(MessageType.END_WINDOW, windowAhead);
+    for (Sink<Object> sink : outputs.values()) {
+      sink.put(beginWindowTuple);
+    }
+    controlTupleCount++;
+    delayOperator.firstWindow();
+    for (Sink<Object> sink : outputs.values()) {
+      sink.put(endWindowTuple);
+    }
+    controlTupleCount++;
+  }
+
   /**
    * End window dequeue times may not have been saved for all the input ports during deactivate,
    * so save them for reporting. SPOI-1324.

http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/f7e1ccf1/engine/src/main/java/com/datatorrent/stram/engine/Node.java
----------------------------------------------------------------------
diff --git a/engine/src/main/java/com/datatorrent/stram/engine/Node.java b/engine/src/main/java/com/datatorrent/stram/engine/Node.java
index 068a325..d4970cd 100644
--- a/engine/src/main/java/com/datatorrent/stram/engine/Node.java
+++ b/engine/src/main/java/com/datatorrent/stram/engine/Node.java
@@ -126,6 +126,8 @@ public abstract class Node<OPERATOR extends Operator> implements Component<Opera
   private ExecutorService executorService;
   private Queue<Pair<FutureTask<Stats.CheckpointStats>, CheckpointWindowInfo>> taskQueue;
   protected Stats.CheckpointStats checkpointStats;
+  public long firstWindowMillis;
+  public long windowWidthMillis;
 
   public Node(OPERATOR operator, OperatorContext context)
   {
@@ -354,7 +356,9 @@ public abstract class Node<OPERATOR extends Operator> implements Component<Opera
 
   protected void emitEndWindow()
   {
-    EndWindowTuple ewt = new EndWindowTuple(currentWindowId);
+    long windowId = (operator instanceof Operator.DelayOperator) ?
+        WindowGenerator.getAheadWindowId(currentWindowId, firstWindowMillis, windowWidthMillis, 1) : currentWindowId;
+    EndWindowTuple ewt = new EndWindowTuple(windowId);
     for (int s = sinks.length; s-- > 0; ) {
       sinks[s].put(ewt);
     }

http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/f7e1ccf1/engine/src/main/java/com/datatorrent/stram/engine/StreamingContainer.java
----------------------------------------------------------------------
diff --git a/engine/src/main/java/com/datatorrent/stram/engine/StreamingContainer.java b/engine/src/main/java/com/datatorrent/stram/engine/StreamingContainer.java
index 14e00a9..79d9037 100644
--- a/engine/src/main/java/com/datatorrent/stram/engine/StreamingContainer.java
+++ b/engine/src/main/java/com/datatorrent/stram/engine/StreamingContainer.java
@@ -894,6 +894,8 @@ public class StreamingContainer extends YarnContainerMain
       Node<?> node = Node.retrieveNode(backupAgent.load(ndi.id, ctx.stateless ? Stateless.WINDOW_ID : ndi.checkpoint.windowId), ctx, ndi.type);
       node.currentWindowId = ndi.checkpoint.windowId;
       node.applicationWindowCount = ndi.checkpoint.applicationWindowCount;
+      node.firstWindowMillis = firstWindowMillis;
+      node.windowWidthMillis = windowWidthMillis;
 
       node.setId(ndi.id);
       nodes.put(ndi.id, node);

http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/f7e1ccf1/engine/src/main/java/com/datatorrent/stram/engine/WindowGenerator.java
----------------------------------------------------------------------
diff --git a/engine/src/main/java/com/datatorrent/stram/engine/WindowGenerator.java b/engine/src/main/java/com/datatorrent/stram/engine/WindowGenerator.java
index 5610112..ea429af 100644
--- a/engine/src/main/java/com/datatorrent/stram/engine/WindowGenerator.java
+++ b/engine/src/main/java/com/datatorrent/stram/engine/WindowGenerator.java
@@ -314,13 +314,25 @@ public class WindowGenerator extends MuxReservoir implements Stream, Runnable
     long baseMillis = (windowId >> 32) * 1000;
     long diff = baseMillis - firstWindowMillis;
     long baseChangeInterval = windowWidthMillis * (WindowGenerator.MAX_WINDOW_ID + 1);
+    assert (baseChangeInterval > 0);
     long multiplier = diff / baseChangeInterval;
     if (diff % baseChangeInterval > 0) {
       multiplier++;
     }
     assert (multiplier >= 0);
     windowId = windowId & WindowGenerator.WINDOW_MASK;
-    return firstWindowMillis + (multiplier * windowWidthMillis * (WindowGenerator.MAX_WINDOW_ID + 1)) + windowId * windowWidthMillis;
+    return firstWindowMillis + (multiplier * baseChangeInterval) + (windowId * windowWidthMillis);
+  }
+
+  /**
+   * Utility function to get the base seconds from a window id
+   *
+   * @param windowId
+   * @return the base seconds for the given window id
+   */
+  public static long getBaseSecondsFromWindowId(long windowId)
+  {
+    return windowId >>> 32;
   }
 
   private class MasterReservoir extends CircularBuffer<Tuple> implements Reservoir

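For reference, a window id packs the base seconds into its upper 32 bits and a bounded window counter into its lower 32 bits (compare Tuple.getBaseSeconds()), so advancing a window by one can roll over into a new base-seconds value. The following illustrative helper, not part of the commit, shows the check that motivates the ResetWindowTuple emitted by GenericNode for a DelayOperator:

// Returns true when advancing windowId by one crosses a base-seconds boundary.
static boolean delayCrossesBaseSeconds(long windowId, long firstWindowMillis, long windowWidthMillis)
{
  long baseSeconds = windowId >>> 32; // upper 32 bits, same as getBaseSecondsFromWindowId()
  long windowAhead = WindowGenerator.getAheadWindowId(windowId, firstWindowMillis, windowWidthMillis, 1);
  return WindowGenerator.getBaseSecondsFromWindowId(windowAhead) > baseSeconds;
}
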
http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/f7e1ccf1/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlan.java
----------------------------------------------------------------------
diff --git a/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlan.java b/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlan.java
index 867f814..3c26118 100644
--- a/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlan.java
+++ b/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlan.java
@@ -1758,6 +1758,14 @@ public class LogicalPlan implements Serializable, DAG
       throw new ValidationException("Loops in graph: " + cycles);
     }
 
+    List<List<String>> invalidDelays = new ArrayList<>();
+    for (OperatorMeta n : rootOperators) {
+      findInvalidDelays(n, invalidDelays);
+    }
+    if (!invalidDelays.isEmpty()) {
+      throw new ValidationException("Invalid delays in graph: " + invalidDelays);
+    }
+
     for (StreamMeta s: streams.values()) {
       if (s.source == null) {
         throw new ValidationException("Stream source not connected: " + s.getName());
@@ -1814,6 +1822,11 @@ public class LogicalPlan implements Serializable, DAG
       return;
     }
 
+    if (om.getOperator() instanceof Operator.DelayOperator) {
+      String msg = String.format("Locality %s invalid for delay operator %s", Locality.THREAD_LOCAL, om);
+      throw new ValidationException(msg);
+    }
+
     for (StreamMeta sm: om.inputStreams.values()){
       // validation fail as each input stream should be OIO
       if (sm.locality != Locality.THREAD_LOCAL){
@@ -1822,6 +1835,10 @@ public class LogicalPlan implements Serializable, DAG
         throw new ValidationException(msg);
       }
 
+      if (sm.source.operatorMeta.getOperator() instanceof Operator.DelayOperator) {
+        String msg = String.format("Locality %s invalid for delay operator %s", Locality.THREAD_LOCAL, sm.source.operatorMeta);
+        throw new ValidationException(msg);
+      }
       // gets oio root for input operator for the stream
       Integer oioStreamRoot = getOioRoot(sm.source.operatorMeta);
 
@@ -1895,6 +1912,11 @@ public class LogicalPlan implements Serializable, DAG
     // depth first successors traversal
     for (StreamMeta downStream: om.outputStreams.values()) {
       for (InputPortMeta sink: downStream.sinks) {
+        if (om.getOperator() instanceof Operator.DelayOperator) {
+          // this is an iteration loop, do not treat it as downstream when detecting cycles
+          sink.attributes.put(PortContext.IS_CONNECTED_TO_DELAY_OPERATOR, true);
+          continue;
+        }
         OperatorMeta successor = sink.getOperatorWrapper();
         if (successor == null) {
           continue;
@@ -1932,6 +1954,37 @@ public class LogicalPlan implements Serializable, DAG
     }
   }
 
+  public void findInvalidDelays(OperatorMeta om, List<List<String>> invalidDelays)
+  {
+    stack.push(om);
+
+    // depth first successors traversal
+    boolean isDelayOperator = om.getOperator() instanceof Operator.DelayOperator;
+    if (isDelayOperator) {
+      if (om.getValue(OperatorContext.APPLICATION_WINDOW_COUNT) != 1) {
+        LOG.debug("detected DelayOperator having APPLICATION_WINDOW_COUNT not equal to 1");
+        invalidDelays.add(Collections.singletonList(om.getName()));
+      }
+    }
+
+    for (StreamMeta downStream: om.outputStreams.values()) {
+      for (InputPortMeta sink : downStream.sinks) {
+        OperatorMeta successor = sink.getOperatorWrapper();
+        if (isDelayOperator) {
+          // Check whether all downstream operators are already visited in the path
+          if (successor != null && !stack.contains(successor)) {
+            LOG.debug("detected DelayOperator does not immediately output to a visited operator {}.{}->{}.{}",
+                om.getName(), downStream.getSource().getPortName(), successor.getName(), sink.getPortName());
+            invalidDelays.add(Arrays.asList(om.getName(), successor.getName()));
+          }
+        } else {
+          findInvalidDelays(successor, invalidDelays);
+        }
+      }
+    }
+    stack.pop();
+  }
+
   private void validateProcessingMode(OperatorMeta om, Set<OperatorMeta> visited)
   {
     for (StreamMeta is : om.getInputStreams().values()) {

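To make the new validation concrete, here is a hedged sketch of a wiring that findInvalidDelays rejects, because the delay's output does not feed an operator upstream of the delay. GenericTestOperator is the test operator touched by this commit and the port names follow its convention:

LogicalPlan dag = new LogicalPlan();
GenericTestOperator source = dag.addOperator("source", new GenericTestOperator());
GenericTestOperator other = dag.addOperator("other", new GenericTestOperator());
DefaultDelayOperator<Object> delay = dag.addOperator("delay", new DefaultDelayOperator<Object>());

dag.addStream("s1", source.outport1, delay.input);
// Invalid: "other" is not on the path upstream of "delay", so dag.validate() should fail with
// something like "Invalid delays in graph: [[delay, other]]". A DelayOperator whose
// APPLICATION_WINDOW_COUNT is not 1 is reported the same way.
dag.addStream("s2", delay.output, other.inport1);
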
http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/f7e1ccf1/engine/src/main/java/com/datatorrent/stram/plan/physical/PTOperator.java
----------------------------------------------------------------------
diff --git a/engine/src/main/java/com/datatorrent/stram/plan/physical/PTOperator.java b/engine/src/main/java/com/datatorrent/stram/plan/physical/PTOperator.java
index 6adfd64..ae276d8 100644
--- a/engine/src/main/java/com/datatorrent/stram/plan/physical/PTOperator.java
+++ b/engine/src/main/java/com/datatorrent/stram/plan/physical/PTOperator.java
@@ -81,6 +81,7 @@ public class PTOperator implements java.io.Serializable
     public final PartitionKeys partitions;
     public final PTOutput source;
     public final String portName;
+    public final boolean delay;
 
     /**
      *
@@ -90,7 +91,7 @@ public class PTOperator implements java.io.Serializable
      * @param partitions
      * @param source
      */
-    protected PTInput(String portName, StreamMeta logicalStream, PTOperator target, PartitionKeys partitions, PTOutput source)
+    protected PTInput(String portName, StreamMeta logicalStream, PTOperator target, PartitionKeys partitions, PTOutput source, boolean delay)
     {
       this.logicalStream = logicalStream;
       this.target = target;
@@ -98,6 +99,7 @@ public class PTOperator implements java.io.Serializable
       this.source = source;
       this.portName = portName;
       this.source.sinks.add(this);
+      this.delay = delay;
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/f7e1ccf1/engine/src/main/java/com/datatorrent/stram/plan/physical/PhysicalPlan.java
----------------------------------------------------------------------
diff --git a/engine/src/main/java/com/datatorrent/stram/plan/physical/PhysicalPlan.java b/engine/src/main/java/com/datatorrent/stram/plan/physical/PhysicalPlan.java
index 829a6fd..da96ef3 100644
--- a/engine/src/main/java/com/datatorrent/stram/plan/physical/PhysicalPlan.java
+++ b/engine/src/main/java/com/datatorrent/stram/plan/physical/PhysicalPlan.java
@@ -328,8 +328,11 @@ public class PhysicalPlan implements Serializable
 
       boolean upstreamDeployed = true;
 
-      for (StreamMeta s : n.getInputStreams().values()) {
-        if (s.getSource() != null && !this.logicalToPTOperator.containsKey(s.getSource().getOperatorMeta())) {
+      for (Map.Entry<InputPortMeta, StreamMeta> entry : n.getInputStreams().entrySet()) {
+        StreamMeta s = entry.getValue();
+        boolean delay = entry.getKey().getValue(LogicalPlan.IS_CONNECTED_TO_DELAY_OPERATOR);
+        // skip delay sources since they are handled as downstream
+        if (!delay && s.getSource() != null && !this.logicalToPTOperator.containsKey(s.getSource().getOperatorMeta())) {
           pendingNodes.push(n);
           pendingNodes.push(s.getSource().getOperatorMeta());
           upstreamDeployed = false;
@@ -907,7 +910,10 @@ public class PhysicalPlan implements Serializable
 
     for (Map.Entry<InputPortMeta, StreamMeta> ipm : m.logicalOperator.getInputStreams().entrySet()) {
       PMapping sourceMapping = this.logicalToPTOperator.get(ipm.getValue().getSource().getOperatorMeta());
-
+      if (ipm.getValue().getSource().getOperatorMeta().getOperator() instanceof Operator.DelayOperator) {
+        // skip if the source is a DelayOperator
+        continue;
+      }
       if (ipm.getKey().getValue(PortContext.PARTITION_PARALLEL)) {
         if (sourceMapping.partitions.size() < m.partitions.size()) {
           throw new AssertionError("Number of partitions don't match in parallel mapping " + sourceMapping.logicalOperator.getName() + " -> " + m.logicalOperator.getName() + ", " + sourceMapping.partitions.size() + " -> " + m.partitions.size());
@@ -942,11 +948,11 @@ public class PhysicalPlan implements Serializable
                 PTOperator slidingUnifier = StreamMapping.createSlidingUnifier(sourceOut.logicalStream, this,
                   sourceOM.getValue(Context.OperatorContext.APPLICATION_WINDOW_COUNT), slidingWindowCount);
                 StreamMapping.addInput(slidingUnifier, sourceOut, null);
-                input = new PTInput(ipm.getKey().getPortName(), ipm.getValue(), oper, null, slidingUnifier.outputs.get(0));
+                input = new PTInput(ipm.getKey().getPortName(), ipm.getValue(), oper, null, slidingUnifier.outputs.get(0), ipm.getKey().getValue(PortContext.IS_CONNECTED_TO_DELAY_OPERATOR));
                 sourceMapping.outputStreams.get(ipm.getValue().getSource()).slidingUnifiers.add(slidingUnifier);
               }
               else {
-                input = new PTInput(ipm.getKey().getPortName(), ipm.getValue(), oper, null, sourceOut);
+                input = new PTInput(ipm.getKey().getPortName(), ipm.getValue(), oper, null, sourceOut, ipm.getKey().getValue(PortContext.IS_CONNECTED_TO_DELAY_OPERATOR));
               }
               oper.inputs.add(input);
             }
@@ -1445,6 +1451,9 @@ public class PhysicalPlan implements Serializable
     PMapping upstreamPartitioned = null;
 
     for (Map.Entry<LogicalPlan.InputPortMeta, StreamMeta> e : om.getInputStreams().entrySet()) {
+      if (e.getValue().getSource().getOperatorMeta().getOperator() instanceof Operator.DelayOperator) {
+        continue;
+      }
       PMapping m = logicalToPTOperator.get(e.getValue().getSource().getOperatorMeta());
       if (e.getKey().getValue(PortContext.PARTITION_PARALLEL).equals(true)) {
         // operator partitioned with upstream

http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/f7e1ccf1/engine/src/main/java/com/datatorrent/stram/plan/physical/StreamMapping.java
----------------------------------------------------------------------
diff --git a/engine/src/main/java/com/datatorrent/stram/plan/physical/StreamMapping.java b/engine/src/main/java/com/datatorrent/stram/plan/physical/StreamMapping.java
index d42c327..91c6eef 100644
--- a/engine/src/main/java/com/datatorrent/stram/plan/physical/StreamMapping.java
+++ b/engine/src/main/java/com/datatorrent/stram/plan/physical/StreamMapping.java
@@ -347,7 +347,7 @@ public class StreamMapping implements java.io.Serializable
     // link to upstream output(s) for this stream
     for (PTOutput upstreamOut : sourceOper.outputs) {
       if (upstreamOut.logicalStream == streamMeta) {
-        PTInput input = new PTInput(ipm.getPortName(), streamMeta, oper, pks, upstreamOut);
+        PTInput input = new PTInput(ipm.getPortName(), streamMeta, oper, pks, upstreamOut, ipm.getValue(PortContext.IS_CONNECTED_TO_DELAY_OPERATOR));
         oper.inputs.add(input);
       }
     }
@@ -356,7 +356,7 @@ public class StreamMapping implements java.io.Serializable
   public static void addInput(PTOperator target, PTOutput upstreamOut, PartitionKeys pks)
   {
     StreamMeta lStreamMeta = upstreamOut.logicalStream;
-    PTInput input = new PTInput("<merge#" + lStreamMeta.getSource().getPortName() + ">", lStreamMeta, target, pks, upstreamOut);
+    PTInput input = new PTInput("<merge#" + lStreamMeta.getSource().getPortName() + ">", lStreamMeta, target, pks, upstreamOut, false);
     target.inputs.add(input);
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/f7e1ccf1/engine/src/main/java/com/datatorrent/stram/tuple/Tuple.java
----------------------------------------------------------------------
diff --git a/engine/src/main/java/com/datatorrent/stram/tuple/Tuple.java b/engine/src/main/java/com/datatorrent/stram/tuple/Tuple.java
index 23c197b..9191b65 100644
--- a/engine/src/main/java/com/datatorrent/stram/tuple/Tuple.java
+++ b/engine/src/main/java/com/datatorrent/stram/tuple/Tuple.java
@@ -52,6 +52,11 @@ public class Tuple
     return windowId;
   }
 
+  public void setWindowId(long windowId)
+  {
+    this.windowId = windowId;
+  }
+
   public final int getBaseSeconds()
   {
     return (int)(windowId >> 32);

http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/f7e1ccf1/engine/src/test/java/com/datatorrent/stram/debug/TupleRecorderTest.java
----------------------------------------------------------------------
diff --git a/engine/src/test/java/com/datatorrent/stram/debug/TupleRecorderTest.java b/engine/src/test/java/com/datatorrent/stram/debug/TupleRecorderTest.java
index 3f97b54..1c17d68 100644
--- a/engine/src/test/java/com/datatorrent/stram/debug/TupleRecorderTest.java
+++ b/engine/src/test/java/com/datatorrent/stram/debug/TupleRecorderTest.java
@@ -76,8 +76,7 @@ public class TupleRecorderTest
   public TupleRecorder getTupleRecorder(final StramLocalCluster localCluster, final PTOperator op)
   {
     TupleRecorderCollection instance = (TupleRecorderCollection)localCluster.getContainer(op).getInstance(classname);
-    TupleRecorder tupleRecorder = instance.getTupleRecorder(op.getId(), null);
-    return tupleRecorder;
+    return instance.getTupleRecorder(op.getId(), null);
   }
 
   public class Tuple
@@ -89,8 +88,7 @@ public class TupleRecorderTest
   @Test
   public void testRecorder() throws IOException
   {
-    FileSystem fs = new LocalFileSystem();
-    try {
+    try (FileSystem fs = new LocalFileSystem()) {
       TupleRecorder recorder = new TupleRecorder(null, "application_test_id_1");
       recorder.getStorage().setBytesPerPartFile(4096);
       recorder.getStorage().setLocalMode(true);
@@ -132,80 +130,76 @@ public class TupleRecorderTest
 
       fs.initialize((new Path(recorder.getStorage().getBasePath()).toUri()), new Configuration());
       Path path;
-      FSDataInputStream is;
       String line;
-      BufferedReader br;
 
       path = new Path(recorder.getStorage().getBasePath(), FSPartFileCollection.INDEX_FILE);
-      is = fs.open(path);
-      br = new BufferedReader(new InputStreamReader(is));
-
-      line = br.readLine();
-      //    Assert.assertEquals("check index", "B:1000:T:0:part0.txt", line);
-      Assert.assertTrue("check index", line.matches("F:part0.txt:\\d+-\\d+:4:T:1000-1000:33:\\{\"3\":\"1\",\"1\":\"1\",\"0\":\"1\",\"2\":\"1\"\\}"));
+      try (FSDataInputStream is = fs.open(path);
+          BufferedReader br = new BufferedReader(new InputStreamReader(is))) {
 
+        line = br.readLine();
+        //    Assert.assertEquals("check index", "B:1000:T:0:part0.txt", line);
+        Assert.assertTrue("check index", line
+            .matches("F:part0.txt:\\d+-\\d+:4:T:1000-1000:33:\\{\"3\":\"1\",\"1\":\"1\",\"0\":\"1\",\"2\":\"1\"\\}"));
+      }
       path = new Path(recorder.getStorage().getBasePath(), FSPartFileCollection.META_FILE);
-      is = fs.open(path);
-      br = new BufferedReader(new InputStreamReader(is));
-
-      ObjectMapper mapper = new ObjectMapper();
-      line = br.readLine();
-      Assert.assertEquals("check version", "1.2", line);
-      br.readLine(); // RecordInfo
-      //RecordInfo ri = mapper.readValue(line, RecordInfo.class);
-      line = br.readLine();
-      PortInfo pi = mapper.readValue(line, PortInfo.class);
-      Assert.assertEquals("port1", recorder.getPortInfoMap().get(pi.name).id, pi.id);
-      Assert.assertEquals("port1", recorder.getPortInfoMap().get(pi.name).type, pi.type);
-      line = br.readLine();
-      pi = mapper.readValue(line, PortInfo.class);
-      Assert.assertEquals("port2", recorder.getPortInfoMap().get(pi.name).id, pi.id);
-      Assert.assertEquals("port2", recorder.getPortInfoMap().get(pi.name).type, pi.type);
-      line = br.readLine();
-      pi = mapper.readValue(line, PortInfo.class);
-      Assert.assertEquals("port3", recorder.getPortInfoMap().get(pi.name).id, pi.id);
-      Assert.assertEquals("port3", recorder.getPortInfoMap().get(pi.name).type, pi.type);
-      line = br.readLine();
-      pi = mapper.readValue(line, PortInfo.class);
-      Assert.assertEquals("port4", recorder.getPortInfoMap().get(pi.name).id, pi.id);
-      Assert.assertEquals("port4", recorder.getPortInfoMap().get(pi.name).type, pi.type);
-      Assert.assertEquals("port size", 4, recorder.getPortInfoMap().size());
-      //line = br.readLine();
-
+      try (FSDataInputStream is = fs.open(path);
+          BufferedReader br = new BufferedReader(new InputStreamReader(is))) {
+
+        ObjectMapper mapper = new ObjectMapper();
+        line = br.readLine();
+        Assert.assertEquals("check version", "1.2", line);
+        br.readLine(); // RecordInfo
+        //RecordInfo ri = mapper.readValue(line, RecordInfo.class);
+        line = br.readLine();
+        PortInfo pi = mapper.readValue(line, PortInfo.class);
+        Assert.assertEquals("port1", recorder.getPortInfoMap().get(pi.name).id, pi.id);
+        Assert.assertEquals("port1", recorder.getPortInfoMap().get(pi.name).type, pi.type);
+        line = br.readLine();
+        pi = mapper.readValue(line, PortInfo.class);
+        Assert.assertEquals("port2", recorder.getPortInfoMap().get(pi.name).id, pi.id);
+        Assert.assertEquals("port2", recorder.getPortInfoMap().get(pi.name).type, pi.type);
+        line = br.readLine();
+        pi = mapper.readValue(line, PortInfo.class);
+        Assert.assertEquals("port3", recorder.getPortInfoMap().get(pi.name).id, pi.id);
+        Assert.assertEquals("port3", recorder.getPortInfoMap().get(pi.name).type, pi.type);
+        line = br.readLine();
+        pi = mapper.readValue(line, PortInfo.class);
+        Assert.assertEquals("port4", recorder.getPortInfoMap().get(pi.name).id, pi.id);
+        Assert.assertEquals("port4", recorder.getPortInfoMap().get(pi.name).type, pi.type);
+        Assert.assertEquals("port size", 4, recorder.getPortInfoMap().size());
+        //line = br.readLine();
+      }
       path = new Path(recorder.getStorage().getBasePath(), "part0.txt");
-      is = fs.open(path);
-      br = new BufferedReader(new InputStreamReader(is));
+      try (FSDataInputStream is = fs.open(path);
+          BufferedReader br = new BufferedReader(new InputStreamReader(is))) {
 
-      line = br.readLine();
-      Assert.assertTrue("check part0", line.startsWith("B:"));
-      Assert.assertTrue("check part0", line.endsWith(":1000"));
+        line = br.readLine();
+        Assert.assertTrue("check part0", line.startsWith("B:"));
+        Assert.assertTrue("check part0", line.endsWith(":1000"));
 
-      line = br.readLine();
-      Assert.assertTrue("check part0 1", line.startsWith("T:"));
-      Assert.assertTrue("check part0 1", line.endsWith(":0:30:{\"key\":\"speed\",\"value\":\"5m/h\"}"));
+        line = br.readLine();
+        Assert.assertTrue("check part0 1", line.startsWith("T:"));
+        Assert.assertTrue("check part0 1", line.endsWith(":0:30:{\"key\":\"speed\",\"value\":\"5m/h\"}"));
 
-      line = br.readLine();
-      Assert.assertTrue("check part0 2", line.startsWith("T:"));
-      Assert.assertTrue("check part0 2", line.endsWith(":2:30:{\"key\":\"speed\",\"value\":\"4m/h\"}"));
+        line = br.readLine();
+        Assert.assertTrue("check part0 2", line.startsWith("T:"));
+        Assert.assertTrue("check part0 2", line.endsWith(":2:30:{\"key\":\"speed\",\"value\":\"4m/h\"}"));
 
-      line = br.readLine();
-      Assert.assertTrue("check part0 3", line.startsWith("T:"));
-      Assert.assertTrue("check part0 3", line.endsWith(":1:30:{\"key\":\"speed\",\"value\":\"6m/h\"}"));
+        line = br.readLine();
+        Assert.assertTrue("check part0 3", line.startsWith("T:"));
+        Assert.assertTrue("check part0 3", line.endsWith(":1:30:{\"key\":\"speed\",\"value\":\"6m/h\"}"));
 
-      line = br.readLine();
-      Assert.assertTrue("check part0 4", line.startsWith("T:"));
-      Assert.assertTrue("check part0 4", line.endsWith(":3:30:{\"key\":\"speed\",\"value\":\"2m/h\"}"));
+        line = br.readLine();
+        Assert.assertTrue("check part0 4", line.startsWith("T:"));
+        Assert.assertTrue("check part0 4", line.endsWith(":3:30:{\"key\":\"speed\",\"value\":\"2m/h\"}"));
 
-      line = br.readLine();
-      Assert.assertTrue("check part0 5", line.startsWith("E:"));
-      Assert.assertTrue("check part0 5", line.endsWith(":1000"));
-    }
-    catch (IOException ex) {
+        line = br.readLine();
+        Assert.assertTrue("check part0 5", line.startsWith("E:"));
+        Assert.assertTrue("check part0 5", line.endsWith(":1000"));
+      }
+    } catch (IOException ex) {
       throw new RuntimeException(ex);
     }
-    finally {
-      fs.close();
-    }
   }
 
   private static final File testWorkDir = new File("target", TupleRecorderTest.class.getName());
@@ -234,17 +228,17 @@ public class TupleRecorderTest
     final PTOperator ptOp2 = localCluster.findByLogicalNode(dag.getMeta(op2));
     StramTestSupport.waitForActivation(localCluster, ptOp2);
 
-    testRecordingOnOperator(localCluster, ptOp2, 2);
+    testRecordingOnOperator(localCluster, ptOp2);
 
     final PTOperator ptOp1 = localCluster.findByLogicalNode(dag.getMeta(op1));
     StramTestSupport.waitForActivation(localCluster, ptOp1);
 
-    testRecordingOnOperator(localCluster, ptOp1, 1);
+    testRecordingOnOperator(localCluster, ptOp1);
 
     localCluster.shutdown();
   }
 
-  private void testRecordingOnOperator(final StramLocalCluster localCluster, final PTOperator op, int numPorts) throws Exception
+  private void testRecordingOnOperator(final StramLocalCluster localCluster, final PTOperator op) throws Exception
   {
     String id = "xyz";
     localCluster.getStreamingContainerManager().startRecording(id, op.getId(), null, 0);
@@ -259,25 +253,30 @@ public class TupleRecorderTest
 
     };
     Assert.assertTrue("Should get a tuple recorder within 10 seconds", StramTestSupport.awaitCompletion(c, 10000));
-    TupleRecorder tupleRecorder = getTupleRecorder(localCluster, op);
+    final TupleRecorder tupleRecorder = getTupleRecorder(localCluster, op);
     long startTime = tupleRecorder.getStartTime();
-    BufferedReader br;
     String line;
     File dir = new File(testWorkDir, "recordings/" + op.getId() + "/" + id);
     File file;
 
-    file = new File(dir, "meta.txt");
+    file = new File(dir, FSPartFileCollection.META_FILE);
     Assert.assertTrue("meta file should exist", file.exists());
-    br = new BufferedReader(new FileReader(file));
-    line = br.readLine();
-    Assert.assertEquals("version should be 1.2", "1.2", line);
-    line = br.readLine();
-    JSONObject json = new JSONObject(line);
-    Assert.assertEquals("Start time verification", startTime, json.getLong("startTime"));
-    
-    for (int i = 0; i < numPorts; i++) {
+    int numPorts = tupleRecorder.getSinkMap().size();
+
+    try (BufferedReader br = new BufferedReader(new FileReader(file))) {
       line = br.readLine();
-      Assert.assertTrue("should contain name, streamName, type and id", line != null && line.contains("\"name\"") && line.contains("\"streamName\"") && line.contains("\"type\"") && line.contains("\"id\""));
+      Assert.assertEquals("version should be 1.2", "1.2", line);
+      line = br.readLine();
+      JSONObject json = new JSONObject(line);
+      Assert.assertEquals("Start time verification", startTime, json.getLong("startTime"));
+      Assert.assertTrue(numPorts > 0);
+
+      for (int i = 0; i < numPorts; i++) {
+        line = br.readLine();
+        Assert.assertTrue("should contain name, streamName, type and id", line != null && line
+            .contains("\"name\"") && line.contains("\"streamName\"") && line.contains("\"type\"") && line
+            .contains("\"id\""));
+      }
     }
 
     c = new WaitCondition()
@@ -285,7 +284,6 @@ public class TupleRecorderTest
       @Override
       public boolean isComplete()
       {
-        TupleRecorder tupleRecorder = getTupleRecorder(localCluster, op);
         return (tupleRecorder.getTotalTupleCount() >= testTupleCount);
       }
 
@@ -306,24 +304,23 @@ public class TupleRecorderTest
     };
     Assert.assertTrue("Tuple recorder shouldn't exist any more after stopping", StramTestSupport.awaitCompletion(c, 5000));
 
-    file = new File(dir, "index.txt");
+    file = new File(dir, FSPartFileCollection.INDEX_FILE);
     Assert.assertTrue("index file should exist", file.exists());
-    br = new BufferedReader(new FileReader(file));
 
-    ArrayList<String> partFiles = new ArrayList<String>();
+    ArrayList<String> partFiles = new ArrayList<>();
     int indexCount = 0;
-    while ((line = br.readLine()) != null) {
-      String partFile = "part" + indexCount + ".txt";
-      if (line.startsWith("F:" + partFile + ":")) {
-        partFiles.add(partFile);
-        indexCount++;
-      }
-      else if (line.startsWith("E")) {
-        Assert.assertEquals("index file should end after E line", br.readLine(), null);
-        break;
-      }
-      else {
-        Assert.fail("index file line is not starting with F or E");
+    try (BufferedReader br = new BufferedReader(new FileReader(file))) {
+      while ((line = br.readLine()) != null) {
+        String partFile = "part" + indexCount + ".txt";
+        if (line.startsWith("F:" + partFile + ":")) {
+          partFiles.add(partFile);
+          indexCount++;
+        } else if (line.startsWith("E")) {
+          Assert.assertNull("index file should end after E line", br.readLine());
+          break;
+        } else {
+          Assert.fail("index file line does not start with F or E");
+        }
       }
     }
 
@@ -337,17 +334,16 @@ public class TupleRecorderTest
         Assert.assertTrue(partFile + " should be greater than 1KB", file.length() >= 1024);
       }
       Assert.assertTrue(partFile + " should exist", file.exists());
-      br = new BufferedReader(new FileReader(file));
-      while ((line = br.readLine()) != null) {
-        if (line.startsWith("B:")) {
-          beginWindowExists = true;
-        }
-        else if (line.startsWith("E:")) {
-          endWindowExists = true;
-        }
-        else if (line.startsWith("T:")) {
-          String[] parts = line.split(":");
-          tupleCount[Integer.valueOf(parts[2])]++;
+      try (BufferedReader br = new BufferedReader(new FileReader(file))) {
+        while ((line = br.readLine()) != null) {
+          if (line.startsWith("B:")) {
+            beginWindowExists = true;
+          } else if (line.startsWith("E:")) {
+            endWindowExists = true;
+          } else if (line.startsWith("T:")) {
+            String[] parts = line.split(":");
+            tupleCount[Integer.valueOf(parts[2])]++;
+          }
         }
       }
     }
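
The TupleRecorderTest changes above switch every stream read to try-with-resources so the HDFS streams are closed even when an assertion throws mid-read. A minimal standalone sketch of that pattern follows; the class and method names are illustrative only and not part of this commit, and the meta/part file path is passed in rather than rebuilt from FSPartFileCollection.

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;

import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

// Illustrative helper (not part of the commit): read the first line of a
// recording file, closing both streams automatically even if an exception
// is thrown while reading.
class PartFileReaderSketch
{
  static String readFirstLine(FileSystem fs, Path path) throws IOException
  {
    try (FSDataInputStream is = fs.open(path);
        BufferedReader br = new BufferedReader(new InputStreamReader(is))) {
      return br.readLine(); // e.g. the "1.2" version line the test asserts on
    }
  }
}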

http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/f7e1ccf1/engine/src/test/java/com/datatorrent/stram/engine/GenericNodeTest.java
----------------------------------------------------------------------
diff --git a/engine/src/test/java/com/datatorrent/stram/engine/GenericNodeTest.java b/engine/src/test/java/com/datatorrent/stram/engine/GenericNodeTest.java
index c7e8ccc..2577504 100644
--- a/engine/src/test/java/com/datatorrent/stram/engine/GenericNodeTest.java
+++ b/engine/src/test/java/com/datatorrent/stram/engine/GenericNodeTest.java
@@ -277,6 +277,8 @@ public class GenericNodeTest
     gn.connectInputPort("ip1", reservoir1);
     gn.connectInputPort("ip2", reservoir2);
     gn.connectOutputPort("op", output);
+    gn.firstWindowMillis = 0;
+    gn.windowWidthMillis = 100;
 
     final AtomicBoolean ab = new AtomicBoolean(false);
     Thread t = new Thread()
@@ -382,6 +384,8 @@ public class GenericNodeTest
     gn.connectInputPort("ip1", reservoir1);
     gn.connectInputPort("ip2", reservoir2);
     gn.connectOutputPort("op", Sink.BLACKHOLE);
+    gn.firstWindowMillis = 0;
+    gn.windowWidthMillis = 100;
 
     final AtomicBoolean ab = new AtomicBoolean(false);
     Thread t = new Thread()
@@ -493,6 +497,8 @@ public class GenericNodeTest
 
     in.connectInputPort("ip1", windowGenerator.acquireReservoir(String.valueOf(in.id), 1024));
     in.connectOutputPort("output", testSink);
+    in.firstWindowMillis = 0;
+    in.windowWidthMillis = 100;
 
     windowGenerator.activate(null);
 
@@ -551,9 +557,13 @@ public class GenericNodeTest
     final long sleepTime = 25L;
 
     WindowGenerator windowGenerator = new WindowGenerator(new ScheduledThreadPoolExecutor(1, "WindowGenerator"), 1024);
-    windowGenerator.setResetWindow(0L);
-    windowGenerator.setFirstWindow(1448909287863L);
-    windowGenerator.setWindowWidth(100);
+    long resetWindow = 0L;
+    long firstWindowMillis = 1448909287863L;
+    int windowWidth = 100;
+
+    windowGenerator.setResetWindow(resetWindow);
+    windowGenerator.setFirstWindow(firstWindowMillis);
+    windowGenerator.setWindowWidth(windowWidth);
     windowGenerator.setCheckpointCount(1, 0);
 
     GenericOperator go = new GenericOperator();
@@ -576,6 +586,8 @@ public class GenericNodeTest
 
     gn.connectInputPort("ip1", windowGenerator.acquireReservoir(String.valueOf(gn.id), 1024));
     gn.connectOutputPort("output", testSink);
+    gn.firstWindowMillis = firstWindowMillis;
+    gn.windowWidthMillis = windowWidth;
 
     windowGenerator.activate(null);
 

http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/f7e1ccf1/engine/src/test/java/com/datatorrent/stram/engine/GenericTestOperator.java
----------------------------------------------------------------------
diff --git a/engine/src/test/java/com/datatorrent/stram/engine/GenericTestOperator.java b/engine/src/test/java/com/datatorrent/stram/engine/GenericTestOperator.java
index 0c8ae62..a3b0c53 100644
--- a/engine/src/test/java/com/datatorrent/stram/engine/GenericTestOperator.java
+++ b/engine/src/test/java/com/datatorrent/stram/engine/GenericTestOperator.java
@@ -132,6 +132,9 @@ public class GenericTestOperator extends BaseOperator {
     if (outport1.isConnected()) {
       outport1.emit(o);
     }
+    if (outport2.isConnected()) {
+      outport2.emit(o);
+    }
   }
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/f7e1ccf1/engine/src/test/java/com/datatorrent/stram/plan/logical/DelayOperatorTest.java
----------------------------------------------------------------------
diff --git a/engine/src/test/java/com/datatorrent/stram/plan/logical/DelayOperatorTest.java b/engine/src/test/java/com/datatorrent/stram/plan/logical/DelayOperatorTest.java
new file mode 100644
index 0000000..359da17
--- /dev/null
+++ b/engine/src/test/java/com/datatorrent/stram/plan/logical/DelayOperatorTest.java
@@ -0,0 +1,377 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.datatorrent.stram.plan.logical;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.TreeSet;
+import java.util.concurrent.Callable;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+
+import javax.validation.ValidationException;
+
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Ignore;
+import org.junit.Test;
+
+import com.datatorrent.api.Context;
+import com.datatorrent.api.DAG;
+import com.datatorrent.api.DefaultInputPort;
+import com.datatorrent.api.DefaultOutputPort;
+import com.datatorrent.api.Operator;
+import com.datatorrent.common.util.BaseOperator;
+import com.datatorrent.common.util.DefaultDelayOperator;
+import com.datatorrent.stram.StramLocalCluster;
+import com.datatorrent.stram.engine.GenericTestOperator;
+import com.datatorrent.stram.engine.TestGeneratorInputOperator;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+
+/**
+ * Unit tests for topologies with a delay operator
+ */
+public class DelayOperatorTest
+{
+  private static Lock sequential = new ReentrantLock();
+
+  @Before
+  public void setup()
+  {
+    sequential.lock();
+  }
+
+  @After
+  public void teardown()
+  {
+    sequential.unlock();
+  }
+
+  @Test
+  public void testInvalidDelayDetection()
+  {
+    LogicalPlan dag = new LogicalPlan();
+
+    GenericTestOperator opB = dag.addOperator("B", GenericTestOperator.class);
+    GenericTestOperator opC = dag.addOperator("C", GenericTestOperator.class);
+    GenericTestOperator opD = dag.addOperator("D", GenericTestOperator.class);
+    DefaultDelayOperator opDelay = dag.addOperator("opDelay", DefaultDelayOperator.class);
+
+    dag.addStream("BtoC", opB.outport1, opC.inport1);
+    dag.addStream("CtoD", opC.outport1, opD.inport1);
+    dag.addStream("CtoDelay", opC.outport2, opDelay.input);
+    dag.addStream("DelayToD", opDelay.output, opD.inport2);
+
+    List<List<String>> invalidDelays = new ArrayList<>();
+    dag.findInvalidDelays(dag.getMeta(opB), invalidDelays);
+    assertEquals("operator invalid delay", 1, invalidDelays.size());
+
+    try {
+      dag.validate();
+      fail("validation should fail");
+    } catch (ValidationException e) {
+      // expected
+    }
+
+    dag = new LogicalPlan();
+
+    opB = dag.addOperator("B", GenericTestOperator.class);
+    opC = dag.addOperator("C", GenericTestOperator.class);
+    opD = dag.addOperator("D", GenericTestOperator.class);
+    opDelay = dag.addOperator("opDelay", DefaultDelayOperator.class);
+    dag.setAttribute(opDelay, Context.OperatorContext.APPLICATION_WINDOW_COUNT, 2);
+    dag.addStream("BtoC", opB.outport1, opC.inport1);
+    dag.addStream("CtoD", opC.outport1, opD.inport1);
+    dag.addStream("CtoDelay", opC.outport2, opDelay.input);
+    dag.addStream("DelayToC", opDelay.output, opC.inport2);
+
+    invalidDelays = new ArrayList<>();
+    dag.findInvalidDelays(dag.getMeta(opB), invalidDelays);
+    assertEquals("operator invalid delay", 1, invalidDelays.size());
+
+    try {
+      dag.validate();
+      fail("validation should fail");
+    } catch (ValidationException e) {
+      // expected
+    }
+
+    dag = new LogicalPlan();
+
+    opB = dag.addOperator("B", GenericTestOperator.class);
+    opC = dag.addOperator("C", GenericTestOperator.class);
+    opD = dag.addOperator("D", GenericTestOperator.class);
+    opDelay = dag.addOperator("opDelay", DefaultDelayOperator.class);
+    dag.addStream("BtoC", opB.outport1, opC.inport1);
+    dag.addStream("CtoD", opC.outport1, opD.inport1);
+    dag.addStream("CtoDelay", opC.outport2, opDelay.input).setLocality(DAG.Locality.THREAD_LOCAL);
+    dag.addStream("DelayToC", opDelay.output, opC.inport2).setLocality(DAG.Locality.THREAD_LOCAL);
+
+    try {
+      dag.validate();
+      fail("validation should fail");
+    } catch (ValidationException e) {
+      // expected
+    }
+  }
+
+  @Test
+  public void testValidDelay()
+  {
+    LogicalPlan dag = new LogicalPlan();
+
+    TestGeneratorInputOperator opA = dag.addOperator("A", TestGeneratorInputOperator.class);
+    GenericTestOperator opB = dag.addOperator("B", GenericTestOperator.class);
+    GenericTestOperator opC = dag.addOperator("C", GenericTestOperator.class);
+    GenericTestOperator opD = dag.addOperator("D", GenericTestOperator.class);
+    DefaultDelayOperator opDelay = dag.addOperator("opDelay", DefaultDelayOperator.class);
+
+    dag.addStream("AtoB", opA.outport, opB.inport1);
+    dag.addStream("BtoC", opB.outport1, opC.inport1);
+    dag.addStream("CtoD", opC.outport1, opD.inport1);
+    dag.addStream("CtoDelay", opC.outport2, opDelay.input);
+    dag.addStream("DelayToB", opDelay.output, opB.inport2);
+    dag.validate();
+  }
+
+  public static final Long[] FIBONACCI_NUMBERS = new Long[]{
+      1L, 1L, 2L, 3L, 5L, 8L, 13L, 21L, 34L, 55L, 89L, 144L, 233L, 377L, 610L, 987L, 1597L, 2584L, 4181L, 6765L,
+      10946L, 17711L, 28657L, 46368L, 75025L, 121393L, 196418L, 317811L, 514229L, 832040L, 1346269L, 2178309L,
+      3524578L, 5702887L, 9227465L, 14930352L, 24157817L, 39088169L, 63245986L, 102334155L
+  };
+
+  public static class FibonacciOperator extends BaseOperator
+  {
+    public static List<Long> results = new ArrayList<>();
+    public long currentNumber = 1;
+    private transient long tempNum;
+
+    public transient DefaultInputPort<Object> dummyInputPort = new DefaultInputPort<Object>()
+    {
+      @Override
+      public void process(Object tuple)
+      {
+      }
+    };
+    public transient DefaultInputPort<Long> input = new DefaultInputPort<Long>()
+    {
+      @Override
+      public void process(Long tuple)
+      {
+        tempNum = tuple;
+      }
+    };
+    public transient DefaultOutputPort<Long> output = new DefaultOutputPort<>();
+
+    @Override
+    public void endWindow()
+    {
+      output.emit(currentNumber);
+      results.add(currentNumber);
+      currentNumber += tempNum;
+      if (currentNumber <= 0) {
+        // overflow
+        currentNumber = 1;
+      }
+    }
+
+  }
+
+  public static class FailableFibonacciOperator extends FibonacciOperator implements Operator.CheckpointListener
+  {
+    private boolean committed = false;
+    private int simulateFailureWindows = 0;
+    private boolean simulateFailureAfterCommit = false;
+    private int windowCount = 0;
+    public static boolean failureSimulated = false;
+
+    @Override
+    public void beginWindow(long windowId)
+    {
+      if (simulateFailureWindows > 0 && ((simulateFailureAfterCommit && committed) || !simulateFailureAfterCommit) &&
+          !failureSimulated) {
+        if (windowCount++ == simulateFailureWindows) {
+          failureSimulated = true;
+          throw new RuntimeException("simulating failure");
+        }
+      }
+    }
+
+    @Override
+    public void checkpointed(long windowId)
+    {
+    }
+
+    @Override
+    public void committed(long windowId)
+    {
+      committed = true;
+    }
+
+    public void setSimulateFailureWindows(int windows, boolean afterCommit)
+    {
+      this.simulateFailureAfterCommit = afterCommit;
+      this.simulateFailureWindows = windows;
+    }
+  }
+
+  public static class FailableDelayOperator extends DefaultDelayOperator implements Operator.CheckpointListener
+  {
+    private boolean committed = false;
+    private int simulateFailureWindows = 0;
+    private boolean simulateFailureAfterCommit = false;
+    private int windowCount = 0;
+    private static boolean failureSimulated = false;
+
+    @Override
+    public void beginWindow(long windowId)
+    {
+      super.beginWindow(windowId);
+      if (simulateFailureWindows > 0 && ((simulateFailureAfterCommit && committed) || !simulateFailureAfterCommit) &&
+          !failureSimulated) {
+        if (windowCount++ == simulateFailureWindows) {
+          failureSimulated = true;
+          throw new RuntimeException("simulating failure");
+        }
+      }
+    }
+
+    @Override
+    public void checkpointed(long windowId)
+    {
+    }
+
+    @Override
+    public void committed(long windowId)
+    {
+      committed = true;
+    }
+
+    public void setSimulateFailureWindows(int windows, boolean afterCommit)
+    {
+      this.simulateFailureAfterCommit = afterCommit;
+      this.simulateFailureWindows = windows;
+    }
+  }
+
+
+  @Test
+  public void testFibonacci() throws Exception
+  {
+    LogicalPlan dag = new LogicalPlan();
+
+    TestGeneratorInputOperator dummyInput = dag.addOperator("DUMMY", TestGeneratorInputOperator.class);
+    FibonacciOperator fib = dag.addOperator("FIB", FibonacciOperator.class);
+    DefaultDelayOperator opDelay = dag.addOperator("opDelay", DefaultDelayOperator.class);
+
+    dag.addStream("dummy_to_operator", dummyInput.outport, fib.dummyInputPort);
+    dag.addStream("operator_to_delay", fib.output, opDelay.input);
+    dag.addStream("delay_to_operator", opDelay.output, fib.input);
+    FibonacciOperator.results.clear();
+    final StramLocalCluster localCluster = new StramLocalCluster(dag);
+    localCluster.setExitCondition(new Callable<Boolean>()
+    {
+      @Override
+      public Boolean call() throws Exception
+      {
+        return FibonacciOperator.results.size() >= 10;
+      }
+    });
+    localCluster.run(10000);
+    Assert.assertArrayEquals(Arrays.copyOfRange(FIBONACCI_NUMBERS, 0, 10),
+        FibonacciOperator.results.subList(0, 10).toArray());
+  }
+
+  @Ignore // Out-of-sequence BEGIN_WINDOW tuple on Travis; will tackle in the next version
+  @Test
+  public void testFibonacciRecovery1() throws Exception
+  {
+    LogicalPlan dag = new LogicalPlan();
+
+    TestGeneratorInputOperator dummyInput = dag.addOperator("DUMMY", TestGeneratorInputOperator.class);
+    FailableFibonacciOperator fib = dag.addOperator("FIB", FailableFibonacciOperator.class);
+    DefaultDelayOperator opDelay = dag.addOperator("opDelay", DefaultDelayOperator.class);
+
+    fib.setSimulateFailureWindows(3, true);
+
+    dag.addStream("dummy_to_operator", dummyInput.outport, fib.dummyInputPort);
+    dag.addStream("operator_to_delay", fib.output, opDelay.input);
+    dag.addStream("delay_to_operator", opDelay.output, fib.input);
+    dag.getAttributes().put(LogicalPlan.CHECKPOINT_WINDOW_COUNT, 2);
+    dag.getAttributes().put(LogicalPlan.STREAMING_WINDOW_SIZE_MILLIS, 300);
+    FailableFibonacciOperator.results.clear();
+    FailableFibonacciOperator.failureSimulated = false;
+    final StramLocalCluster localCluster = new StramLocalCluster(dag);
+    localCluster.setPerContainerBufferServer(true);
+    localCluster.setExitCondition(new Callable<Boolean>()
+    {
+      @Override
+      public Boolean call() throws Exception
+      {
+        return FailableFibonacciOperator.results.size() >= 30;
+      }
+    });
+    localCluster.run(60000);
+    Assert.assertTrue("failure should be invoked", FailableFibonacciOperator.failureSimulated);
+    Assert.assertArrayEquals(Arrays.copyOfRange(new TreeSet<>(Arrays.asList(FIBONACCI_NUMBERS)).toArray(), 0, 20),
+        Arrays.copyOfRange(new TreeSet<>(FibonacciOperator.results).toArray(), 0, 20));
+  }
+
+  @Ignore // Out-of-sequence BEGIN_WINDOW tuple on Travis; will tackle in the next version
+  @Test
+  public void testFibonacciRecovery2() throws Exception
+  {
+    LogicalPlan dag = new LogicalPlan();
+
+    TestGeneratorInputOperator dummyInput = dag.addOperator("DUMMY", TestGeneratorInputOperator.class);
+    FibonacciOperator fib = dag.addOperator("FIB", FibonacciOperator.class);
+    FailableDelayOperator opDelay = dag.addOperator("opDelay", FailableDelayOperator.class);
+
+    opDelay.setSimulateFailureWindows(5, true);
+
+    dag.addStream("dummy_to_operator", dummyInput.outport, fib.dummyInputPort);
+    dag.addStream("operator_to_delay", fib.output, opDelay.input);
+    dag.addStream("delay_to_operator", opDelay.output, fib.input);
+    dag.getAttributes().put(LogicalPlan.CHECKPOINT_WINDOW_COUNT, 2);
+    dag.getAttributes().put(LogicalPlan.STREAMING_WINDOW_SIZE_MILLIS, 300);
+    FibonacciOperator.results.clear();
+    FailableDelayOperator.failureSimulated = false;
+    final StramLocalCluster localCluster = new StramLocalCluster(dag);
+    localCluster.setPerContainerBufferServer(true);
+    localCluster.setExitCondition(new Callable<Boolean>()
+    {
+      @Override
+      public Boolean call() throws Exception
+      {
+        return FibonacciOperator.results.size() >= 30;
+      }
+    });
+    localCluster.run(60000);
+
+    Assert.assertTrue("failure should be invoked", FailableDelayOperator.failureSimulated);
+    Assert.assertArrayEquals(Arrays.copyOfRange(new TreeSet<>(Arrays.asList(FIBONACCI_NUMBERS)).toArray(), 0, 20),
+        Arrays.copyOfRange(new TreeSet<>(FibonacciOperator.results).toArray(), 0, 20));
+  }
+
+
+}
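
For reference, the loop wiring exercised by testValidDelay and testFibonacci above can be reduced to a few lines. The sketch below is illustrative only, assuming the same test operators imported by DelayOperatorTest; the class and method names (DelayLoopSketch, buildLoopDag) are hypothetical and not part of this commit.

import com.datatorrent.common.util.DefaultDelayOperator;
import com.datatorrent.stram.engine.GenericTestOperator;
import com.datatorrent.stram.engine.TestGeneratorInputOperator;
import com.datatorrent.stram.plan.logical.LogicalPlan;

// Illustrative only: an iterative DAG where the only cycle runs through a
// DelayOperator, mirroring the testValidDelay topology above.
class DelayLoopSketch
{
  static LogicalPlan buildLoopDag()
  {
    LogicalPlan dag = new LogicalPlan();

    TestGeneratorInputOperator opA = dag.addOperator("A", TestGeneratorInputOperator.class);
    GenericTestOperator opB = dag.addOperator("B", GenericTestOperator.class);
    GenericTestOperator opC = dag.addOperator("C", GenericTestOperator.class);
    DefaultDelayOperator opDelay = dag.addOperator("opDelay", DefaultDelayOperator.class);

    // Forward edges.
    dag.addStream("AtoB", opA.outport, opB.inport1);
    dag.addStream("BtoC", opB.outport1, opC.inport1);

    // Backward edge: C feeds the delay operator, which re-emits its input in
    // the following window, so the cycle never closes within a single window.
    dag.addStream("CtoDelay", opC.outport2, opDelay.input);
    dag.addStream("DelayToB", opDelay.output, opB.inport2);

    dag.validate(); // accepted because the loop is broken by the DelayOperator
    return dag;
  }
}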

