apex-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From tus...@apache.org
Subject apex-core git commit: APEXCORE-580 APEXCORE-581 Support for custom control tuples
Date Fri, 03 Mar 2017 06:41:48 GMT
Repository: apex-core
Updated Branches:
  refs/heads/master 3b660c9c1 -> 1e4785671


APEXCORE-580 APEXCORE-581 Support for custom control tuples


Project: http://git-wip-us.apache.org/repos/asf/apex-core/repo
Commit: http://git-wip-us.apache.org/repos/asf/apex-core/commit/1e478567
Tree: http://git-wip-us.apache.org/repos/asf/apex-core/tree/1e478567
Diff: http://git-wip-us.apache.org/repos/asf/apex-core/diff/1e478567

Branch: refs/heads/master
Commit: 1e47856712dc4fcae40856d27ce8ce2360037a12
Parents: 3b660c9
Author: bhupeshchawda <bhupesh@apache.org>
Authored: Wed Dec 28 15:48:42 2016 +0530
Committer: bhupeshchawda <bhupesh@apache.org>
Committed: Fri Mar 3 11:08:25 2017 +0530

----------------------------------------------------------------------
 .../api/ControlTupleEnabledSink.java            |  56 +++
 .../com/datatorrent/api/DefaultInputPort.java   |   2 +-
 .../com/datatorrent/api/DefaultOutputPort.java  |  21 +-
 .../apex/api/ControlAwareDefaultInputPort.java  |  46 +++
 .../apex/api/ControlAwareDefaultOutputPort.java |  60 +++
 .../apex/api/UserDefinedControlTuple.java       |  46 +++
 .../bufferserver/packet/CustomControlTuple.java |  36 ++
 .../bufferserver/packet/MessageType.java        |   4 +
 .../datatorrent/bufferserver/packet/Tuple.java  |   3 +
 .../datatorrent/stram/engine/GenericNode.java   | 126 ++++++-
 .../com/datatorrent/stram/engine/OiONode.java   |  53 +++
 .../com/datatorrent/stram/engine/Stream.java    |   3 +-
 .../datatorrent/stram/engine/UnifierNode.java   |   1 -
 .../stram/engine/WindowGenerator.java           |   8 +
 .../stram/stream/BufferServerPublisher.java     |  32 ++
 .../stram/stream/BufferServerSubscriber.java    |  12 +
 .../datatorrent/stram/stream/FastPublisher.java |  15 +
 .../datatorrent/stram/stream/InlineStream.java  |  10 +
 .../com/datatorrent/stram/stream/MuxStream.java |  11 +-
 .../com/datatorrent/stram/stream/OiOStream.java |  15 +
 .../stram/tuple/CustomControlTuple.java         |  62 +++
 .../stram/CustomControlTupleTest.java           | 376 +++++++++++++++++++
 .../stram/engine/GenericNodeTest.java           | 192 ++++++++++
 23 files changed, 1179 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/apex-core/blob/1e478567/api/src/main/java/com/datatorrent/api/ControlTupleEnabledSink.java
----------------------------------------------------------------------
diff --git a/api/src/main/java/com/datatorrent/api/ControlTupleEnabledSink.java b/api/src/main/java/com/datatorrent/api/ControlTupleEnabledSink.java
new file mode 100644
index 0000000..e27003d
--- /dev/null
+++ b/api/src/main/java/com/datatorrent/api/ControlTupleEnabledSink.java
@@ -0,0 +1,56 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.datatorrent.api;
+
+import org.apache.apex.api.UserDefinedControlTuple;
+import org.apache.hadoop.classification.InterfaceStability;
+
+/**
+ * A {@link Sink} which supports adding control tuples
+ */
+@InterfaceStability.Evolving
+public interface ControlTupleEnabledSink<T> extends Sink<T>
+{
+  public static final ControlTupleEnabledSink<Object> BLACKHOLE = new ControlTupleEnabledSink<Object>()
+  {
+    @Override
+    public void put(Object tuple)
+    {
+    }
+
+    @Override
+    public boolean putControl(UserDefinedControlTuple payload)
+    {
+      return true;
+    }
+
+    @Override
+    public int getCount(boolean reset)
+    {
+      return 0;
+    }
+  };
+
+  /**
+   * Add a control tuple to the sink
+   *
+   * @param payload the control tuple payload
+   */
+  public boolean putControl(UserDefinedControlTuple payload);
+}

http://git-wip-us.apache.org/repos/asf/apex-core/blob/1e478567/api/src/main/java/com/datatorrent/api/DefaultInputPort.java
----------------------------------------------------------------------
diff --git a/api/src/main/java/com/datatorrent/api/DefaultInputPort.java b/api/src/main/java/com/datatorrent/api/DefaultInputPort.java
index 046a35d..dc8705c 100644
--- a/api/src/main/java/com/datatorrent/api/DefaultInputPort.java
+++ b/api/src/main/java/com/datatorrent/api/DefaultInputPort.java
@@ -31,7 +31,7 @@ import com.datatorrent.api.Operator.InputPort;
  */
 public abstract class DefaultInputPort<T> implements InputPort<T>, Sink<T>
 {
-  private int count;
+  protected int count;
   protected boolean connected = false;
 
   /**

http://git-wip-us.apache.org/repos/asf/apex-core/blob/1e478567/api/src/main/java/com/datatorrent/api/DefaultOutputPort.java
----------------------------------------------------------------------
diff --git a/api/src/main/java/com/datatorrent/api/DefaultOutputPort.java b/api/src/main/java/com/datatorrent/api/DefaultOutputPort.java
index 71be22c..acd562f 100644
--- a/api/src/main/java/com/datatorrent/api/DefaultOutputPort.java
+++ b/api/src/main/java/com/datatorrent/api/DefaultOutputPort.java
@@ -37,7 +37,7 @@ public class DefaultOutputPort<T> implements Operator.OutputPort<T>
   public static final String THREAD_AFFINITY_DISABLE_CHECK = "com.datatorrent.api.DefaultOutputPort.thread.check.disable";
   private static final Logger logger = LoggerFactory.getLogger(DefaultOutputPort.class);
 
-  private transient Sink<Object> sink;
+  protected transient Sink<Object> sink;
   private transient Thread operatorThread;
 
   /**
@@ -45,7 +45,7 @@ public class DefaultOutputPort<T> implements Operator.OutputPort<T>
    */
   public DefaultOutputPort()
   {
-    this.sink = Sink.BLACKHOLE;
+    this.sink = ControlTupleEnabledSink.BLACKHOLE;
   }
 
   /**
@@ -55,13 +55,18 @@ public class DefaultOutputPort<T> implements Operator.OutputPort<T>
    */
   public void emit(T tuple)
   {
+    verifyOperatorThread();
+    sink.put(tuple);
+  }
+
+  protected void verifyOperatorThread()
+  {
     // operatorThread could be null if setup() never got called.
     if (operatorThread != null && Thread.currentThread() != operatorThread) {
       // only under certain modes: enforce this
       throw new IllegalStateException("Current thread " + Thread.currentThread().getName() +
-          " is different from the operator thread " + operatorThread.getName());
+        " is different from the operator thread " + operatorThread.getName());
     }
-    sink.put(tuple);
   }
 
   /**
@@ -70,7 +75,7 @@ public class DefaultOutputPort<T> implements Operator.OutputPort<T>
    * Called by execution engine to inject sink at deployment time.
    */
   @Override
-  public final void setSink(Sink<Object> s)
+  public void setSink(Sink<Object> s)
   {
     this.sink = s == null ? Sink.BLACKHOLE : s;
   }
@@ -83,7 +88,7 @@ public class DefaultOutputPort<T> implements Operator.OutputPort<T>
    */
   public boolean isConnected()
   {
-    return sink != Sink.BLACKHOLE;
+    return sink != ControlTupleEnabledSink.BLACKHOLE;
   }
 
   /**
@@ -113,4 +118,8 @@ public class DefaultOutputPort<T> implements Operator.OutputPort<T>
   {
   }
 
+  protected Sink<Object> getSink()
+  {
+    return sink;
+  }
 }

http://git-wip-us.apache.org/repos/asf/apex-core/blob/1e478567/api/src/main/java/org/apache/apex/api/ControlAwareDefaultInputPort.java
----------------------------------------------------------------------
diff --git a/api/src/main/java/org/apache/apex/api/ControlAwareDefaultInputPort.java b/api/src/main/java/org/apache/apex/api/ControlAwareDefaultInputPort.java
new file mode 100644
index 0000000..ff2b849
--- /dev/null
+++ b/api/src/main/java/org/apache/apex/api/ControlAwareDefaultInputPort.java
@@ -0,0 +1,46 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.api;
+
+import org.apache.hadoop.classification.InterfaceStability;
+
+import com.datatorrent.api.ControlTupleEnabledSink;
+import com.datatorrent.api.DefaultInputPort;
+
+/**
+ * Default abstract implementation for an input port which is capable of processing
+ * @{@link UserDefinedControlTuple}
+ */
+@InterfaceStability.Evolving
+public abstract class ControlAwareDefaultInputPort<T> extends DefaultInputPort<T> implements ControlTupleEnabledSink<T>
+{
+  @Override
+  public boolean putControl(UserDefinedControlTuple payload)
+  {
+    count++;
+    return processControl(payload);
+  }
+
+  /**
+   * Process the control tuples
+   *
+   * @param payload the control tuple payload generated by upstream operator(s)
+   */
+  public abstract boolean processControl(UserDefinedControlTuple payload);
+}

http://git-wip-us.apache.org/repos/asf/apex-core/blob/1e478567/api/src/main/java/org/apache/apex/api/ControlAwareDefaultOutputPort.java
----------------------------------------------------------------------
diff --git a/api/src/main/java/org/apache/apex/api/ControlAwareDefaultOutputPort.java b/api/src/main/java/org/apache/apex/api/ControlAwareDefaultOutputPort.java
new file mode 100644
index 0000000..4a83518
--- /dev/null
+++ b/api/src/main/java/org/apache/apex/api/ControlAwareDefaultOutputPort.java
@@ -0,0 +1,60 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.api;
+
+import org.apache.hadoop.classification.InterfaceStability;
+
+import com.datatorrent.api.ControlTupleEnabledSink;
+import com.datatorrent.api.DefaultOutputPort;
+import com.datatorrent.api.Sink;
+
+/**
+ * Default implementation for an output port which can emit a @{@link UserDefinedControlTuple}.
+ * The {@link #emitControl(UserDefinedControlTuple)} method can be used to emit control tuples onto this output port
+ */
+@InterfaceStability.Evolving
+public class ControlAwareDefaultOutputPort<T> extends DefaultOutputPort<T>
+{
+  public ControlAwareDefaultOutputPort()
+  {
+    sink = ControlTupleEnabledSink.BLACKHOLE;
+  }
+
+  /**
+   * Allows the operator to emit a @{@link UserDefinedControlTuple}
+   * @param {@link UserDefinedControlTuple}
+   */
+  public void emitControl(UserDefinedControlTuple tuple)
+  {
+    verifyOperatorThread();
+    ((ControlTupleEnabledSink)sink).putControl(tuple);
+  }
+
+  public boolean isConnected()
+  {
+    return sink != ControlTupleEnabledSink.BLACKHOLE;
+  }
+
+  @Override
+  public void setSink(Sink<Object> s)
+  {
+    this.sink = (s == null ? ControlTupleEnabledSink.BLACKHOLE : s);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/apex-core/blob/1e478567/api/src/main/java/org/apache/apex/api/UserDefinedControlTuple.java
----------------------------------------------------------------------
diff --git a/api/src/main/java/org/apache/apex/api/UserDefinedControlTuple.java b/api/src/main/java/org/apache/apex/api/UserDefinedControlTuple.java
new file mode 100644
index 0000000..8e62a8f
--- /dev/null
+++ b/api/src/main/java/org/apache/apex/api/UserDefinedControlTuple.java
@@ -0,0 +1,46 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.api;
+
+import org.apache.hadoop.classification.InterfaceStability;
+
+/**
+ * Any user generated control tuple must implement {@link UserDefinedControlTuple} interface
+ */
+@InterfaceStability.Evolving
+public interface UserDefinedControlTuple
+{
+  /**
+   * A user generated control tuple must specify a @{@link DeliveryType}
+   * @return @{@link DeliveryType} type
+   */
+  DeliveryType getDeliveryType();
+
+  /**
+   * All custom control tuples can be delivered according to the following semantics
+   * 1. IMMEDIATE - The control tuple will be delivered immediately to the next operator
+   * 2. END_WINDOW - The control tuple will be delivered to the next operator just before the
+   * com.datatorrent.api.Operator#endWindow() call.
+   */
+  enum DeliveryType
+  {
+    IMMEDIATE,
+    END_WINDOW
+  }
+}

http://git-wip-us.apache.org/repos/asf/apex-core/blob/1e478567/bufferserver/src/main/java/com/datatorrent/bufferserver/packet/CustomControlTuple.java
----------------------------------------------------------------------
diff --git a/bufferserver/src/main/java/com/datatorrent/bufferserver/packet/CustomControlTuple.java b/bufferserver/src/main/java/com/datatorrent/bufferserver/packet/CustomControlTuple.java
new file mode 100644
index 0000000..3aca31d
--- /dev/null
+++ b/bufferserver/src/main/java/com/datatorrent/bufferserver/packet/CustomControlTuple.java
@@ -0,0 +1,36 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.datatorrent.bufferserver.packet;
+
+/**
+ * Custom Control Tuple class
+ */
+public class CustomControlTuple extends DataTuple
+{
+  public CustomControlTuple(byte[] array, int offset, int index)
+  {
+    super(array, offset, index);
+  }
+
+  @Override
+  public MessageType getType()
+  {
+    return MessageType.CUSTOM_CONTROL;
+  }
+}

http://git-wip-us.apache.org/repos/asf/apex-core/blob/1e478567/bufferserver/src/main/java/com/datatorrent/bufferserver/packet/MessageType.java
----------------------------------------------------------------------
diff --git a/bufferserver/src/main/java/com/datatorrent/bufferserver/packet/MessageType.java b/bufferserver/src/main/java/com/datatorrent/bufferserver/packet/MessageType.java
index 3c0ec2c..efc4ac3 100644
--- a/bufferserver/src/main/java/com/datatorrent/bufferserver/packet/MessageType.java
+++ b/bufferserver/src/main/java/com/datatorrent/bufferserver/packet/MessageType.java
@@ -37,6 +37,7 @@ public enum MessageType
   RESET_REQUEST(9),
   CHECKPOINT(10),
   CODEC_STATE(11),
+  CUSTOM_CONTROL(12),
   NO_MESSAGE_ODD(127);
 
   public static final byte NO_MESSAGE_VALUE = 0;
@@ -51,6 +52,7 @@ public enum MessageType
   public static final byte RESET_REQUEST_VALUE = 9;
   public static final byte CHECKPOINT_VALUE = 10;
   public static final byte CODEC_STATE_VALUE = 11;
+  public static final byte CUSTOM_CONTROL_VALUE = 12;
   public static final byte NO_MESSAGE_ODD_VALUE = 127;
 
   public final int getNumber()
@@ -85,6 +87,8 @@ public enum MessageType
         return CHECKPOINT;
       case 11:
         return CODEC_STATE;
+      case 12:
+        return CUSTOM_CONTROL;
       case 127:
         return NO_MESSAGE_ODD;
       default:

http://git-wip-us.apache.org/repos/asf/apex-core/blob/1e478567/bufferserver/src/main/java/com/datatorrent/bufferserver/packet/Tuple.java
----------------------------------------------------------------------
diff --git a/bufferserver/src/main/java/com/datatorrent/bufferserver/packet/Tuple.java b/bufferserver/src/main/java/com/datatorrent/bufferserver/packet/Tuple.java
index de3cae8..aae7f68 100644
--- a/bufferserver/src/main/java/com/datatorrent/bufferserver/packet/Tuple.java
+++ b/bufferserver/src/main/java/com/datatorrent/bufferserver/packet/Tuple.java
@@ -73,6 +73,9 @@ public abstract class Tuple
       case END_WINDOW:
         return new EndWindowTuple(buffer, offset, length);
 
+      case CUSTOM_CONTROL:
+        return new CustomControlTuple(buffer, offset, length);
+
       case END_STREAM:
         return new WindowIdTuple(buffer, offset, length);
 

http://git-wip-us.apache.org/repos/asf/apex-core/blob/1e478567/engine/src/main/java/com/datatorrent/stram/engine/GenericNode.java
----------------------------------------------------------------------
diff --git a/engine/src/main/java/com/datatorrent/stram/engine/GenericNode.java b/engine/src/main/java/com/datatorrent/stram/engine/GenericNode.java
index 41acd43..dae838d 100644
--- a/engine/src/main/java/com/datatorrent/stram/engine/GenericNode.java
+++ b/engine/src/main/java/com/datatorrent/stram/engine/GenericNode.java
@@ -23,6 +23,7 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.HashMap;
 import java.util.Iterator;
+import java.util.LinkedHashSet;
 import java.util.LinkedList;
 import java.util.Map;
 import java.util.Map.Entry;
@@ -30,10 +31,14 @@ import java.util.Map.Entry;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import org.apache.apex.api.ControlAwareDefaultInputPort;
+import org.apache.apex.api.UserDefinedControlTuple;
 import org.apache.commons.lang.UnhandledException;
 
 import com.google.common.base.Throwables;
+import com.google.common.collect.Maps;
 
+import com.datatorrent.api.ControlTupleEnabledSink;
 import com.datatorrent.api.Operator;
 import com.datatorrent.api.Operator.IdleTimeHandler;
 import com.datatorrent.api.Operator.InputPort;
@@ -47,6 +52,7 @@ import com.datatorrent.stram.api.StreamingContainerUmbilicalProtocol.ContainerSt
 import com.datatorrent.stram.debug.TappedReservoir;
 import com.datatorrent.stram.plan.logical.LogicalPlan;
 import com.datatorrent.stram.plan.logical.Operators;
+import com.datatorrent.stram.tuple.CustomControlTuple;
 import com.datatorrent.stram.tuple.ResetWindowTuple;
 import com.datatorrent.stram.tuple.Tuple;
 
@@ -67,6 +73,7 @@ public class GenericNode extends Node<Operator>
 {
   protected final HashMap<String, SweepableReservoir> inputs = new HashMap<>();
   protected ArrayList<DeferredInputConnection> deferredInputConnections = new ArrayList<>();
+  protected Map<SweepableReservoir,Sink> reservoirPortMap = Maps.newHashMap();
 
   @Override
   @SuppressWarnings("unchecked")
@@ -249,6 +256,9 @@ public class GenericNode extends Node<Operator>
 
     TupleTracker tracker;
     LinkedList<TupleTracker> resetTupleTracker = new LinkedList<>();
+    Map<SweepableReservoir, LinkedHashSet<CustomControlTuple>> immediateDeliveryTuples = Maps.newHashMap();
+    Map<SweepableReservoir,LinkedHashSet<CustomControlTuple>> endWindowDeliveryTuples = Maps.newHashMap();
+
     try {
       do {
         Iterator<Map.Entry<String, SweepableReservoir>> buffers = activeQueues.iterator();
@@ -290,8 +300,8 @@ public class GenericNode extends Node<Operator>
                       for (int s = sinks.length; s-- > 0; ) {
                         sinks[s].put(resetWindowTuple);
                       }
-                      controlTupleCount++;
                     }
+                    controlTupleCount++;
                     t.setWindowId(windowAhead);
                   }
                   for (int s = sinks.length; s-- > 0; ) {
@@ -354,6 +364,36 @@ public class GenericNode extends Node<Operator>
                     if (delay) {
                       t.setWindowId(windowAhead);
                     }
+
+                    /* Emit control tuples here */
+                    if (reservoirPortMap.isEmpty()) {
+                      populateReservoirInputPortMap();
+                    }
+
+
+                    for (Entry<SweepableReservoir,LinkedHashSet<CustomControlTuple>> portSet: endWindowDeliveryTuples.entrySet()) {
+                      Sink activeSink = reservoirPortMap.get(portSet.getKey());
+                      // activeSink may not be null
+                      if (activeSink instanceof ControlAwareDefaultInputPort) {
+                        ControlTupleEnabledSink sink = (ControlTupleEnabledSink)activeSink;
+                        for (CustomControlTuple cct : portSet.getValue()) {
+                          if (!sink.putControl((UserDefinedControlTuple)cct.getUserObject())) {
+                            // operator cannot handle control tuple; forward to sinks
+                            forwardToSinks(delay, cct);
+                          }
+                        }
+                      } else {
+                        // Not a ControlAwarePort. Operator cannot handle a custom control tuple.
+                        for (CustomControlTuple cct : portSet.getValue()) {
+                          forwardToSinks(delay, cct);
+                        }
+                      }
+                    }
+
+                    immediateDeliveryTuples.clear();
+                    endWindowDeliveryTuples.clear();
+
+                    /* Now call endWindow() */
                     processEndWindow(t);
                     activeQueues.addAll(inputs.entrySet());
                     expectingBeginWindow = activeQueues.size();
@@ -362,6 +402,53 @@ public class GenericNode extends Node<Operator>
                 }
                 break;
 
+              case CUSTOM_CONTROL:
+                activePort.remove();
+                /* All custom control tuples are expected to be arriving in the current window only.*/
+                /* Buffer control tuples until end of the window */
+                CustomControlTuple cct = (CustomControlTuple)t;
+                UserDefinedControlTuple udct = (UserDefinedControlTuple)cct.getUserObject();
+                boolean forward = false;
+
+                // Handle Immediate Delivery Control Tuples
+                if (udct.getDeliveryType().equals(UserDefinedControlTuple.DeliveryType.IMMEDIATE)) {
+                  if (!isDuplicate(immediateDeliveryTuples.get(activePort), cct)) {
+                    // Forward immediately
+                    if (reservoirPortMap.isEmpty()) {
+                      populateReservoirInputPortMap();
+                    }
+
+                    Sink activeSink = reservoirPortMap.get(activePort);
+                    // activeSink may not be null
+                    if (activeSink instanceof ControlAwareDefaultInputPort) {
+                      ControlTupleEnabledSink sink = (ControlTupleEnabledSink)activeSink;
+                      if (!sink.putControl((UserDefinedControlTuple)cct.getUserObject())) {
+                        forward = true;
+                      }
+                    } else {
+                      forward = true;
+                    }
+
+                    if (forward) {
+                      forwardToSinks(delay, cct);
+                    }
+                    // Add to set
+                    if (!immediateDeliveryTuples.containsKey(activePort)) {
+                      immediateDeliveryTuples.put(activePort, new LinkedHashSet<CustomControlTuple>());
+                    }
+                    immediateDeliveryTuples.get(activePort).add(cct);
+                  }
+                } else {
+                  // Buffer EndWindow Delivery Control Tuples
+                  if (!endWindowDeliveryTuples.containsKey(activePort)) {
+                    endWindowDeliveryTuples.put(activePort, new LinkedHashSet<CustomControlTuple>());
+                  }
+                  if (!isDuplicate(endWindowDeliveryTuples.get(activePort), cct)) {
+                    endWindowDeliveryTuples.get(activePort).add(cct);
+                  }
+                }
+                break;
+
               case CHECKPOINT:
                 activePort.remove();
                 long checkpointWindow = t.getWindowId();
@@ -656,6 +743,43 @@ public class GenericNode extends Node<Operator>
 
   }
 
+  protected void forwardToSinks(boolean delay, Object o)
+  {
+    if (!delay) {
+      for (int s = sinks.length; s-- > 0; ) {
+        sinks[s].put(o);
+      }
+      controlTupleCount++;
+    }
+  }
+
+  /**
+   * Populate {@link #reservoirPortMap} with information on which reservoirs are connected to which input ports
+   */
+  protected void populateReservoirInputPortMap()
+  {
+    for (Entry<String,Operators.PortContextPair<InputPort<?>>> entry : descriptor.inputPorts.entrySet()) {
+      if (entry.getValue().component != null && entry.getValue().component instanceof InputPort) {
+        if (inputs.containsKey(entry.getKey())) {
+          reservoirPortMap.put(inputs.get(entry.getKey()), entry.getValue().component.getSink());
+        }
+      }
+    }
+  }
+
+  protected boolean isDuplicate(LinkedHashSet<CustomControlTuple> set, CustomControlTuple t)
+  {
+    if (set == null || set.isEmpty()) {
+      return false;
+    }
+    for (CustomControlTuple cct : set) {
+      if (cct.getUid().equals(t.getUid())) {
+        return true;
+      }
+    }
+    return false;
+  }
+
   private void fabricateFirstWindow(Operator.DelayOperator delayOperator, long windowAhead)
   {
     Tuple beginWindowTuple = new Tuple(MessageType.BEGIN_WINDOW, windowAhead);

http://git-wip-us.apache.org/repos/asf/apex-core/blob/1e478567/engine/src/main/java/com/datatorrent/stram/engine/OiONode.java
----------------------------------------------------------------------
diff --git a/engine/src/main/java/com/datatorrent/stram/engine/OiONode.java b/engine/src/main/java/com/datatorrent/stram/engine/OiONode.java
index f968b4e..e2370ff 100644
--- a/engine/src/main/java/com/datatorrent/stram/engine/OiONode.java
+++ b/engine/src/main/java/com/datatorrent/stram/engine/OiONode.java
@@ -19,16 +19,24 @@
 package com.datatorrent.stram.engine;
 
 import java.util.Iterator;
+import java.util.LinkedHashSet;
 import java.util.Map.Entry;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+
+import org.apache.apex.api.ControlAwareDefaultInputPort;
+import org.apache.apex.api.UserDefinedControlTuple;
 import org.apache.commons.lang.UnhandledException;
 
+import com.google.common.collect.Sets;
+
 import com.datatorrent.api.Operator;
 import com.datatorrent.api.Operator.InputPort;
 import com.datatorrent.api.Sink;
 import com.datatorrent.stram.plan.logical.Operators.PortContextPair;
+import com.datatorrent.stram.stream.OiOStream;
+import com.datatorrent.stram.tuple.CustomControlTuple;
 import com.datatorrent.stram.tuple.Tuple;
 
 /**
@@ -59,6 +67,9 @@ public class OiONode extends GenericNode
       reservoir = sr;
     }
 
+    private LinkedHashSet<CustomControlTuple> immediateDeliveryControlTuples = Sets.newLinkedHashSet();
+    private LinkedHashSet<CustomControlTuple> endWindowControlTuples = Sets.newLinkedHashSet();
+
     @Override
     public void put(Tuple t)
     {
@@ -82,10 +93,52 @@ public class OiONode extends GenericNode
         case END_WINDOW:
           endWindowDequeueTimes.put(reservoir, System.currentTimeMillis());
           if (--expectingEndWindows == 0) {
+
+            /* process custom control tuples here */
+            for (CustomControlTuple cct : endWindowControlTuples) {
+              Sink sink = ((OiOStream.OiOReservoir)reservoir).getSink();
+              if (sink instanceof ControlAwareDefaultInputPort) {
+                if (!((ControlAwareDefaultInputPort)sink).putControl((UserDefinedControlTuple)cct.getUserObject())) {
+                  // Operator will not handle; forward to sinks
+                  forwardToSinks(false, cct);
+                }
+              } else {
+                // Port incapable of handling; forward to sinks
+                forwardToSinks(false, cct);
+              }
+            }
+            endWindowControlTuples.clear();
+            immediateDeliveryControlTuples.clear();
+
             processEndWindow(t);
           }
           break;
 
+        case CUSTOM_CONTROL:
+          CustomControlTuple cct = ((CustomControlTuple)t);
+          UserDefinedControlTuple udct = (UserDefinedControlTuple)cct.getUserObject();
+
+          if (udct.getDeliveryType().equals(UserDefinedControlTuple.DeliveryType.IMMEDIATE)) { // Immediate Delivery
+            if (!isDuplicate(immediateDeliveryControlTuples, cct)) {
+              Sink sink = ((OiOStream.OiOReservoir)reservoir).getSink();
+              if (sink instanceof ControlAwareDefaultInputPort) {
+                if (!((ControlAwareDefaultInputPort)sink).putControl((UserDefinedControlTuple)cct.getUserObject())) {
+                  // Operator will not handle; forward to sinks
+                  forwardToSinks(false, cct);
+                }
+              } else {
+                forwardToSinks(false, cct);
+              }
+              // store
+              immediateDeliveryControlTuples.add(cct);
+            }
+          } else { // End Window Delivery
+            if (!isDuplicate(endWindowControlTuples, cct)) {
+              endWindowControlTuples.add(cct);
+            }
+          }
+          break;
+
         case CHECKPOINT:
           dagCheckpointOffsetCount = 0;
           if (lastCheckpointWindowId < t.getWindowId() && !doCheckpoint) {

http://git-wip-us.apache.org/repos/asf/apex-core/blob/1e478567/engine/src/main/java/com/datatorrent/stram/engine/Stream.java
----------------------------------------------------------------------
diff --git a/engine/src/main/java/com/datatorrent/stram/engine/Stream.java b/engine/src/main/java/com/datatorrent/stram/engine/Stream.java
index fc93b38..196134f 100644
--- a/engine/src/main/java/com/datatorrent/stram/engine/Stream.java
+++ b/engine/src/main/java/com/datatorrent/stram/engine/Stream.java
@@ -19,6 +19,7 @@
 package com.datatorrent.stram.engine;
 
 import com.datatorrent.api.Component;
+import com.datatorrent.api.ControlTupleEnabledSink;
 import com.datatorrent.api.Operator.ActivationListener;
 import com.datatorrent.api.Sink;
 
@@ -32,7 +33,7 @@ import com.datatorrent.api.Sink;
 /*
  * Provides basic interface for a stream object. Stram, StramChild work via this interface
  */
-public interface Stream extends Component<StreamContext>, ActivationListener<StreamContext>, Sink<Object>
+public interface Stream extends Component<StreamContext>, ActivationListener<StreamContext>, ControlTupleEnabledSink<Object>
 {
   public interface MultiSinkCapableStream extends Stream
   {

http://git-wip-us.apache.org/repos/asf/apex-core/blob/1e478567/engine/src/main/java/com/datatorrent/stram/engine/UnifierNode.java
----------------------------------------------------------------------
diff --git a/engine/src/main/java/com/datatorrent/stram/engine/UnifierNode.java b/engine/src/main/java/com/datatorrent/stram/engine/UnifierNode.java
index e38c94e..57a20b7 100644
--- a/engine/src/main/java/com/datatorrent/stram/engine/UnifierNode.java
+++ b/engine/src/main/java/com/datatorrent/stram/engine/UnifierNode.java
@@ -27,7 +27,6 @@ import com.datatorrent.api.Operator.InputPort;
 import com.datatorrent.api.Operator.Unifier;
 import com.datatorrent.api.Sink;
 import com.datatorrent.api.StreamCodec;
-
 import com.datatorrent.stram.plan.logical.Operators.PortContextPair;
 
 /**

http://git-wip-us.apache.org/repos/asf/apex-core/blob/1e478567/engine/src/main/java/com/datatorrent/stram/engine/WindowGenerator.java
----------------------------------------------------------------------
diff --git a/engine/src/main/java/com/datatorrent/stram/engine/WindowGenerator.java b/engine/src/main/java/com/datatorrent/stram/engine/WindowGenerator.java
index 3a8438d..77ce1f0 100644
--- a/engine/src/main/java/com/datatorrent/stram/engine/WindowGenerator.java
+++ b/engine/src/main/java/com/datatorrent/stram/engine/WindowGenerator.java
@@ -25,6 +25,8 @@ import java.util.concurrent.TimeUnit;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import org.apache.apex.api.UserDefinedControlTuple;
+
 import com.datatorrent.bufferserver.packet.MessageType;
 import com.datatorrent.common.util.ScheduledExecutorService;
 import com.datatorrent.netlet.util.CircularBuffer;
@@ -228,6 +230,12 @@ public class WindowGenerator extends MuxReservoir implements Stream, Runnable
   }
 
   @Override
+  public boolean putControl(UserDefinedControlTuple payload)
+  {
+    throw new UnsupportedOperationException("Not supported yet.");
+  }
+
+  @Override
   protected Queue getQueue()
   {
     return queue;

http://git-wip-us.apache.org/repos/asf/apex-core/blob/1e478567/engine/src/main/java/com/datatorrent/stram/stream/BufferServerPublisher.java
----------------------------------------------------------------------
diff --git a/engine/src/main/java/com/datatorrent/stram/stream/BufferServerPublisher.java b/engine/src/main/java/com/datatorrent/stram/stream/BufferServerPublisher.java
index 7db4892..fa2d823 100644
--- a/engine/src/main/java/com/datatorrent/stram/stream/BufferServerPublisher.java
+++ b/engine/src/main/java/com/datatorrent/stram/stream/BufferServerPublisher.java
@@ -24,6 +24,8 @@ import java.util.concurrent.atomic.AtomicLong;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import org.apache.apex.api.UserDefinedControlTuple;
+
 import com.datatorrent.api.StreamCodec;
 import com.datatorrent.bufferserver.client.Publisher;
 import com.datatorrent.bufferserver.packet.BeginWindowTuple;
@@ -40,6 +42,7 @@ import com.datatorrent.stram.codec.StatefulStreamCodec;
 import com.datatorrent.stram.codec.StatefulStreamCodec.DataStatePair;
 import com.datatorrent.stram.engine.ByteCounterStream;
 import com.datatorrent.stram.engine.StreamContext;
+import com.datatorrent.stram.tuple.CustomControlTuple;
 import com.datatorrent.stram.tuple.Tuple;
 
 import static java.lang.Thread.sleep;
@@ -98,6 +101,28 @@ public class BufferServerPublisher extends Publisher implements ByteCounterStrea
           array = EndWindowTuple.getSerializedTuple((int)t.getWindowId());
           break;
 
+        case CUSTOM_CONTROL:
+          if (statefulSerde == null) {
+            array = com.datatorrent.bufferserver.packet.CustomControlTuple
+                .getSerializedTuple(MessageType.CUSTOM_CONTROL_VALUE, serde.toByteArray(payload));
+          } else {
+            DataStatePair dsp = statefulSerde.toDataStatePair(payload);
+            if (dsp.state != null) {
+              array = com.datatorrent.bufferserver.packet.CustomControlTuple
+                  .getSerializedTuple(MessageType.CODEC_STATE_VALUE, dsp.state);
+              try {
+                while (!write(array)) {
+                  sleep(5);
+                }
+              } catch (InterruptedException ie) {
+                throw new RuntimeException(ie);
+              }
+            }
+            array = com.datatorrent.bufferserver.packet.CustomControlTuple
+                .getSerializedTuple(MessageType.CUSTOM_CONTROL_VALUE, dsp.data);
+          }
+          break;
+
         case END_STREAM:
           array = EndStreamTuple.getSerializedTuple((int)t.getWindowId());
           break;
@@ -145,6 +170,13 @@ public class BufferServerPublisher extends Publisher implements ByteCounterStrea
     }
   }
 
+  @Override
+  public boolean putControl(UserDefinedControlTuple payload)
+  {
+    put(new CustomControlTuple(payload));
+    return false;
+  }
+
   /**
    *
    * @param context

http://git-wip-us.apache.org/repos/asf/apex-core/blob/1e478567/engine/src/main/java/com/datatorrent/stram/stream/BufferServerSubscriber.java
----------------------------------------------------------------------
diff --git a/engine/src/main/java/com/datatorrent/stram/stream/BufferServerSubscriber.java b/engine/src/main/java/com/datatorrent/stram/stream/BufferServerSubscriber.java
index d5b0997..606add0 100644
--- a/engine/src/main/java/com/datatorrent/stram/stream/BufferServerSubscriber.java
+++ b/engine/src/main/java/com/datatorrent/stram/stream/BufferServerSubscriber.java
@@ -27,6 +27,8 @@ import java.util.concurrent.atomic.AtomicLong;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import org.apache.apex.api.UserDefinedControlTuple;
+
 import com.datatorrent.api.Sink;
 import com.datatorrent.api.StreamCodec;
 import com.datatorrent.bufferserver.client.Subscriber;
@@ -196,6 +198,12 @@ public class BufferServerSubscriber extends Subscriber implements ByteCounterStr
     throw new UnsupportedOperationException("Not supported yet.");
   }
 
+  @Override
+  public boolean putControl(UserDefinedControlTuple payload)
+  {
+    throw new UnsupportedOperationException("Not supported yet.");
+  }
+
   public SweepableReservoir releaseReservoir(String sinkId)
   {
     BufferReservoir r = reservoirMap.remove(sinkId);
@@ -343,6 +351,10 @@ public class BufferServerSubscriber extends Subscriber implements ByteCounterStr
               o = new EndWindowTuple(baseSeconds | (lastWindowId = data.getWindowId()));
               break;
 
+            case CUSTOM_CONTROL:
+              o = processPayload(data);
+              break;
+
             case END_STREAM:
               o = new EndStreamTuple(baseSeconds | data.getWindowId());
               break;

http://git-wip-us.apache.org/repos/asf/apex-core/blob/1e478567/engine/src/main/java/com/datatorrent/stram/stream/FastPublisher.java
----------------------------------------------------------------------
diff --git a/engine/src/main/java/com/datatorrent/stram/stream/FastPublisher.java b/engine/src/main/java/com/datatorrent/stram/stream/FastPublisher.java
index 88f2052..574e61c 100644
--- a/engine/src/main/java/com/datatorrent/stram/stream/FastPublisher.java
+++ b/engine/src/main/java/com/datatorrent/stram/stream/FastPublisher.java
@@ -28,6 +28,8 @@ import java.nio.channels.SocketChannel;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import org.apache.apex.api.UserDefinedControlTuple;
+
 import com.esotericsoftware.kryo.Kryo;
 import com.esotericsoftware.kryo.KryoException;
 import com.esotericsoftware.kryo.io.Output;
@@ -44,6 +46,7 @@ import com.datatorrent.netlet.Listener;
 import com.datatorrent.netlet.Listener.ClientListener;
 import com.datatorrent.stram.engine.Stream;
 import com.datatorrent.stram.engine.StreamContext;
+import com.datatorrent.stram.tuple.CustomControlTuple;
 import com.datatorrent.stram.tuple.Tuple;
 
 import static java.lang.Thread.sleep;
@@ -228,6 +231,11 @@ public class FastPublisher extends Kryo implements ClientListener, Stream
           array = EndWindowTuple.getSerializedTuple((int)t.getWindowId());
           break;
 
+        case CUSTOM_CONTROL:
+          array = null;
+          // TODO implement
+          break;
+
         case END_STREAM:
           array = EndStreamTuple.getSerializedTuple((int)t.getWindowId());
           break;
@@ -477,6 +485,13 @@ public class FastPublisher extends Kryo implements ClientListener, Stream
     }
   }
 
+  @Override
+  public boolean putControl(UserDefinedControlTuple payload)
+  {
+    put(new CustomControlTuple(payload));
+    return true;
+  }
+
   @SuppressWarnings("SleepWhileInLoop")
   public void advanceWriteBuffer()
   {

http://git-wip-us.apache.org/repos/asf/apex-core/blob/1e478567/engine/src/main/java/com/datatorrent/stram/stream/InlineStream.java
----------------------------------------------------------------------
diff --git a/engine/src/main/java/com/datatorrent/stram/stream/InlineStream.java b/engine/src/main/java/com/datatorrent/stram/stream/InlineStream.java
index ec9660b..7559a18 100644
--- a/engine/src/main/java/com/datatorrent/stram/stream/InlineStream.java
+++ b/engine/src/main/java/com/datatorrent/stram/stream/InlineStream.java
@@ -21,10 +21,13 @@ package com.datatorrent.stram.stream;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import org.apache.apex.api.UserDefinedControlTuple;
+
 import com.datatorrent.stram.engine.AbstractReservoir;
 import com.datatorrent.stram.engine.Stream;
 import com.datatorrent.stram.engine.StreamContext;
 import com.datatorrent.stram.engine.SweepableReservoir;
+import com.datatorrent.stram.tuple.CustomControlTuple;
 import com.datatorrent.stram.tuple.Tuple;
 
 /**
@@ -99,6 +102,13 @@ public class InlineStream implements Stream
   }
 
   @Override
+  public boolean putControl(UserDefinedControlTuple payload)
+  {
+    put(new CustomControlTuple(payload));
+    return false;
+  }
+
+  @Override
   public int getCount(boolean reset)
   {
     try {

http://git-wip-us.apache.org/repos/asf/apex-core/blob/1e478567/engine/src/main/java/com/datatorrent/stram/stream/MuxStream.java
----------------------------------------------------------------------
diff --git a/engine/src/main/java/com/datatorrent/stram/stream/MuxStream.java b/engine/src/main/java/com/datatorrent/stram/stream/MuxStream.java
index 63f5ee4..007a3ac 100644
--- a/engine/src/main/java/com/datatorrent/stram/stream/MuxStream.java
+++ b/engine/src/main/java/com/datatorrent/stram/stream/MuxStream.java
@@ -24,10 +24,12 @@ import java.util.HashMap;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.datatorrent.api.Sink;
+import org.apache.apex.api.UserDefinedControlTuple;
 
+import com.datatorrent.api.Sink;
 import com.datatorrent.stram.engine.Stream;
 import com.datatorrent.stram.engine.StreamContext;
+import com.datatorrent.stram.tuple.CustomControlTuple;
 
 /**
  * <p>MuxStream class.</p>
@@ -121,6 +123,13 @@ public class MuxStream implements Stream.MultiSinkCapableStream
   }
 
   @Override
+  public boolean putControl(UserDefinedControlTuple payload)
+  {
+    put(new CustomControlTuple(payload));
+    return false;
+  }
+
+  @Override
   public int getCount(boolean reset)
   {
     try {

http://git-wip-us.apache.org/repos/asf/apex-core/blob/1e478567/engine/src/main/java/com/datatorrent/stram/stream/OiOStream.java
----------------------------------------------------------------------
diff --git a/engine/src/main/java/com/datatorrent/stram/stream/OiOStream.java b/engine/src/main/java/com/datatorrent/stram/stream/OiOStream.java
index 61ed0e6..f4a2b9b 100644
--- a/engine/src/main/java/com/datatorrent/stram/stream/OiOStream.java
+++ b/engine/src/main/java/com/datatorrent/stram/stream/OiOStream.java
@@ -18,10 +18,13 @@
  */
 package com.datatorrent.stram.stream;
 
+import org.apache.apex.api.UserDefinedControlTuple;
+
 import com.datatorrent.api.Sink;
 import com.datatorrent.stram.engine.Stream;
 import com.datatorrent.stram.engine.StreamContext;
 import com.datatorrent.stram.engine.SweepableReservoir;
+import com.datatorrent.stram.tuple.CustomControlTuple;
 import com.datatorrent.stram.tuple.Tuple;
 
 /**
@@ -77,6 +80,13 @@ public class OiOStream implements Stream
   }
 
   @Override
+  public boolean putControl(UserDefinedControlTuple payload)
+  {
+    put(new CustomControlTuple(payload));
+    return false;
+  }
+
+  @Override
   public int getCount(boolean reset)
   {
     try {
@@ -124,6 +134,11 @@ public class OiOStream implements Stream
       }
     }
 
+    public Sink<Object> getSink()
+    {
+      return OiOStream.this.sink;
+    }
+
     @Override
     public Tuple sweep()
     {

http://git-wip-us.apache.org/repos/asf/apex-core/blob/1e478567/engine/src/main/java/com/datatorrent/stram/tuple/CustomControlTuple.java
----------------------------------------------------------------------
diff --git a/engine/src/main/java/com/datatorrent/stram/tuple/CustomControlTuple.java b/engine/src/main/java/com/datatorrent/stram/tuple/CustomControlTuple.java
new file mode 100644
index 0000000..810fa57
--- /dev/null
+++ b/engine/src/main/java/com/datatorrent/stram/tuple/CustomControlTuple.java
@@ -0,0 +1,62 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.datatorrent.stram.tuple;
+
+import java.util.UUID;
+
+import com.esotericsoftware.kryo.serializers.FieldSerializer;
+import com.esotericsoftware.kryo.serializers.JavaSerializer;
+
+import com.datatorrent.bufferserver.packet.MessageType;
+
+/**
+ * An implementation for @{@link Tuple} which can be generated by the user
+ * Acts as the wrapper for the user payload
+ */
+public class CustomControlTuple extends Tuple
+{
+  private final Object userObject;
+  @FieldSerializer.Bind(JavaSerializer.class)
+  private final UUID uid;
+
+  protected CustomControlTuple()
+  {
+    // for Kryo
+    super(MessageType.CUSTOM_CONTROL, 0);
+    userObject = null;
+    uid = null;
+  }
+
+  public CustomControlTuple(Object userObject)
+  {
+    super(MessageType.CUSTOM_CONTROL, 0);
+    this.userObject = userObject;
+    uid = UUID.randomUUID();
+  }
+
+  public Object getUserObject()
+  {
+    return userObject;
+  }
+
+  public UUID getUid()
+  {
+    return uid;
+  }
+}

http://git-wip-us.apache.org/repos/asf/apex-core/blob/1e478567/engine/src/test/java/com/datatorrent/stram/CustomControlTupleTest.java
----------------------------------------------------------------------
diff --git a/engine/src/test/java/com/datatorrent/stram/CustomControlTupleTest.java b/engine/src/test/java/com/datatorrent/stram/CustomControlTupleTest.java
new file mode 100644
index 0000000..86078c2
--- /dev/null
+++ b/engine/src/test/java/com/datatorrent/stram/CustomControlTupleTest.java
@@ -0,0 +1,376 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.datatorrent.stram;
+
+import java.util.concurrent.Callable;
+
+import javax.validation.ConstraintViolationException;
+
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.apex.api.ControlAwareDefaultInputPort;
+import org.apache.apex.api.ControlAwareDefaultOutputPort;
+import org.apache.apex.api.UserDefinedControlTuple;
+import org.apache.hadoop.conf.Configuration;
+
+import com.datatorrent.api.Context;
+import com.datatorrent.api.DAG;
+import com.datatorrent.api.DefaultInputPort;
+import com.datatorrent.api.DefaultOutputPort;
+import com.datatorrent.api.InputOperator;
+import com.datatorrent.api.LocalMode;
+import com.datatorrent.api.StreamingApplication;
+import com.datatorrent.api.annotation.ApplicationAnnotation;
+import com.datatorrent.common.partitioner.StatelessPartitioner;
+import com.datatorrent.common.util.BaseOperator;
+
+public class CustomControlTupleTest
+{
+  public static final Logger LOG = LoggerFactory.getLogger(CustomControlTupleTest.class);
+  private static long controlIndex = 0;
+  private static int numControlTuples = 0;
+  private static boolean done = false;
+  private static boolean endApp = false;
+  private static long endingWindowId = 0;
+  private static boolean immediate = false;
+
+  @Before
+  public void starting()
+  {
+    controlIndex = 0;
+    numControlTuples = 0;
+    done = false;
+    endApp = false;
+    endingWindowId = 0;
+  }
+
+  public static class Generator extends BaseOperator implements InputOperator
+  {
+    private long currentWindowId;
+    public final transient ControlAwareDefaultOutputPort<Double> out = new ControlAwareDefaultOutputPort<>();
+
+    @Override
+    public void beginWindow(long windowId)
+    {
+      if (!done) {
+        currentWindowId = windowId;
+        out.emitControl(new TestControlTuple(controlIndex++, immediate));
+      }
+    }
+
+    @Override
+    public void emitTuples()
+    {
+      if (!done) {
+        out.emitControl(new TestControlTuple(controlIndex++, immediate));
+      }
+    }
+
+    @Override
+    public void endWindow()
+    {
+      if (!done) {
+        out.emitControl(new TestControlTuple(controlIndex++, immediate));
+        endingWindowId = currentWindowId;
+        done = true;
+      }
+    }
+  }
+
+  public static class DefaultProcessor extends BaseOperator
+  {
+    public final transient DefaultInputPort<Double> input = new DefaultInputPort<Double>()
+    {
+      @Override
+      public void process(Double tuple)
+      {
+        output.emit(tuple);
+      }
+    };
+
+    public final transient DefaultOutputPort<Double> output = new DefaultOutputPort<>();
+  }
+
+  public static class ControlAwareProcessor extends BaseOperator
+  {
+    public final transient ControlAwareDefaultInputPort<Double> input = new ControlAwareDefaultInputPort<Double>()
+    {
+      @Override
+      public void process(Double tuple)
+      {
+        output.emit(tuple);
+      }
+
+      @Override
+      public boolean processControl(UserDefinedControlTuple tuple)
+      {
+        output.emitControl(tuple);
+        return true;
+      }
+    };
+
+    public final transient ControlAwareDefaultOutputPort<Double> output = new ControlAwareDefaultOutputPort<>();
+  }
+
+  public static class ControlAwareReceiver extends BaseOperator
+  {
+    private long currentWindowId;
+
+    @Override
+    public void beginWindow(long windowId)
+    {
+      currentWindowId = windowId;
+    }
+
+    public final transient ControlAwareDefaultInputPort<Double> input = new ControlAwareDefaultInputPort<Double>()
+    {
+      @Override
+      public boolean processControl(UserDefinedControlTuple payload)
+      {
+        numControlTuples++;
+        return false;
+      }
+
+      @Override
+      public void process(Double tuple)
+      {
+      }
+    };
+
+    @Override
+    public void endWindow()
+    {
+      if (done && currentWindowId > endingWindowId) {
+        endApp = true;
+      }
+    }
+  }
+
+  @ApplicationAnnotation(name = "TestDefaultPropagation")
+  public static class Application1 implements StreamingApplication
+  {
+    @Override
+    public void populateDAG(DAG dag, Configuration conf)
+    {
+      Generator randomGenerator = dag.addOperator("randomGenerator", Generator.class);
+      DefaultProcessor processor = dag.addOperator("process", DefaultProcessor.class);
+      ControlAwareReceiver receiver = dag.addOperator("receiver", ControlAwareReceiver.class);
+      dag.addStream("genToProcessor", randomGenerator.out, processor.input);
+      dag.addStream("ProcessorToReceiver", processor.output, receiver.input);
+    }
+  }
+
+  @ApplicationAnnotation(name = "TestExplicitPropagation")
+  public static class Application2 implements StreamingApplication
+  {
+    @Override
+    public void populateDAG(DAG dag, Configuration conf)
+    {
+      Generator randomGenerator = dag.addOperator("randomGenerator", Generator.class);
+      ControlAwareProcessor processor = dag.addOperator("process", ControlAwareProcessor.class);
+      ControlAwareReceiver receiver = dag.addOperator("receiver", ControlAwareReceiver.class);
+      dag.addStream("genToProcessor", randomGenerator.out, processor.input);
+      dag.addStream("ProcessorToReceiver", processor.output, receiver.input);
+    }
+  }
+
+  @ApplicationAnnotation(name = "TestDuplicateControlTuples")
+  public static class Application3 implements StreamingApplication
+  {
+    @Override
+    public void populateDAG(DAG dag, Configuration conf)
+    {
+      Generator randomGenerator = dag.addOperator("randomGenerator", Generator.class);
+      DefaultProcessor processor = dag.addOperator("process", DefaultProcessor.class);
+      ControlAwareReceiver receiver = dag.addOperator("receiver", ControlAwareReceiver.class);
+      dag.addStream("genToProcessor", randomGenerator.out, processor.input);
+      dag.addStream("ProcessorToReceiver", processor.output, receiver.input);
+      dag.setOperatorAttribute(processor, Context.OperatorContext.PARTITIONER, new StatelessPartitioner<>(2));
+    }
+  }
+
+  @ApplicationAnnotation(name = "TestThreadLocal")
+  public static class Application4 implements StreamingApplication
+  {
+    @Override
+    public void populateDAG(DAG dag, Configuration conf)
+    {
+      Generator randomGenerator = dag.addOperator("randomGenerator", Generator.class);
+      DefaultProcessor processor = dag.addOperator("process", DefaultProcessor.class);
+      ControlAwareReceiver receiver = dag.addOperator("receiver", ControlAwareReceiver.class);
+      dag.addStream("genToProcessor", randomGenerator.out, processor.input).setLocality(DAG.Locality.THREAD_LOCAL);
+      dag.addStream("ProcessorToReceiver", processor.output, receiver.input).setLocality(DAG.Locality.THREAD_LOCAL);
+    }
+  }
+
+  @ApplicationAnnotation(name = "TestContainerLocal")
+  public static class Application5 implements StreamingApplication
+  {
+    @Override
+    public void populateDAG(DAG dag, Configuration conf)
+    {
+      Generator randomGenerator = dag.addOperator("randomGenerator", Generator.class);
+      DefaultProcessor processor = dag.addOperator("process", DefaultProcessor.class);
+      ControlAwareReceiver receiver = dag.addOperator("receiver", ControlAwareReceiver.class);
+      dag.addStream("genToProcessor", randomGenerator.out, processor.input).setLocality(DAG.Locality.CONTAINER_LOCAL);
+      dag.addStream("ProcessorToReceiver", processor.output, receiver.input).setLocality(DAG.Locality.CONTAINER_LOCAL);
+    }
+  }
+
+  public void testApp(StreamingApplication app) throws Exception
+  {
+    try {
+      LocalMode lma = LocalMode.newInstance();
+      Configuration conf = new Configuration(false);
+      lma.prepareDAG(app, conf);
+      LocalMode.Controller lc = lma.getController();
+      ((StramLocalCluster)lc).setExitCondition(new Callable<Boolean>()
+      {
+        @Override
+        public Boolean call() throws Exception
+        {
+          return endApp;
+        }
+      });
+
+      lc.run(200000); // runs for 20 seconds and quits if terminating condition not reached
+
+      LOG.info("Control Tuples received {} expected {}", numControlTuples, controlIndex);
+      Assert.assertTrue("Incorrect Control Tuples", numControlTuples == controlIndex);
+    } catch (ConstraintViolationException e) {
+      Assert.fail("constraint violations: " + e.getConstraintViolations());
+    }
+  }
+
+  @Test
+  public void testDefaultPropagation() throws Exception
+  {
+    immediate = false;
+    testApp(new Application1());
+  }
+
+  @Test
+  public void testExplicitPropagation() throws Exception
+  {
+    immediate = false;
+    testApp(new Application2());
+  }
+
+  @Test
+  public void testDuplicateControlTuples() throws Exception
+  {
+    immediate = false;
+    testApp(new Application3());
+  }
+
+  @Test
+  public void testThreadLocal() throws Exception
+  {
+    immediate = false;
+    testApp(new Application4());
+  }
+
+  @Test
+  public void testContainerLocal() throws Exception
+  {
+    immediate = false;
+    testApp(new Application5());
+  }
+
+  @Test
+  public void testDefaultPropagationImmediate() throws Exception
+  {
+    immediate = true;
+    testApp(new Application1());
+  }
+
+  @Test
+  public void testExplicitPropagationImmediate() throws Exception
+  {
+    immediate = true;
+    testApp(new Application2());
+  }
+
+  @Test
+  public void testDuplicateControlTuplesImmediate() throws Exception
+  {
+    immediate = true;
+    testApp(new Application3());
+  }
+
+  @Test
+  public void testThreadLocalImmediate() throws Exception
+  {
+    immediate = true;
+    testApp(new Application4());
+  }
+
+  @Test
+  public void testContainerLocalImmediate() throws Exception
+  {
+    immediate = true;
+    testApp(new Application5());
+  }
+
+  public static class TestControlTuple implements UserDefinedControlTuple
+  {
+    public long data;
+    public boolean immediate;
+
+    public TestControlTuple()
+    {
+      data = 0;
+    }
+
+    public TestControlTuple(long data, boolean immediate)
+    {
+      this.data = data;
+      this.immediate = immediate;
+    }
+
+    @Override
+    public boolean equals(Object t)
+    {
+      if (t instanceof TestControlTuple && ((TestControlTuple)t).data == this.data) {
+        return true;
+      }
+      return false;
+    }
+
+    @Override
+    public String toString()
+    {
+      return data + "";
+    }
+
+    @Override
+    public DeliveryType getDeliveryType()
+    {
+      if (immediate) {
+        return DeliveryType.IMMEDIATE;
+      } else {
+        return DeliveryType.END_WINDOW;
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/apex-core/blob/1e478567/engine/src/test/java/com/datatorrent/stram/engine/GenericNodeTest.java
----------------------------------------------------------------------
diff --git a/engine/src/test/java/com/datatorrent/stram/engine/GenericNodeTest.java b/engine/src/test/java/com/datatorrent/stram/engine/GenericNodeTest.java
index af99e98..99dee8f 100644
--- a/engine/src/test/java/com/datatorrent/stram/engine/GenericNodeTest.java
+++ b/engine/src/test/java/com/datatorrent/stram/engine/GenericNodeTest.java
@@ -62,10 +62,13 @@ import com.datatorrent.common.util.ScheduledExecutorService;
 import com.datatorrent.common.util.ScheduledThreadPoolExecutor;
 import com.datatorrent.netlet.DefaultEventLoop;
 import com.datatorrent.netlet.EventLoop;
+import com.datatorrent.stram.CustomControlTupleTest;
 import com.datatorrent.stram.api.Checkpoint;
 import com.datatorrent.stram.codec.DefaultStatefulStreamCodec;
 import com.datatorrent.stram.stream.BufferServerPublisher;
 import com.datatorrent.stram.stream.BufferServerSubscriber;
+import com.datatorrent.stram.stream.OiOStream;
+import com.datatorrent.stram.tuple.CustomControlTuple;
 import com.datatorrent.stram.tuple.EndStreamTuple;
 import com.datatorrent.stram.tuple.EndWindowTuple;
 import com.datatorrent.stram.tuple.Tuple;
@@ -609,6 +612,195 @@ public class GenericNodeTest
   }
 
   @Test
+  public void testControlTuplesDeliveryGenericNode() throws InterruptedException
+  {
+    long maxSleep = 5000000;
+    long sleeptime = 25L;
+    GenericOperator go = new GenericOperator();
+    final GenericNode gn = new GenericNode(go, new com.datatorrent.stram.engine.OperatorContext(0, "operator",
+        new DefaultAttributeMap(), null));
+    gn.setId(1);
+    AbstractReservoir reservoir1 = AbstractReservoir.newReservoir("ip1Res", 1024);
+
+    gn.connectInputPort("ip1", reservoir1);
+    TestSink testSink = new TestSink();
+    gn.connectOutputPort("op", testSink);
+    gn.firstWindowMillis = 0;
+    gn.windowWidthMillis = 100;
+
+    final AtomicBoolean ab = new AtomicBoolean(false);
+    Thread t = new Thread()
+    {
+      @Override
+      public void run()
+      {
+        ab.set(true);
+        gn.activate();
+        gn.run();
+        gn.deactivate();
+      }
+    };
+    t.start();
+
+    long interval = 0;
+    do {
+      Thread.sleep(sleeptime);
+      interval += sleeptime;
+    } while ((ab.get() == false) && (interval < maxSleep));
+
+    int controlTupleCount = gn.controlTupleCount;
+    Tuple beginWindow = new Tuple(MessageType.BEGIN_WINDOW, 0x1L);
+    reservoir1.add(beginWindow);
+
+    interval = 0;
+    do {
+      Thread.sleep(sleeptime);
+      interval += sleeptime;
+    } while ((gn.controlTupleCount == controlTupleCount) && (interval < maxSleep));
+    controlTupleCount = gn.controlTupleCount;
+
+    CustomControlTuple t1 = new CustomControlTuple(new CustomControlTupleTest.TestControlTuple(1, false));
+    CustomControlTuple t2 = new CustomControlTuple(new CustomControlTupleTest.TestControlTuple(2, true));
+    CustomControlTuple t3 = new CustomControlTuple(new CustomControlTupleTest.TestControlTuple(3, false));
+    CustomControlTuple t4 = new CustomControlTuple(new CustomControlTupleTest.TestControlTuple(4, true));
+    reservoir1.add(t1);
+    reservoir1.add(t2);
+    reservoir1.add(t3);
+    reservoir1.add(t4);
+
+    interval = 0;
+    do {
+      Thread.sleep(sleeptime);
+      interval += sleeptime;
+    } while ((gn.controlTupleCount == controlTupleCount) && (interval < maxSleep));
+
+    Assert.assertTrue("Custom control tuples emitted immediately", testSink.getResultCount() == 3);
+
+    controlTupleCount = gn.controlTupleCount;
+    Tuple endWindow = new Tuple(MessageType.END_WINDOW, 0x1L);
+    reservoir1.add(endWindow);
+
+    interval = 0;
+    do {
+      Thread.sleep(sleeptime);
+      interval += sleeptime;
+    } while ((gn.controlTupleCount == controlTupleCount) && (interval < maxSleep));
+
+    gn.shutdown();
+    t.join();
+
+    Assert.assertTrue("Total control tuples", testSink.getResultCount() == 6);
+
+    long expected = 0;
+    for (Object o: testSink.collectedTuples) {
+      if (o instanceof CustomControlTuple) {
+        expected++;
+      }
+    }
+    Assert.assertTrue("Number of Custom control tuples", expected == 4);
+  }
+
+  @Test
+  public void testControlTuplesDeliveryOiONode() throws InterruptedException
+  {
+    GenericOperator go = new GenericOperator();
+    final OiONode oioNode = new OiONode(go, new com.datatorrent.stram.engine.OperatorContext(0, "operator",
+        new DefaultAttributeMap(), null));
+    oioNode.setId(1);
+
+    OiOStream stream = new OiOStream();
+    SweepableReservoir reservoir = stream.getReservoir();
+    ((OiOStream.OiOReservoir)reservoir).setControlSink((oioNode).getControlSink(reservoir));
+    oioNode.connectInputPort("ip1", reservoir);
+    Sink controlSink = oioNode.getControlSink(reservoir);
+
+    TestSink testSink = new TestSink();
+    oioNode.connectOutputPort("op", testSink);
+    oioNode.firstWindowMillis = 0;
+    oioNode.windowWidthMillis = 100;
+
+    oioNode.activate();
+
+    Tuple beginWindow = new Tuple(MessageType.BEGIN_WINDOW, 0x1L);
+    controlSink.put(beginWindow);
+    Assert.assertTrue("Begin window", testSink.getResultCount() == 1);
+
+    CustomControlTuple t1 = new CustomControlTuple(new CustomControlTupleTest.TestControlTuple(1, false));
+    CustomControlTuple t2 = new CustomControlTuple(new CustomControlTupleTest.TestControlTuple(2, true));
+    CustomControlTuple t3 = new CustomControlTuple(new CustomControlTupleTest.TestControlTuple(3, false));
+    CustomControlTuple t4 = new CustomControlTuple(new CustomControlTupleTest.TestControlTuple(4, true));
+    controlSink.put(t1);
+    controlSink.put(t2);
+    controlSink.put(t3);
+    controlSink.put(t4);
+    Assert.assertTrue("Custom control tuples emitted immediately", testSink.getResultCount() == 3);
+
+    Tuple endWindow = new Tuple(MessageType.END_WINDOW, 0x1L);
+    controlSink.put(endWindow);
+
+    oioNode.deactivate();
+    oioNode.shutdown();
+
+    Assert.assertTrue("Total control tuples", testSink.getResultCount() == 6);
+
+    long expected = 0;
+    for (Object o: testSink.collectedTuples) {
+      if (o instanceof CustomControlTuple) {
+        expected++;
+      }
+    }
+    Assert.assertTrue("Number of Custom control tuples", expected == 4);
+  }
+
+  @Test
+  public void testReservoirPortMapping() throws InterruptedException
+  {
+    long maxSleep = 5000;
+    long sleeptime = 25L;
+    GenericOperator go = new GenericOperator();
+    final GenericNode gn = new GenericNode(go, new com.datatorrent.stram.engine.OperatorContext(0, "operator",
+        new DefaultAttributeMap(), null));
+    gn.setId(1);
+    AbstractReservoir reservoir1 = AbstractReservoir.newReservoir("ip1Res", 1024);
+    AbstractReservoir reservoir2 = AbstractReservoir.newReservoir("ip2Res", 1024);
+
+    gn.connectInputPort("ip1", reservoir1);
+    gn.connectInputPort("ip2", reservoir2);
+    gn.connectOutputPort("op", Sink.BLACKHOLE);
+    gn.firstWindowMillis = 0;
+    gn.windowWidthMillis = 100;
+
+    final AtomicBoolean ab = new AtomicBoolean(false);
+    Thread t = new Thread()
+    {
+      @Override
+      public void run()
+      {
+        ab.set(true);
+        gn.activate();
+        gn.run();
+        gn.deactivate();
+      }
+    };
+    t.start();
+
+    long interval = 0;
+    do {
+      Thread.sleep(sleeptime);
+      interval += sleeptime;
+    } while ((ab.get() == false) && (interval < maxSleep));
+
+    gn.populateReservoirInputPortMap();
+
+    gn.shutdown();
+    t.join();
+
+    Assert.assertTrue("Port Mapping Size", gn.reservoirPortMap.size() == 2);
+    Assert.assertTrue("Sink 1 is not a port", gn.reservoirPortMap.get(reservoir1) instanceof Operator.InputPort);
+    Assert.assertTrue("Sink 2 is not a port", gn.reservoirPortMap.get(reservoir2) instanceof Operator.InputPort);
+  }
+
+  @Test
   public void testDoubleCheckpointAtleastOnce() throws Exception
   {
     NodeTest.testDoubleCheckpointHandling(ProcessingMode.AT_LEAST_ONCE, true, testMeta.getDir());


Mime
View raw message