apex-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From tus...@apache.org
Subject [1/2] apex-core git commit: APEXCORE-617 InputNodeTest intermittently fails with ConcurrentModificationException
Date Tue, 14 Mar 2017 12:29:15 GMT
Repository: apex-core
Updated Branches:
  refs/heads/master ad4210ba7 -> 10650b3a0


APEXCORE-617 InputNodeTest intermittently fails with ConcurrentModificationException


Project: http://git-wip-us.apache.org/repos/asf/apex-core/repo
Commit: http://git-wip-us.apache.org/repos/asf/apex-core/commit/25f1ac5c
Tree: http://git-wip-us.apache.org/repos/asf/apex-core/tree/25f1ac5c
Diff: http://git-wip-us.apache.org/repos/asf/apex-core/diff/25f1ac5c

Branch: refs/heads/master
Commit: 25f1ac5c84a6bd86879e5f947d11edafe351b25a
Parents: a469dfb
Author: Vlad Rozov <v.rozov@datatorrent.com>
Authored: Sat Jan 21 09:19:59 2017 -0800
Committer: Vlad Rozov <v.rozov@datatorrent.com>
Committed: Sat Jan 21 10:10:17 2017 -0800

----------------------------------------------------------------------
 .../stram/engine/GenericNodeTest.java           | 43 -----------
 .../datatorrent/stram/engine/InputNodeTest.java | 81 ++------------------
 .../com/datatorrent/stram/engine/NodeTest.java  | 62 ++++++++++++---
 3 files changed, 55 insertions(+), 131 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/apex-core/blob/25f1ac5c/engine/src/test/java/com/datatorrent/stram/engine/GenericNodeTest.java
----------------------------------------------------------------------
diff --git a/engine/src/test/java/com/datatorrent/stram/engine/GenericNodeTest.java b/engine/src/test/java/com/datatorrent/stram/engine/GenericNodeTest.java
index af99e98..88f3e42 100644
--- a/engine/src/test/java/com/datatorrent/stram/engine/GenericNodeTest.java
+++ b/engine/src/test/java/com/datatorrent/stram/engine/GenericNodeTest.java
@@ -23,7 +23,6 @@ import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.util.ArrayList;
 import java.util.List;
-import java.util.Set;
 import java.util.concurrent.ArrayBlockingQueue;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.atomic.AtomicBoolean;
@@ -39,7 +38,6 @@ import org.apache.commons.io.FileUtils;
 import org.apache.hadoop.conf.Configuration;
 
 import com.google.common.collect.Lists;
-import com.google.common.collect.Sets;
 
 import com.datatorrent.api.Attribute.AttributeMap;
 import com.datatorrent.api.Attribute.AttributeMap.DefaultAttributeMap;
@@ -47,7 +45,6 @@ import com.datatorrent.api.Context;
 import com.datatorrent.api.DefaultInputPort;
 import com.datatorrent.api.DefaultOutputPort;
 import com.datatorrent.api.Operator;
-import com.datatorrent.api.Operator.CheckpointNotificationListener;
 import com.datatorrent.api.Operator.ProcessingMode;
 import com.datatorrent.api.Sink;
 import com.datatorrent.api.Stats.OperatorStats;
@@ -240,46 +237,6 @@ public class GenericNodeTest
     }
   }
 
-  public static class GenericCheckpointOperator extends GenericOperator implements CheckpointNotificationListener
-  {
-    public Set<Long> checkpointedWindows = Sets.newHashSet();
-    public volatile boolean checkpointTwice = false;
-    public volatile int numWindows = 0;
-
-    public GenericCheckpointOperator()
-    {
-    }
-
-    @Override
-    public void beginWindow(long windowId)
-    {
-      super.beginWindow(windowId);
-    }
-
-    @Override
-    public void endWindow()
-    {
-      super.endWindow();
-      numWindows++;
-    }
-
-    @Override
-    public void checkpointed(long windowId)
-    {
-      checkpointTwice = checkpointTwice || !checkpointedWindows.add(windowId);
-    }
-
-    @Override
-    public void committed(long windowId)
-    {
-    }
-
-    @Override
-    public void beforeCheckpoint(long windowId)
-    {
-    }
-  }
-
   @Test
   @SuppressWarnings("SleepWhileInLoop")
   public void testSynchingLogic() throws InterruptedException

http://git-wip-us.apache.org/repos/asf/apex-core/blob/25f1ac5c/engine/src/test/java/com/datatorrent/stram/engine/InputNodeTest.java
----------------------------------------------------------------------
diff --git a/engine/src/test/java/com/datatorrent/stram/engine/InputNodeTest.java b/engine/src/test/java/com/datatorrent/stram/engine/InputNodeTest.java
index e182b75..035bb31 100644
--- a/engine/src/test/java/com/datatorrent/stram/engine/InputNodeTest.java
+++ b/engine/src/test/java/com/datatorrent/stram/engine/InputNodeTest.java
@@ -18,27 +18,21 @@
  */
 package com.datatorrent.stram.engine;
 
-import java.util.Set;
-import java.util.concurrent.atomic.AtomicBoolean;
-
 import org.junit.Assert;
 import org.junit.Rule;
 import org.junit.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.google.common.collect.Sets;
-
 import com.datatorrent.api.Attribute.AttributeMap.DefaultAttributeMap;
-import com.datatorrent.api.Context.OperatorContext;
 import com.datatorrent.api.DefaultOutputPort;
 import com.datatorrent.api.InputOperator;
 import com.datatorrent.api.Operator.IdleTimeHandler;
 import com.datatorrent.api.Operator.ProcessingMode;
 import com.datatorrent.api.Sink;
 import com.datatorrent.bufferserver.packet.MessageType;
+import com.datatorrent.common.util.BaseOperator;
 import com.datatorrent.stram.engine.GenericNodeTest.FSTestWatcher;
-import com.datatorrent.stram.engine.GenericNodeTest.GenericCheckpointOperator;
 import com.datatorrent.stram.tuple.EndWindowTuple;
 import com.datatorrent.stram.tuple.ResetWindowTuple;
 import com.datatorrent.stram.tuple.Tuple;
@@ -60,7 +54,6 @@ public class InputNodeTest
     emitTestHelper(false);
   }
 
-  @SuppressWarnings("deprecation")
   private void emitTestHelper(boolean trueEmitTuplesFalseHandleIdleTime) throws Exception
   {
     TestInputOperator tio = new TestInputOperator();
@@ -69,32 +62,29 @@ public class InputNodeTest
     dam.put(OperatorContext.APPLICATION_WINDOW_COUNT, 10);
     dam.put(OperatorContext.CHECKPOINT_WINDOW_COUNT, 10);
 
-    final InputNode in = new InputNode(tio, new com.datatorrent.stram.engine.OperatorContext(0, "operator", dam, null));
-    in.setId(1);
+    final InputNode in = new InputNode(tio, new OperatorContext(0, "operator", dam, null));
 
     TestSink testSink = new TestSink();
 
     in.connectInputPort(Node.INPUT, new TestWindowGenerator());
     in.connectOutputPort("output", testSink);
 
-    final AtomicBoolean ab = new AtomicBoolean(false);
     Thread t = new Thread()
     {
       @Override
       public void run()
       {
-        ab.set(true);
         in.activate();
         in.run();
         in.deactivate();
       }
-
     };
     t.start();
 
     Thread.sleep(3000);
 
-    t.stop();
+    in.shutdown();
+    t.join();
 
     Assert.assertTrue("Should have emitted some tuples", testSink.collectedTuples.size() > 0);
 
@@ -237,48 +227,7 @@ public class InputNodeTest
     private static final Logger LOG = LoggerFactory.getLogger(TestWindowGenerator.class);
   }
 
-
-  public static class InputCheckpointOperator extends GenericCheckpointOperator implements InputOperator
-  {
-    public Set<Long> checkpointedWindows = Sets.newHashSet();
-    public volatile boolean checkpointTwice = false;
-    public volatile int numWindows = 0;
-
-    public InputCheckpointOperator()
-    {
-    }
-
-    @Override
-    public void beginWindow(long windowId)
-    {
-      super.beginWindow(windowId);
-    }
-
-    @Override
-    public void endWindow()
-    {
-      super.endWindow();
-    }
-
-    @Override
-    public void checkpointed(long windowId)
-    {
-      super.checkpointed(windowId);
-    }
-
-    @Override
-    public void committed(long windowId)
-    {
-      super.committed(windowId);
-    }
-
-    @Override
-    public void emitTuples()
-    {
-    }
-  }
-
-  public static class TestInputOperator implements InputOperator, IdleTimeHandler
+  private static class TestInputOperator extends BaseOperator implements InputOperator, IdleTimeHandler
   {
     public final transient DefaultOutputPort<Long> output = new DefaultOutputPort<>();
 
@@ -294,26 +243,6 @@ public class InputNodeTest
     }
 
     @Override
-    public void beginWindow(long windowId)
-    {
-    }
-
-    @Override
-    public void endWindow()
-    {
-    }
-
-    @Override
-    public void setup(OperatorContext context)
-    {
-    }
-
-    @Override
-    public void teardown()
-    {
-    }
-
-    @Override
     public void handleIdleTime()
     {
       if (!trueEmitTuplesFalseHandleIdleTime) {

http://git-wip-us.apache.org/repos/asf/apex-core/blob/25f1ac5c/engine/src/test/java/com/datatorrent/stram/engine/NodeTest.java
----------------------------------------------------------------------
diff --git a/engine/src/test/java/com/datatorrent/stram/engine/NodeTest.java b/engine/src/test/java/com/datatorrent/stram/engine/NodeTest.java
index 26bd7a0..55b5eab 100644
--- a/engine/src/test/java/com/datatorrent/stram/engine/NodeTest.java
+++ b/engine/src/test/java/com/datatorrent/stram/engine/NodeTest.java
@@ -20,7 +20,7 @@ package com.datatorrent.stram.engine;
 
 import java.io.IOException;
 import java.util.ArrayList;
-import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.Set;
 
 import org.junit.Assert;
 import org.junit.Ignore;
@@ -28,18 +28,20 @@ import org.junit.Test;
 
 import org.apache.hadoop.conf.Configuration;
 
+import com.google.common.collect.Sets;
+
 import com.datatorrent.api.Attribute.AttributeMap.DefaultAttributeMap;
 import com.datatorrent.api.Context.OperatorContext;
 import com.datatorrent.api.InputOperator;
 import com.datatorrent.api.Operator;
+import com.datatorrent.api.Operator.CheckpointNotificationListener;
 import com.datatorrent.api.Operator.ProcessingMode;
 import com.datatorrent.api.StorageAgent;
 import com.datatorrent.api.annotation.Stateless;
 import com.datatorrent.common.util.FSStorageAgent;
 import com.datatorrent.common.util.ScheduledThreadPoolExecutor;
 import com.datatorrent.stram.StramLocalCluster;
-import com.datatorrent.stram.engine.GenericNodeTest.GenericCheckpointOperator;
-import com.datatorrent.stram.engine.InputNodeTest.InputCheckpointOperator;
+import com.datatorrent.stram.engine.GenericNodeTest.GenericOperator;
 import com.datatorrent.stram.plan.logical.LogicalPlan;
 
 /**
@@ -276,12 +278,12 @@ public class NodeTest
     windowGenerator.setWindowWidth(100);
     windowGenerator.setCheckpointCount(1, 0);
 
-    GenericCheckpointOperator gco;
+    CheckpointTestOperator checkpointTestOperator;
 
     if (trueGenericFalseInput) {
-      gco = new GenericCheckpointOperator();
+      checkpointTestOperator = new CheckpointTestOperator();
     } else {
-      gco = new InputCheckpointOperator();
+      checkpointTestOperator = new InputCheckpointTestOperator();
     }
     DefaultAttributeMap dam = new DefaultAttributeMap();
     dam.put(com.datatorrent.stram.engine.OperatorContext.APPLICATION_WINDOW_COUNT, 2);
@@ -292,9 +294,9 @@ public class NodeTest
     final Node in;
 
     if (trueGenericFalseInput) {
-      in = new GenericNode(gco, new com.datatorrent.stram.engine.OperatorContext(0, "operator", dam, null));
+      in = new GenericNode(checkpointTestOperator, new com.datatorrent.stram.engine.OperatorContext(0, "operator", dam, null));
     } else {
-      in = new InputNode((InputCheckpointOperator)gco, new com.datatorrent.stram.engine.OperatorContext(0, "operator",
+      in = new InputNode((InputCheckpointTestOperator)checkpointTestOperator, new com.datatorrent.stram.engine.OperatorContext(0, "operator",
           dam, null));
     }
 
@@ -316,13 +318,11 @@ public class NodeTest
 
     windowGenerator.activate(null);
 
-    final AtomicBoolean ab = new AtomicBoolean(false);
     Thread t = new Thread()
     {
       @Override
       public void run()
       {
-        ab.set(true);
         in.activate();
         in.run();
         in.deactivate();
@@ -334,7 +334,7 @@ public class NodeTest
     long startTime = System.currentTimeMillis();
     long endTime = 0;
 
-    while (gco.numWindows < 3 && ((endTime = System.currentTimeMillis()) - startTime) < 6000) {
+    while (checkpointTestOperator.numWindows < 3 && ((endTime = System.currentTimeMillis()) - startTime) < 6000) {
       Thread.sleep(50);
     }
 
@@ -343,7 +343,45 @@ public class NodeTest
 
     windowGenerator.deactivate();
 
-    Assert.assertFalse(gco.checkpointTwice);
+    Assert.assertFalse(checkpointTestOperator.checkpointTwice);
     Assert.assertTrue("Timed out", (endTime - startTime) < 5000);
   }
+
+  private static class CheckpointTestOperator extends GenericOperator implements CheckpointNotificationListener
+  {
+    public Set<Long> checkpointedWindows = Sets.newHashSet();
+    public volatile boolean checkpointTwice = false;
+    public volatile int numWindows = 0;
+
+    @Override
+    public void endWindow()
+    {
+      super.endWindow();
+      numWindows++;
+    }
+
+    @Override
+    public void checkpointed(long windowId)
+    {
+      checkpointTwice = checkpointTwice || !checkpointedWindows.add(windowId);
+    }
+
+    @Override
+    public void committed(long windowId)
+    {
+    }
+
+    @Override
+    public void beforeCheckpoint(long windowId)
+    {
+    }
+  }
+
+  private static class InputCheckpointTestOperator extends CheckpointTestOperator implements InputOperator
+  {
+    @Override
+    public void emitTuples()
+    {
+    }
+  }
 }


Mime
View raw message