apex-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From vro...@apache.org
Subject [2/9] incubator-apex-core git commit: Skipping endWindow and operator is shutdown prematurely. APEX-58 #resolve
Date Mon, 09 Nov 2015 16:34:04 GMT
Skipping endWindow and operator is shutdown prematurely. APEX-58 #resolve


Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/commit/18d43731
Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/tree/18d43731
Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/diff/18d43731

Branch: refs/heads/feature-module
Commit: 18d437315689aeea32c391a9670f7fc17554fed2
Parents: 8d9aa4b
Author: Pramod Immaneni <pramod@datatorrent.com>
Authored: Tue Oct 27 18:22:32 2015 -0700
Committer: Pramod Immaneni <pramod@datatorrent.com>
Committed: Mon Nov 2 10:41:47 2015 -0800

----------------------------------------------------------------------
 .../datatorrent/stram/engine/GenericNode.java   |  6 +-
 .../java/com/datatorrent/stram/engine/Node.java | 33 ++++++--
 .../stram/engine/GenericNodeTest.java           | 85 ++++++++++++++++++++
 3 files changed, 118 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/18d43731/engine/src/main/java/com/datatorrent/stram/engine/GenericNode.java
----------------------------------------------------------------------
diff --git a/engine/src/main/java/com/datatorrent/stram/engine/GenericNode.java b/engine/src/main/java/com/datatorrent/stram/engine/GenericNode.java
index 3902f37..26ba98a 100644
--- a/engine/src/main/java/com/datatorrent/stram/engine/GenericNode.java
+++ b/engine/src/main/java/com/datatorrent/stram/engine/GenericNode.java
@@ -553,7 +553,11 @@ public class GenericNode extends Node<Operator>
       }
     }
 
-    if (insideWindow) {
+    /**
+     * TODO: If shutdown and inside window provide alternate way of notifying the operator
in such ways
+     * TODO: as using a listener callback
+     */
+    if (insideWindow && !shutdown) {
       endWindowEmitTime = System.currentTimeMillis();
       operator.endWindow();
       if (++applicationWindowCount == APPLICATION_WINDOW_COUNT) {

http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/18d43731/engine/src/main/java/com/datatorrent/stram/engine/Node.java
----------------------------------------------------------------------
diff --git a/engine/src/main/java/com/datatorrent/stram/engine/Node.java b/engine/src/main/java/com/datatorrent/stram/engine/Node.java
index b073dcd..c66df12 100644
--- a/engine/src/main/java/com/datatorrent/stram/engine/Node.java
+++ b/engine/src/main/java/com/datatorrent/stram/engine/Node.java
@@ -28,25 +28,44 @@ import java.lang.reflect.Array;
 import java.lang.reflect.Field;
 import java.lang.reflect.InvocationTargetException;
 import java.lang.reflect.Method;
-import java.util.*;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
 import java.util.Map.Entry;
-import java.util.concurrent.*;
+import java.util.Queue;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.FutureTask;
+import java.util.concurrent.LinkedBlockingQueue;
 
-import org.apache.hadoop.util.ReflectionUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import org.apache.hadoop.util.ReflectionUtils;
+
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import com.google.common.math.IntMath;
 
-import com.datatorrent.api.*;
+import com.datatorrent.api.AutoMetric;
+import com.datatorrent.api.Component;
+import com.datatorrent.api.InputOperator;
+import com.datatorrent.api.Operator;
 import com.datatorrent.api.Operator.InputPort;
 import com.datatorrent.api.Operator.OutputPort;
 import com.datatorrent.api.Operator.ProcessingMode;
 import com.datatorrent.api.Operator.Unifier;
+import com.datatorrent.api.Sink;
+import com.datatorrent.api.Stats;
+import com.datatorrent.api.StatsListener;
 import com.datatorrent.api.StatsListener.OperatorRequest;
-
+import com.datatorrent.api.StorageAgent;
 import com.datatorrent.bufferserver.util.Codec;
 import com.datatorrent.common.util.AsyncFSStorageAgent;
 import com.datatorrent.common.util.Pair;
@@ -297,6 +316,10 @@ public abstract class Node<OPERATOR extends Operator> implements
Component<Opera
       logger.warn("Shutdown requested when context is not available!");
     }
     else {
+      /*
+       * Since alive is non-volatile this code explicitly unsets it in the operator lifecycle
theread thereby notifying
+       * it even when the thread is reading it from the cache
+       */
       context.request(new OperatorRequest()
       {
         @Override

http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/18d43731/engine/src/test/java/com/datatorrent/stram/engine/GenericNodeTest.java
----------------------------------------------------------------------
diff --git a/engine/src/test/java/com/datatorrent/stram/engine/GenericNodeTest.java b/engine/src/test/java/com/datatorrent/stram/engine/GenericNodeTest.java
index 9e62ac5..d5ceae6 100644
--- a/engine/src/test/java/com/datatorrent/stram/engine/GenericNodeTest.java
+++ b/engine/src/test/java/com/datatorrent/stram/engine/GenericNodeTest.java
@@ -211,4 +211,89 @@ public class GenericNodeTest
     Assert.assertEquals(Thread.State.TERMINATED, t.getState());
   }
 
+  @Test
+  public void testPrematureTermination() throws InterruptedException
+  {
+    long maxSleep = 5000;
+    long sleeptime = 25L;
+    GenericOperator go = new GenericOperator();
+    final GenericNode gn = new GenericNode(go, new com.datatorrent.stram.engine.OperatorContext(0,
new DefaultAttributeMap(), null));
+    gn.setId(1);
+    DefaultReservoir reservoir1 = new DefaultReservoir("ip1Res", 1024);
+    DefaultReservoir reservoir2 = new DefaultReservoir("ip2Res", 1024);
+
+    gn.connectInputPort("ip1", reservoir1);
+    gn.connectInputPort("ip2", reservoir2);
+    gn.connectOutputPort("op", Sink.BLACKHOLE);
+
+    final AtomicBoolean ab = new AtomicBoolean(false);
+    Thread t = new Thread()
+    {
+      @Override
+      public void run()
+      {
+        ab.set(true);
+        gn.activate();
+        gn.run();
+        gn.deactivate();
+      }
+
+    };
+    t.start();
+
+    long interval = 0;
+    do {
+      Thread.sleep(sleeptime);
+      interval += sleeptime;
+    }
+    while ((ab.get() == false) && (interval < maxSleep));
+
+
+    int controlTupleCount = gn.controlTupleCount;
+    Tuple beginWindow1 = new Tuple(MessageType.BEGIN_WINDOW, 0x1L);
+
+    reservoir1.add(beginWindow1);
+    reservoir2.add(beginWindow1);
+
+    interval = 0;
+    do {
+      Thread.sleep(sleeptime);
+      interval += sleeptime;
+    }
+    while ((gn.controlTupleCount == controlTupleCount) && (interval < maxSleep));
+    Assert.assertTrue("Begin window called", go.endWindowId != go.beginWindowId);
+    controlTupleCount = gn.controlTupleCount;
+
+    Tuple endWindow1 = new EndWindowTuple(0x1L);
+
+    reservoir1.add(endWindow1);
+    reservoir2.add(endWindow1);
+
+    interval = 0;
+    do {
+      Thread.sleep(sleeptime);
+      interval += sleeptime;
+    }
+    while ((gn.controlTupleCount == controlTupleCount) && (interval < maxSleep));
+    Assert.assertTrue("End window called", go.endWindowId == go.beginWindowId);
+    controlTupleCount = gn.controlTupleCount;
+
+    Tuple beginWindow2 = new Tuple(MessageType.BEGIN_WINDOW, 0x2L);
+
+    reservoir1.add(beginWindow2);
+    reservoir2.add(beginWindow2);
+
+    interval = 0;
+    do {
+      Thread.sleep(sleeptime);
+      interval += sleeptime;
+    }
+    while ((gn.controlTupleCount == controlTupleCount) && (interval < maxSleep));
+
+    gn.shutdown();
+    t.join();
+
+    Assert.assertTrue("End window not called", go.endWindowId != go.beginWindowId);
+  }
+
 }


Mime
View raw message