apex-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From vro...@apache.org
Subject [apex-core] branch master updated: APEXCORE-778 Refactor DelayOperatorTest
Date Sun, 03 Sep 2017 16:53:53 GMT
This is an automated email from the ASF dual-hosted git repository.

vrozov pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/apex-core.git


The following commit(s) were added to refs/heads/master by this push:
     new 59c1a44  APEXCORE-778 Refactor DelayOperatorTest
59c1a44 is described below

commit 59c1a44ca6089aed16a85a50204e8317af32dd55
Author: Vlad Rozov <vrozov@apache.org>
AuthorDate: Tue Aug 22 10:16:14 2017 -0700

    APEXCORE-778 Refactor DelayOperatorTest
---
 .../com/datatorrent/stram/StramLocalCluster.java   |  12 +
 .../stram/plan/logical/DelayOperatorTest.java      | 277 ++++++++++++---------
 2 files changed, 175 insertions(+), 114 deletions(-)

diff --git a/engine/src/main/java/com/datatorrent/stram/StramLocalCluster.java b/engine/src/main/java/com/datatorrent/stram/StramLocalCluster.java
index 41e358e..eab908c 100644
--- a/engine/src/main/java/com/datatorrent/stram/StramLocalCluster.java
+++ b/engine/src/main/java/com/datatorrent/stram/StramLocalCluster.java
@@ -453,6 +453,17 @@ public class StramLocalCluster implements Runnable, Controller
     this.exitCondition = exitCondition;
   }
 
+  public void run(Callable<Boolean> exitCondition)
+  {
+    run(exitCondition, 0);
+  }
+
+  public void run(Callable<Boolean> exitCondition, long runMillis)
+  {
+    setExitCondition(exitCondition);
+    run(runMillis);
+  }
+
   @Override
   public void run()
   {
@@ -519,6 +530,7 @@ public class StramLocalCluster implements Runnable, Controller
 
         try {
           if (exitCondition != null && exitCondition.call()) {
+            LOG.info("Stopping on exit condition");
             appDone = true;
           }
         } catch (Exception ex) {
diff --git a/engine/src/test/java/com/datatorrent/stram/plan/logical/DelayOperatorTest.java
b/engine/src/test/java/com/datatorrent/stram/plan/logical/DelayOperatorTest.java
index 285aba3..bf4e292 100644
--- a/engine/src/test/java/com/datatorrent/stram/plan/logical/DelayOperatorTest.java
+++ b/engine/src/test/java/com/datatorrent/stram/plan/logical/DelayOperatorTest.java
@@ -18,14 +18,13 @@
  */
 package com.datatorrent.stram.plan.logical;
 
+import java.math.BigInteger;
 import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.Stack;
-import java.util.TreeSet;
 import java.util.concurrent.Callable;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
@@ -40,6 +39,7 @@ import org.junit.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import org.apache.commons.lang.StringUtils;
 import org.apache.hadoop.yarn.util.Clock;
 import org.apache.hadoop.yarn.util.SystemClock;
 
@@ -50,6 +50,7 @@ import com.datatorrent.api.DAG;
 import com.datatorrent.api.DefaultInputPort;
 import com.datatorrent.api.DefaultOutputPort;
 import com.datatorrent.api.Operator;
+import com.datatorrent.bufferserver.util.Codec;
 import com.datatorrent.common.util.BaseOperator;
 import com.datatorrent.common.util.DefaultDelayOperator;
 import com.datatorrent.stram.StramLocalCluster;
@@ -65,7 +66,11 @@ import com.datatorrent.stram.support.StramTestSupport;
 import com.datatorrent.stram.support.StramTestSupport.MemoryStorageAgent;
 import com.datatorrent.stram.support.StramTestSupport.TestMeta;
 
+import static com.datatorrent.stram.plan.logical.DelayOperatorTest.FibonacciOperator.assertFibonacci;
+
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
 /**
@@ -76,7 +81,7 @@ public class DelayOperatorTest
   @Rule
   public TestMeta testMeta = new TestMeta();
 
-  private static Lock sequential = new ReentrantLock();
+  private static final Lock sequential = new ReentrantLock();
 
   @Before
   public void setup()
@@ -98,7 +103,7 @@ public class DelayOperatorTest
     GenericTestOperator opB = dag.addOperator("B", GenericTestOperator.class);
     GenericTestOperator opC = dag.addOperator("C", GenericTestOperator.class);
     GenericTestOperator opD = dag.addOperator("D", GenericTestOperator.class);
-    DefaultDelayOperator<Object> opDelay = dag.addOperator("opDelay", DefaultDelayOperator.class);
+    DefaultDelayOperator opDelay = dag.addOperator("opDelay", DefaultDelayOperator.class);
 
     dag.addStream("BtoC", opB.outport1, opC.inport1);
     dag.addStream("CtoD", opC.outport1, opD.inport1);
@@ -177,136 +182,209 @@ public class DelayOperatorTest
     dag.validate();
   }
 
-  public static final Long[] FIBONACCI_NUMBERS = new Long[]{
-      1L, 1L, 2L, 3L, 5L, 8L, 13L, 21L, 34L, 55L, 89L, 144L, 233L, 377L, 610L, 987L, 1597L,
2584L, 4181L, 6765L,
-      10946L, 17711L, 28657L, 46368L, 75025L, 121393L, 196418L, 317811L, 514229L, 832040L,
1346269L, 2178309L,
-      3524578L, 5702887L, 9227465L, 14930352L, 24157817L, 39088169L, 63245986L, 102334155L
-  };
+  private static class ExitCondition implements Callable<Boolean>
+  {
+    private static boolean failed;
+    private static String message;
+    private final int size;
+    private final Callable<Boolean> exitCondition;
+
+    ExitCondition(int size, Callable<Boolean> exitCondition)
+    {
+      FailableFibonacciOperator.results.clear();
+      failed = false;
+      this.size = size;
+      this.exitCondition = exitCondition;
+    }
+
+    @Override
+    public Boolean call() throws Exception
+    {
+      return failed ||
+          FailableFibonacciOperator.results.size() >= size && (exitCondition ==
null || exitCondition.call());
+    }
+  }
 
-  public static class FibonacciOperator extends BaseOperator
+  static class FibonacciOperator extends BaseOperator
   {
-    public static List<Long> results = new ArrayList<>();
-    public long currentNumber = 1;
-    private transient long tempNum;
+    static final List<BigInteger> results = new ArrayList<>();
 
-    public transient DefaultInputPort<Object> dummyInputPort = new DefaultInputPort<Object>()
+    private int index = 0;
+    private BigInteger currentNumber = BigInteger.ONE;
+    private transient BigInteger tempNum = BigInteger.ZERO;
+
+    final transient DefaultInputPort<Object> dummyInputPort = new DefaultInputPort<Object>()
     {
       @Override
       public void process(Object tuple)
       {
       }
     };
-    public transient DefaultInputPort<Long> input = new DefaultInputPort<Long>()
+
+    final transient DefaultInputPort<BigInteger> input = new DefaultInputPort<BigInteger>()
     {
       @Override
-      public void process(Long tuple)
+      public void process(BigInteger tuple)
       {
         tempNum = tuple;
       }
     };
-    public transient DefaultOutputPort<Long> output = new DefaultOutputPort<>();
+
+    final transient DefaultOutputPort<BigInteger> output = new DefaultOutputPort<>();
 
     @Override
     public void endWindow()
     {
+      if (ExitCondition.failed) {
+        return;
+      }
+      if (index > results.size()) {
+        ExitCondition.failed = true;
+        ExitCondition.message = "index " + index + " > result.size() " + results.size();
+        return;
+      }
       output.emit(currentNumber);
-      results.add(currentNumber);
-      currentNumber += tempNum;
-      if (currentNumber <= 0) {
-        // overflow
-        currentNumber = 1;
+      if (index == results.size()) {
+        results.add(currentNumber);
+      } else if (!results.get(index).equals(currentNumber)) {
+        ExitCondition.failed = true;
+        ExitCondition.message = "current number " + currentNumber + " does not match result
" +
+            results.get(index) + " at position " + index;
+        return;
+      }
+      index++;
+      currentNumber = currentNumber.add(tempNum);
+    }
+
+    static void assertFibonacci()
+    {
+      for (int i = 2; i < results.size(); i++) {
+        if (!results.get(i).equals(results.get(i - 1).add(results.get(i - 2)))) {
+          fail("Not a Fibonacci number " + results.get(i) + " [" + StringUtils.join(results,
",") + "]");
+        }
       }
     }
 
   }
 
-  public static class FailableFibonacciOperator extends FibonacciOperator implements Operator.CheckpointListener
+  private static class FailableOperator
   {
+    private final String baseOperator;
     private boolean committed = false;
-    private int simulateFailureWindows = 0;
-    private boolean simulateFailureAfterCommit = false;
+    private final int simulateFailureWindows;
+    private final boolean simulateFailureAfterCommit;
     private int windowCount = 0;
-    public static volatile boolean failureSimulated = false;
+    private static volatile boolean failureSimulated;
 
-    @Override
-    public void beginWindow(long windowId)
+    private static final Callable<Boolean> isFailureSimulated = new Callable<Boolean>()
+    {
+      @Override
+      public Boolean call() throws Exception
+      {
+        return failureSimulated;
+      }
+    };
+
+    @SuppressWarnings("unused")
+    private FailableOperator()
+    {
+      baseOperator = null;
+      simulateFailureWindows = 0;
+      simulateFailureAfterCommit = false;
+    }
+
+    @SuppressWarnings("SameParameterValue")
+    FailableOperator(BaseOperator baseOperator, int windows, boolean afterCommit)
     {
-      if (simulateFailureWindows > 0 && ((simulateFailureAfterCommit &&
committed) || !simulateFailureAfterCommit) &&
-          !failureSimulated) {
-        LOG.debug("FailableFibonacciOperator beginWindow {} {} {}", windowId, windowCount,
simulateFailureWindows);
-        if (windowCount++ == simulateFailureWindows) {
-          failureSimulated = true;
-          LOG.debug("FailableFibonacciOperator is simulating failure");
-          throw new RuntimeException("simulating failure");
+      failureSimulated = false;
+      this.baseOperator = baseOperator.getClass().getSimpleName();
+      this.simulateFailureWindows = windows;
+      this.simulateFailureAfterCommit = afterCommit;
+    }
+
+    void beginWindow(long windowId)
+    {
+      if (simulateFailureWindows > 0 && !failureSimulated) {
+        if (simulateFailureAfterCommit && !committed) {
+          if ((int)windowId > 0x10) {
+            LOG.warn("{} window {} is not committed", baseOperator, Codec.getStringWindowId(windowId));
+          }
+        } else {
+          LOG.debug("{} beginWindow {} {} {}", baseOperator, Codec.getStringWindowId(windowId),
+              windowCount, simulateFailureWindows);
+          if (windowCount++ == simulateFailureWindows) {
+            failureSimulated = true;
+            LOG.debug("{} is simulating failure", baseOperator);
+            throw new RuntimeException("simulating " + baseOperator + " failure");
+          }
         }
       }
     }
 
+    void checkpointed(long windowId)
+    {
+      LOG.debug("{} checkpointed at {}", baseOperator, Codec.getStringWindowId(windowId));
+    }
+
+    void committed(long windowId)
+    {
+      LOG.debug("{} committed at {}", baseOperator, Codec.getStringWindowId(windowId));
+      committed = true;
+    }
+
+  }
+
+  @SuppressWarnings("deprecation")
+  static class FailableFibonacciOperator extends FibonacciOperator implements Operator.CheckpointListener
+  {
+    private FailableOperator failableOperator;
+
     @Override
-    public void checkpointed(long windowId)
+    public void beginWindow(long windowId)
     {
-      LOG.debug("FailableFibonacciOperator is checkpointed {}", windowId);
+      failableOperator.beginWindow(windowId);
     }
 
     @Override
-    public void committed(long windowId)
+    public void checkpointed(long windowId)
     {
-      LOG.debug("FailableFibonacciOperator is committed {}", windowId);
-      committed = true;
+      failableOperator.checkpointed(windowId);
     }
 
-    public void setSimulateFailureWindows(int windows, boolean afterCommit)
+    @Override
+    public void committed(long windowId)
     {
-      this.simulateFailureAfterCommit = afterCommit;
-      this.simulateFailureWindows = windows;
+      failableOperator.committed(windowId);
     }
   }
 
-  public static class FailableDelayOperator extends DefaultDelayOperator implements Operator.CheckpointListener
+  @SuppressWarnings("deprecation")
+  static class FailableDelayOperator extends DefaultDelayOperator implements Operator.CheckpointListener
   {
-    private boolean committed = false;
-    private int simulateFailureWindows = 0;
-    private boolean simulateFailureAfterCommit = false;
-    private int windowCount = 0;
-    private static volatile boolean failureSimulated = false;
+    private FailableOperator failableOperator;
 
     @Override
     public void beginWindow(long windowId)
     {
       super.beginWindow(windowId);
-      if (simulateFailureWindows > 0 && ((simulateFailureAfterCommit &&
committed) || !simulateFailureAfterCommit) &&
-          !failureSimulated) {
-        LOG.debug("FailableDelayOperator beginWindow {} {} {}", windowId, windowCount, simulateFailureWindows);
-        if (windowCount++ == simulateFailureWindows) {
-          failureSimulated = true;
-          LOG.debug("FailableDelayOperator is simulating failure {}", windowId);
-          throw new RuntimeException("simulating failure");
-        }
-      }
+      failableOperator.beginWindow(windowId);
     }
 
     @Override
     public void checkpointed(long windowId)
     {
-      LOG.debug("FailableDelayOperator is checkpointed {}", windowId);
+      failableOperator.checkpointed(windowId);
     }
 
     @Override
     public void committed(long windowId)
     {
-      LOG.debug("FailableDelayOperator is committed {}", windowId);
-      committed = true;
-    }
-
-    public void setSimulateFailureWindows(int windows, boolean afterCommit)
-    {
-      this.simulateFailureAfterCommit = afterCommit;
-      this.simulateFailureWindows = windows;
+      failableOperator.committed(windowId);
     }
   }
 
 
-  @Test
+  @Test(timeout = 60000)
   public void testFibonacci() throws Exception
   {
     LogicalPlan dag = new LogicalPlan();
@@ -318,22 +396,12 @@ public class DelayOperatorTest
     dag.addStream("dummy_to_operator", dummyInput.outport, fib.dummyInputPort);
     dag.addStream("operator_to_delay", fib.output, opDelay.input);
     dag.addStream("delay_to_operator", opDelay.output, fib.input);
-    FibonacciOperator.results.clear();
-    final StramLocalCluster localCluster = new StramLocalCluster(dag);
-    localCluster.setExitCondition(new Callable<Boolean>()
-    {
-      @Override
-      public Boolean call() throws Exception
-      {
-        return FibonacciOperator.results.size() >= 10;
-      }
-    });
-    localCluster.run(10000);
-    Assert.assertArrayEquals(Arrays.copyOfRange(FIBONACCI_NUMBERS, 0, 10),
-        FibonacciOperator.results.subList(0, 10).toArray());
+    new StramLocalCluster(dag).run(new ExitCondition(10, null));
+    assertFalse(ExitCondition.message, ExitCondition.failed);
+    assertFibonacci();
   }
 
-  @Test
+  @Test(timeout = 60000)
   public void testFibonacciRecovery1() throws Exception
   {
     LogicalPlan dag = StramTestSupport.createDAG(testMeta);
@@ -342,7 +410,7 @@ public class DelayOperatorTest
     FailableFibonacciOperator fib = dag.addOperator("FIB", FailableFibonacciOperator.class);
     DefaultDelayOperator opDelay = dag.addOperator("opDelay", DefaultDelayOperator.class);
 
-    fib.setSimulateFailureWindows(3, true);
+    fib.failableOperator = new FailableOperator(fib, 3, true);
 
     dag.addStream("dummy_to_operator", dummyInput.outport, fib.dummyInputPort);
     dag.addStream("operator_to_delay", fib.output, opDelay.input);
@@ -350,25 +418,16 @@ public class DelayOperatorTest
     dag.getAttributes().put(LogicalPlan.CHECKPOINT_WINDOW_COUNT, 2);
     dag.getAttributes().put(LogicalPlan.STREAMING_WINDOW_SIZE_MILLIS, 300);
     dag.getAttributes().put(LogicalPlan.HEARTBEAT_INTERVAL_MILLIS, 50);
-    FailableFibonacciOperator.results.clear();
-    FailableFibonacciOperator.failureSimulated = false;
     final StramLocalCluster localCluster = new StramLocalCluster(dag);
     localCluster.setPerContainerBufferServer(true);
-    localCluster.setExitCondition(new Callable<Boolean>()
-    {
-      @Override
-      public Boolean call() throws Exception
-      {
-        return FailableFibonacciOperator.results.size() >= 30;
-      }
-    });
-    localCluster.run(60000);
-    Assert.assertTrue("failure should be invoked", FailableFibonacciOperator.failureSimulated);
-    Assert.assertArrayEquals(Arrays.copyOfRange(new TreeSet<>(Arrays.asList(FIBONACCI_NUMBERS)).toArray(),
0, 20),
-        Arrays.copyOfRange(new TreeSet<>(FibonacciOperator.results).toArray(), 0, 20));
+    localCluster.run(new ExitCondition(30, FailableOperator.isFailureSimulated));
+
+    assertFalse(ExitCondition.message, ExitCondition.failed);
+    assertTrue(FibonacciOperator.results.size() >= 30);
+    assertFibonacci();
   }
 
-  @Test
+  @Test(timeout = 60000)
   public void testFibonacciRecovery2() throws Exception
   {
     LogicalPlan dag = StramTestSupport.createDAG(testMeta);
@@ -377,7 +436,7 @@ public class DelayOperatorTest
     FibonacciOperator fib = dag.addOperator("FIB", FibonacciOperator.class);
     FailableDelayOperator opDelay = dag.addOperator("opDelay", FailableDelayOperator.class);
 
-    opDelay.setSimulateFailureWindows(5, true);
+    opDelay.failableOperator = new FailableOperator(opDelay, 5, true);
 
     dag.addStream("dummy_to_operator", dummyInput.outport, fib.dummyInputPort);
     dag.addStream("operator_to_delay", fib.output, opDelay.input);
@@ -385,23 +444,13 @@ public class DelayOperatorTest
     dag.getAttributes().put(LogicalPlan.CHECKPOINT_WINDOW_COUNT, 2);
     dag.getAttributes().put(LogicalPlan.STREAMING_WINDOW_SIZE_MILLIS, 300);
     dag.getAttributes().put(LogicalPlan.HEARTBEAT_INTERVAL_MILLIS, 50);
-    FibonacciOperator.results.clear();
-    FailableDelayOperator.failureSimulated = false;
     final StramLocalCluster localCluster = new StramLocalCluster(dag);
     localCluster.setPerContainerBufferServer(true);
-    localCluster.setExitCondition(new Callable<Boolean>()
-    {
-      @Override
-      public Boolean call() throws Exception
-      {
-        return FibonacciOperator.results.size() >= 30;
-      }
-    });
-    localCluster.run(60000);
+    localCluster.run(new ExitCondition(30, FailableOperator.isFailureSimulated));
 
-    Assert.assertTrue("failure should be invoked", FailableDelayOperator.failureSimulated);
-    Assert.assertArrayEquals(Arrays.copyOfRange(new TreeSet<>(Arrays.asList(FIBONACCI_NUMBERS)).toArray(),
0, 20),
-        Arrays.copyOfRange(new TreeSet<>(FibonacciOperator.results).toArray(), 0, 20));
+    assertFalse(ExitCondition.message, ExitCondition.failed);
+    assertTrue(FibonacciOperator.results.size() >= 30);
+    assertFibonacci();
   }
 
   @Test
@@ -473,7 +522,7 @@ public class DelayOperatorTest
     TestGeneratorInputOperator source = dag.addOperator("A", TestGeneratorInputOperator.class);
     GenericTestOperator op1 = dag.addOperator("Op1", GenericTestOperator.class);
     GenericTestOperator op2 = dag.addOperator("Op2", GenericTestOperator.class);
-    DefaultDelayOperator<Object> delay = dag.addOperator("Delay", DefaultDelayOperator.class);
+    DefaultDelayOperator delay = dag.addOperator("Delay", DefaultDelayOperator.class);
 
     dag.addStream("Source", source.outport, op1.inport1);
     dag.addStream("Stream1", op1.outport1, op2.inport1);

-- 
To stop receiving notification emails like this one, please contact
['"commits@apex.apache.org" <commits@apex.apache.org>'].

Mime
View raw message