beam-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From dhalp...@apache.org
Subject [1/2] incubator-beam git commit: [BEAM-182] Bound trials in assertSplitAtFraction
Date Thu, 07 Apr 2016 23:53:49 GMT
Repository: incubator-beam
Updated Branches:
  refs/heads/master 2b3216bbe -> d827e1b96


[BEAM-182] Bound trials in assertSplitAtFraction

At most 100 trials per item and 1000 trials total.
This appears to be enough for tests such as AvroSourceTest
in the majority of cases, which means if bugs exist, they
will be found often enough.


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/be6e9dd8
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/be6e9dd8
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/be6e9dd8

Branch: refs/heads/master
Commit: be6e9dd8371fb2bf1222574fc1e257fb3fe18b9c
Parents: 2b3216b
Author: Eugene Kirpichov <kirpichov@google.com>
Authored: Thu Apr 7 12:29:38 2016 -0700
Committer: Eugene Kirpichov <kirpichov@google.com>
Committed: Thu Apr 7 15:03:09 2016 -0700

----------------------------------------------------------------------
 .../dataflow/sdk/testing/SourceTestUtils.java   | 35 +++++++++++++++++++-
 1 file changed, 34 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/be6e9dd8/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/testing/SourceTestUtils.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/testing/SourceTestUtils.java b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/testing/SourceTestUtils.java
index 8062cfd..a0b4ffd 100644
--- a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/testing/SourceTestUtils.java
+++ b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/testing/SourceTestUtils.java
@@ -32,6 +32,8 @@ import com.google.cloud.dataflow.sdk.options.PipelineOptions;
 import com.google.cloud.dataflow.sdk.values.KV;
 
 import org.junit.Assert;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.util.ArrayList;
@@ -72,6 +74,8 @@ import java.util.concurrent.Future;
  * <p>Like {@link DataflowAssert}, requires JUnit and Hamcrest to be present in the classpath.
  */
 public class SourceTestUtils {
+  private static final Logger LOG = LoggerFactory.getLogger(SourceTestUtils.class);
+
   // A wrapper around a value of type T that compares according to the structural
   // value provided by a Coder<T>, but prints both the original and structural value,
   // to help get good error messages from JUnit equality assertion failures and such.
@@ -531,6 +535,9 @@ public class SourceTestUtils {
     }
   }
 
+  private static final int MAX_CONCURRENT_SPLITTING_TRIALS_PER_ITEM = 100;
+  private static final int MAX_CONCURRENT_SPLITTING_TRIALS_TOTAL = 1000;
+
   /**
    * Asserts that for each possible start position,
    * {@link BoundedSource.BoundedReader#splitAtFraction} at every interesting fraction (halfway
@@ -572,6 +579,7 @@ public class SourceTestUtils {
       // To ensure that the test is non-vacuous, make sure that the splitting succeeds
       // at least once and fails at least once.
       ExecutorService executor = Executors.newFixedThreadPool(2);
+      int numTotalTrials = 0;
       for (int i = 0; i < expectedItems.size(); i++) {
         double minNonTrivialFraction = 2.0;  // Greater than any possible fraction.
         for (double fraction : allNonTrivialFractions.get(i)) {
@@ -582,16 +590,41 @@ public class SourceTestUtils {
           // detect vacuousness.
           continue;
         }
+        int numTrials = 0;
         boolean haveSuccess = false, haveFailure = false;
-        while (!haveSuccess || !haveFailure) {
+        while (true) {
+          ++numTrials;
+          if (numTrials > MAX_CONCURRENT_SPLITTING_TRIALS_PER_ITEM) {
+            LOG.warn(
+                "After {} concurrent splitting trials at item #{}, observed only {}, "
+                + "giving up on this item",
+                numTrials, i, haveSuccess ? "success" : "failure");
+            break;
+          }
           if (assertSplitAtFractionConcurrent(
               executor, source, expectedItems, i, minNonTrivialFraction, options)) {
             haveSuccess = true;
           } else {
             haveFailure = true;
           }
+          if (haveSuccess && haveFailure) {
+            LOG.info(
+                "{} trials to observe both success and failure of concurrent splitting at item #{}",
+                numTrials, i);
+            break;
+          }
+        }
+        numTotalTrials += numTrials;
+        if (numTotalTrials > MAX_CONCURRENT_SPLITTING_TRIALS_TOTAL) {
+          LOG.warn(
+              "After {} total concurrent splitting trials, considered only {} items, giving up.",
+              numTotalTrials, i);
+          break;
         }
       }
+      LOG.info(
+          "{} total concurrent splitting trials for {} items",
+          numTotalTrials, expectedItems.size());
     }
   }
 


Mime
View raw message