beam-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From k...@apache.org
Subject [2/3] incubator-beam git commit: Roll-forwards: Base PAssert on GBK instead of side inputs
Date Thu, 16 Jun 2016 16:48:56 GMT
Roll-forwards: Base PAssert on GBK instead of side inputs

Previously PAssert - hence all RunnableOnService/NeedsRunner
tests - required side input support. This created a very steep
on ramp for new runners.

GroupByKey is a bit more fundamental and most backends will be
able to group by key in the global window very quickly. So switching
the primitive used to gather all the contents of a PCollection for
assertions should make it a bit easier to get early feedback during
runner development.


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/eba9ddf2
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/eba9ddf2
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/eba9ddf2

Branch: refs/heads/master
Commit: eba9ddf2538d37f2c5a8875a37d9f9721ee34ea1
Parents: 591db6f
Author: Kenneth Knowles <klk@google.com>
Authored: Wed Jun 8 15:07:52 2016 -0700
Committer: Kenneth Knowles <klk@google.com>
Committed: Wed Jun 15 11:27:34 2016 -0700

----------------------------------------------------------------------
 .../testing/TestDataflowPipelineRunner.java     |   3 +-
 .../org/apache/beam/sdk/testing/PAssert.java    | 857 +++++++++++--------
 .../apache/beam/sdk/testing/PAssertTest.java    |  27 -
 3 files changed, 491 insertions(+), 396 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/eba9ddf2/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/testing/TestDataflowPipelineRunner.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/testing/TestDataflowPipelineRunner.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/testing/TestDataflowPipelineRunner.java
index 3e8d903..c940e9a 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/testing/TestDataflowPipelineRunner.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/testing/TestDataflowPipelineRunner.java
@@ -166,7 +166,8 @@ public class TestDataflowPipelineRunner extends PipelineRunner<DataflowPipelineJ
   public <OutputT extends POutput, InputT extends PInput> OutputT apply(
       PTransform<InputT, OutputT> transform, InputT input) {
     if (transform instanceof PAssert.OneSideInputAssert
-        || transform instanceof PAssert.TwoSideInputAssert) {
+        || transform instanceof PAssert.GroupThenAssert
+        || transform instanceof PAssert.GroupThenAssertForSingleton) {
       expectedNumberOfAssertions += 1;
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/eba9ddf2/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/PAssert.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/PAssert.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/PAssert.java
index c2cd598..a29a56d 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/PAssert.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/PAssert.java
@@ -17,6 +17,8 @@
  */
 package org.apache.beam.sdk.testing;
 
+import static com.google.common.base.Preconditions.checkState;
+
 import static org.hamcrest.Matchers.containsInAnyOrder;
 import static org.hamcrest.Matchers.equalTo;
 import static org.hamcrest.Matchers.not;
@@ -34,46 +36,46 @@ import org.apache.beam.sdk.runners.PipelineRunner;
 import org.apache.beam.sdk.transforms.Aggregator;
 import org.apache.beam.sdk.transforms.Create;
 import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.Flatten;
+import org.apache.beam.sdk.transforms.GroupByKey;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.transforms.SerializableFunction;
 import org.apache.beam.sdk.transforms.Sum;
 import org.apache.beam.sdk.transforms.View;
+import org.apache.beam.sdk.transforms.WithKeys;
 import org.apache.beam.sdk.transforms.windowing.GlobalWindows;
+import org.apache.beam.sdk.transforms.windowing.Never;
 import org.apache.beam.sdk.transforms.windowing.Window;
 import org.apache.beam.sdk.util.CoderUtils;
 import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PBegin;
 import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionList;
 import org.apache.beam.sdk.values.PCollectionView;
 import org.apache.beam.sdk.values.PDone;
 
-import com.google.common.base.Optional;
 import com.google.common.collect.Iterables;
 import com.google.common.collect.Lists;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.IOException;
 import java.io.Serializable;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
-import java.util.List;
 import java.util.Map;
 import java.util.NoSuchElementException;
 
 /**
- * An assertion on the contents of a {@link PCollection}
- * incorporated into the pipeline.  Such an assertion
- * can be checked no matter what kind of {@link PipelineRunner} is
- * used.
+ * An assertion on the contents of a {@link PCollection} incorporated into the pipeline. Such an
+ * assertion can be checked no matter what kind of {@link PipelineRunner} is used.
  *
- * <p>Note that the {@code PAssert} call must precede the call
- * to {@link Pipeline#run}.
+ * <p>Note that the {@code PAssert} call must precede the call to {@link Pipeline#run}.
  *
- * <p>Examples of use:
- * <pre>{@code
+ * <p>Examples of use: <pre>{@code
  * Pipeline p = TestPipeline.create();
  * ...
  * PCollection<String> output =
@@ -103,34 +105,92 @@ public class PAssert {
 
   private static int assertCount = 0;
 
+  private static String nextAssertionName() {
+    return "PAssert$" + (assertCount++);
+  }
+
   // Do not instantiate.
   private PAssert() {}
 
   /**
-   * Constructs an {@link IterableAssert} for the elements of the provided
-   * {@link PCollection}.
+   * Builder interface for assertions applicable to iterables and PCollection contents.
+   */
+  public interface IterableAssert<T> {
+
+    /**
+     * Asserts that the iterable in question contains the provided elements.
+     *
+     * @return the same {@link IterableAssert} builder for further assertions
+     */
+    IterableAssert<T> containsInAnyOrder(T... expectedElements);
+
+    /**
+     * Asserts that the iterable in question contains the provided elements.
+     *
+     * @return the same {@link IterableAssert} builder for further assertions
+     */
+    IterableAssert<T> containsInAnyOrder(Iterable<T> expectedElements);
+
+    /**
+     * Asserts that the iterable in question is empty.
+     *
+     * @return the same {@link IterableAssert} builder for further assertions
+     */
+    IterableAssert<T> empty();
+
+    /**
+     * Applies the provided checking function (presumably containing assertions) to the
+     * iterable in question.
+     *
+     * @return the same {@link IterableAssert} builder for further assertions
+     */
+    IterableAssert<T> satisfies(SerializableFunction<Iterable<T>, Void> checkerFn);
+  }
+
+  /**
+   * Builder interface for assertions applicable to a single value.
+   */
+  public interface SingletonAssert<T> {
+    /**
+     * Asserts that the value in question is equal to the provided value, according to
+     * {@link Object#equals}.
+     *
+     * @return the same {@link SingletonAssert} builder for further assertions
+     */
+    SingletonAssert<T> isEqualTo(T expected);
+
+    /**
+     * Asserts that the value in question is not equal to the provided value, according
+     * to {@link Object#equals}.
+     *
+     * @return the same {@link SingletonAssert} builder for further assertions
+     */
+    SingletonAssert<T> notEqualTo(T notExpected);
+
+    /**
+     * Applies the provided checking function (presumably containing assertions) to the
+     * value in question.
+     *
+     * @return the same {@link SingletonAssert} builder for further assertions
+     */
+    SingletonAssert<T> satisfies(SerializableFunction<T, Void> checkerFn);
+  }
+
+  /**
+   * Constructs an {@link IterableAssert} for the elements of the provided {@link PCollection}.
    */
   public static <T> IterableAssert<T> that(PCollection<T> actual) {
-    return new IterableAssert<>(
-        new CreateActual<T, Iterable<T>>(actual, View.<T>asIterable()),
-         actual.getPipeline())
-         .setCoder(actual.getCoder());
+    return new PCollectionContentsAssert<>(actual);
   }
 
   /**
-   * Constructs an {@link IterableAssert} for the value of the provided
-   * {@link PCollection} which must contain a single {@code Iterable<T>}
-   * value.
+   * Constructs an {@link IterableAssert} for the value of the provided {@link PCollection} which
+   * must contain a single {@code Iterable<T>} value.
    */
-  public static <T> IterableAssert<T>
-      thatSingletonIterable(PCollection<? extends Iterable<T>> actual) {
+  public static <T> IterableAssert<T> thatSingletonIterable(
+      PCollection<? extends Iterable<T>> actual) {
 
-    List<? extends Coder<?>> maybeElementCoder = actual.getCoder().getCoderArguments();
-    Coder<T> tCoder;
     try {
-      @SuppressWarnings("unchecked")
-      Coder<T> tCoderTmp = (Coder<T>) Iterables.getOnlyElement(maybeElementCoder);
-      tCoder = tCoderTmp;
     } catch (NoSuchElementException | IllegalArgumentException exc) {
       throw new IllegalArgumentException(
           "PAssert.<T>thatSingletonIterable requires a PCollection<Iterable<T>>"
@@ -141,19 +201,7 @@ public class PAssert {
     @SuppressWarnings("unchecked") // Safe covariant cast
     PCollection<Iterable<T>> actualIterables = (PCollection<Iterable<T>>) actual;
 
-    return new IterableAssert<>(
-        new CreateActual<Iterable<T>, Iterable<T>>(
-            actualIterables, View.<Iterable<T>>asSingleton()),
-        actual.getPipeline())
-        .setCoder(tCoder);
-  }
-
-  /**
-   * Constructs an {@link IterableAssert} for the value of the provided
-   * {@code PCollectionView PCollectionView<Iterable<T>>}.
-   */
-  public static <T> IterableAssert<T> thatIterable(PCollectionView<Iterable<T>> actual) {
-    return new IterableAssert<>(new PreExisting<Iterable<T>>(actual), actual.getPipeline());
+    return new PCollectionSingletonIterableAssert<>(actualIterables);
   }
 
   /**
@@ -161,93 +209,96 @@ public class PAssert {
    * {@code PCollection PCollection<T>}, which must be a singleton.
    */
   public static <T> SingletonAssert<T> thatSingleton(PCollection<T> actual) {
-    return new SingletonAssert<>(
-        new CreateActual<T, T>(actual, View.<T>asSingleton()), actual.getPipeline())
-        .setCoder(actual.getCoder());
+    return new PCollectionViewAssert<>(actual, View.<T>asSingleton(), actual.getCoder());
   }
 
   /**
    * Constructs a {@link SingletonAssert} for the value of the provided {@link PCollection}.
    *
-   * <p>Note that the actual value must be coded by a {@link KvCoder},
-   * not just any {@code Coder<K, V>}.
+   * <p>Note that the actual value must be coded by a {@link KvCoder}, not just any
+   * {@code Coder<K, V>}.
    */
-  public static <K, V> SingletonAssert<Map<K, Iterable<V>>>
-      thatMultimap(PCollection<KV<K, V>> actual) {
+  public static <K, V> SingletonAssert<Map<K, Iterable<V>>> thatMultimap(
+      PCollection<KV<K, V>> actual) {
     @SuppressWarnings("unchecked")
     KvCoder<K, V> kvCoder = (KvCoder<K, V>) actual.getCoder();
-
-    return new SingletonAssert<>(
-        new CreateActual<>(actual, View.<K, V>asMultimap()), actual.getPipeline())
-        .setCoder(MapCoder.of(kvCoder.getKeyCoder(), IterableCoder.of(kvCoder.getValueCoder())));
+    return new PCollectionViewAssert<>(
+        actual,
+        View.<K, V>asMultimap(),
+        MapCoder.of(kvCoder.getKeyCoder(), IterableCoder.of(kvCoder.getValueCoder())));
   }
 
   /**
-   * Constructs a {@link SingletonAssert} for the value of the provided {@link PCollection},
-   * which must have at most one value per key.
+   * Constructs a {@link SingletonAssert} for the value of the provided {@link PCollection}, which
+   * must have at most one value per key.
    *
-   * <p>Note that the actual value must be coded by a {@link KvCoder},
-   * not just any {@code Coder<K, V>}.
+   * <p>Note that the actual value must be coded by a {@link KvCoder}, not just any
+   * {@code Coder<K, V>}.
    */
   public static <K, V> SingletonAssert<Map<K, V>> thatMap(PCollection<KV<K, V>> actual) {
     @SuppressWarnings("unchecked")
     KvCoder<K, V> kvCoder = (KvCoder<K, V>) actual.getCoder();
-
-    return new SingletonAssert<>(
-        new CreateActual<>(actual, View.<K, V>asMap()), actual.getPipeline())
-        .setCoder(MapCoder.of(kvCoder.getKeyCoder(), kvCoder.getValueCoder()));
+    return new PCollectionViewAssert<>(
+        actual, View.<K, V>asMap(), MapCoder.of(kvCoder.getKeyCoder(), kvCoder.getValueCoder()));
   }
 
   ////////////////////////////////////////////////////////////
 
   /**
-   * An assertion about the contents of a {@link PCollectionView} yielding an {@code Iterable<T>}.
+   * An {@link IterableAssert} about the contents of a {@link PCollection}. This does not require
+   * the runner to support side inputs.
    */
-  public static class IterableAssert<T> implements Serializable {
-    private final Pipeline pipeline;
-    private final PTransform<PBegin, PCollectionView<Iterable<T>>> createActual;
-    private Optional<Coder<T>> coder;
+  private static class PCollectionContentsAssert<T> implements IterableAssert<T> {
+    private final PCollection<T> actual;
 
-    protected IterableAssert(
-        PTransform<PBegin, PCollectionView<Iterable<T>>> createActual, Pipeline pipeline) {
-      this.createActual = createActual;
-      this.pipeline = pipeline;
-      this.coder = Optional.absent();
+    public PCollectionContentsAssert(PCollection<T> actual) {
+      this.actual = actual;
     }
 
     /**
-     * Sets the coder to use for elements of type {@code T}, as needed for internal purposes.
+     * Checks that the {@code Iterable} contains the expected elements, in any order.
      *
      * <p>Returns this {@code IterableAssert}.
      */
-    public IterableAssert<T> setCoder(Coder<T> coderOrNull) {
-      this.coder = Optional.fromNullable(coderOrNull);
-      return this;
+    @Override
+    @SafeVarargs
+    public final PCollectionContentsAssert<T> containsInAnyOrder(T... expectedElements) {
+      return containsInAnyOrder(Arrays.asList(expectedElements));
     }
 
     /**
-     * Gets the coder, which may yet be absent.
+     * Checks that the {@code Iterable} contains the expected elements, in any order.
+     *
+     * <p>Returns this {@code IterableAssert}.
      */
-    public Coder<T> getCoder() {
-      if (coder.isPresent()) {
-        return coder.get();
-      } else {
-        throw new IllegalStateException(
-            "Attempting to access the coder of an IterableAssert"
-                + " that has not been set yet.");
-      }
+    @Override
+    public PCollectionContentsAssert<T> containsInAnyOrder(Iterable<T> expectedElements) {
+      return satisfies(new AssertContainsInAnyOrderRelation<T>(), expectedElements);
+    }
+
+    @Override
+    public PCollectionContentsAssert<T> empty() {
+      containsInAnyOrder(Collections.<T>emptyList());
+      return this;
+    }
+
+    @Override
+    public PCollectionContentsAssert<T> satisfies(
+        SerializableFunction<Iterable<T>, Void> checkerFn) {
+      actual.apply(nextAssertionName(), new GroupThenAssert<>(checkerFn));
+      return this;
     }
 
     /**
-     * Applies a {@link SerializableFunction} to check the elements of the {@code Iterable}.
+     * Checks that the {@code Iterable} contains elements that match the provided matchers, in any
+     * order.
      *
      * <p>Returns this {@code IterableAssert}.
      */
-    public IterableAssert<T> satisfies(SerializableFunction<Iterable<T>, Void> checkerFn) {
-      pipeline.apply(
-          "PAssert$" + (assertCount++),
-          new OneSideInputAssert<Iterable<T>>(createActual, checkerFn));
-      return this;
+    @SafeVarargs
+    final PCollectionContentsAssert<T> containsInAnyOrder(
+        SerializableMatcher<? super T>... elementMatchers) {
+      return satisfies(SerializableMatchers.<T>containsInAnyOrder(elementMatchers));
     }
 
     /**
@@ -255,17 +306,11 @@ public class PAssert {
      *
      * <p>Returns this {@code IterableAssert}.
      */
-    public IterableAssert<T> satisfies(
-        AssertRelation<Iterable<T>, Iterable<T>> relation,
-        final Iterable<T> expectedElements) {
-      pipeline.apply(
-          "PAssert$" + (assertCount++),
-          new TwoSideInputAssert<Iterable<T>, Iterable<T>>(
-              createActual,
-              new CreateExpected<T, Iterable<T>>(expectedElements, coder, View.<T>asIterable()),
-              relation));
-
-      return this;
+    private PCollectionContentsAssert<T> satisfies(
+        AssertRelation<Iterable<T>, Iterable<T>> relation, Iterable<T> expectedElements) {
+      return satisfies(
+          new CheckRelationAgainstExpected<Iterable<T>>(
+              relation, expectedElements, IterableCoder.of(actual.getCoder())));
     }
 
     /**
@@ -273,15 +318,14 @@ public class PAssert {
      *
      * <p>Returns this {@code IterableAssert}.
      */
-    IterableAssert<T> satisfies(final SerializableMatcher<Iterable<? extends T>> matcher) {
+    PCollectionContentsAssert<T> satisfies(
+        final SerializableMatcher<Iterable<? extends T>> matcher) {
       // Safe covariant cast. Could be elided by changing a lot of this file to use
       // more flexible bounds.
       @SuppressWarnings({"rawtypes", "unchecked"})
       SerializableFunction<Iterable<T>, Void> checkerFn =
-        (SerializableFunction) new MatcherCheckerFn<>(matcher);
-      pipeline.apply(
-          "PAssert$" + (assertCount++),
-          new OneSideInputAssert<Iterable<T>>(createActual, checkerFn));
+          (SerializableFunction) new MatcherCheckerFn<>(matcher);
+      actual.apply("PAssert$" + (assertCount++), new GroupThenAssert<>(checkerFn));
       return this;
     }
 
@@ -300,19 +344,9 @@ public class PAssert {
     }
 
     /**
-     * Checks that the {@code Iterable} is empty.
-     *
-     * <p>Returns this {@code IterableAssert}.
-     */
-    public IterableAssert<T> empty() {
-      return satisfies(new AssertContainsInAnyOrderRelation<T>(), Collections.<T>emptyList());
-    }
-
-    /**
      * @throws UnsupportedOperationException always
-     * @deprecated {@link Object#equals(Object)} is not supported on PAssert objects.
-     *    If you meant to test object equality, use a variant of {@link #containsInAnyOrder}
-     *    instead.
+     * @deprecated {@link Object#equals(Object)} is not supported on PAssert objects. If you meant
+     * to test object equality, use a variant of {@link #containsInAnyOrder} instead.
      */
     @Deprecated
     @Override
@@ -331,169 +365,129 @@ public class PAssert {
       throw new UnsupportedOperationException(
           String.format("%s.hashCode() is not supported.", IterableAssert.class.getSimpleName()));
     }
+  }
 
-    /**
-     * Checks that the {@code Iterable} contains the expected elements, in any
-     * order.
-     *
-     * <p>Returns this {@code IterableAssert}.
-     */
-    public IterableAssert<T> containsInAnyOrder(Iterable<T> expectedElements) {
-      return satisfies(new AssertContainsInAnyOrderRelation<T>(), expectedElements);
-    }
+  /**
+   * An {@link IterableAssert} for an iterable that is the sole element of a {@link PCollection}.
+   * This does not require the runner to support side inputs.
+   */
+  private static class PCollectionSingletonIterableAssert<T> implements IterableAssert<T> {
+    private final PCollection<Iterable<T>> actual;
+    private final Coder<T> elementCoder;
 
-    /**
-     * Checks that the {@code Iterable} contains the expected elements, in any
-     * order.
-     *
-     * <p>Returns this {@code IterableAssert}.
-     */
-    @SafeVarargs
-    public final IterableAssert<T> containsInAnyOrder(T... expectedElements) {
-      return satisfies(
-        new AssertContainsInAnyOrderRelation<T>(),
-        Arrays.asList(expectedElements));
+    public PCollectionSingletonIterableAssert(PCollection<Iterable<T>> actual) {
+      this.actual = actual;
+
+      @SuppressWarnings("unchecked")
+      Coder<T> typedCoder = (Coder<T>) actual.getCoder().getCoderArguments().get(0);
+      this.elementCoder = typedCoder;
     }
 
-    /**
-     * Checks that the {@code Iterable} contains elements that match the provided matchers,
-     * in any order.
-     *
-     * <p>Returns this {@code IterableAssert}.
-     */
+    @Override
     @SafeVarargs
-    final IterableAssert<T> containsInAnyOrder(
-        SerializableMatcher<? super T>... elementMatchers) {
-      return satisfies(SerializableMatchers.<T>containsInAnyOrder(elementMatchers));
+    public final PCollectionSingletonIterableAssert<T> containsInAnyOrder(T... expectedElements) {
+      return containsInAnyOrder(Arrays.asList(expectedElements));
     }
-  }
 
-  /**
-   * An assertion about the single value of type {@code T}
-   * associated with a {@link PCollectionView}.
-   */
-  public static class SingletonAssert<T> implements Serializable {
-    private final Pipeline pipeline;
-    private final CreateActual<?, T> createActual;
-    private Optional<Coder<T>> coder;
-
-    protected SingletonAssert(
-        CreateActual<?, T> createActual, Pipeline pipeline) {
-      this.pipeline = pipeline;
-      this.createActual = createActual;
-      this.coder = Optional.absent();
+    @Override
+    public PCollectionSingletonIterableAssert<T> empty() {
+      return containsInAnyOrder(Collections.<T>emptyList());
     }
 
-    /**
-     * Always throws an {@link UnsupportedOperationException}: users are probably looking for
-     * {@link #isEqualTo}.
-     */
-    @Deprecated
     @Override
-    public boolean equals(Object o) {
-      throw new UnsupportedOperationException(
-          String.format(
-              "tests for Java equality of the %s object, not the PCollection in question. "
-                  + "Call a test method, such as isEqualTo.",
-              getClass().getSimpleName()));
+    public PCollectionSingletonIterableAssert<T> containsInAnyOrder(Iterable<T> expectedElements) {
+      return satisfies(new AssertContainsInAnyOrderRelation<T>(), expectedElements);
     }
 
-    /**
-     * @throws UnsupportedOperationException always.
-     * @deprecated {@link Object#hashCode()} is not supported on PAssert objects.
-     */
-    @Deprecated
     @Override
-    public int hashCode() {
-      throw new UnsupportedOperationException(
-          String.format("%s.hashCode() is not supported.", SingletonAssert.class.getSimpleName()));
+    public PCollectionSingletonIterableAssert<T> satisfies(
+        SerializableFunction<Iterable<T>, Void> checkerFn) {
+      actual.apply("PAssert$" + (assertCount++), new GroupThenAssertForSingleton<>(checkerFn));
+      return this;
     }
 
-    /**
-     * Sets the coder to use for elements of type {@code T}, as needed
-     * for internal purposes.
-     *
-     * <p>Returns this {@code IterableAssert}.
-     */
-    public SingletonAssert<T> setCoder(Coder<T> coderOrNull) {
-      this.coder = Optional.fromNullable(coderOrNull);
-      return this;
+    private PCollectionSingletonIterableAssert<T> satisfies(
+        AssertRelation<Iterable<T>, Iterable<T>> relation, Iterable<T> expectedElements) {
+      return satisfies(
+          new CheckRelationAgainstExpected<Iterable<T>>(
+              relation, expectedElements, IterableCoder.of(elementCoder)));
     }
+  }
 
-    /**
-     * Gets the coder, which may yet be absent.
-     */
-    public Coder<T> getCoder() {
-      if (coder.isPresent()) {
-        return coder.get();
-      } else {
-        throw new IllegalStateException(
-            "Attempting to access the coder of an IterableAssert that has not been set yet.");
-      }
+  /**
+   * An assertion about the contents of a {@link PCollection} when it is viewed as a single value
+   * of type {@code ViewT}. This requires side input support from the runner.
+   */
+  private static class PCollectionViewAssert<ElemT, ViewT> implements SingletonAssert<ViewT> {
+    private final PCollection<ElemT> actual;
+    private final PTransform<PCollection<ElemT>, PCollectionView<ViewT>> view;
+    private final Coder<ViewT> coder;
+
+    protected PCollectionViewAssert(
+        PCollection<ElemT> actual,
+        PTransform<PCollection<ElemT>, PCollectionView<ViewT>> view,
+        Coder<ViewT> coder) {
+      this.actual = actual;
+      this.view = view;
+      this.coder = coder;
     }
 
-    /**
-     * Applies a {@link SerializableFunction} to check the value of this
-     * {@code SingletonAssert}'s view.
-     *
-     * <p>Returns this {@code SingletonAssert}.
-     */
-    public SingletonAssert<T> satisfies(SerializableFunction<T, Void> checkerFn) {
-      pipeline.apply(
-          "PAssert$" + (assertCount++), new OneSideInputAssert<T>(createActual, checkerFn));
-      return this;
+    @Override
+    public PCollectionViewAssert<ElemT, ViewT> isEqualTo(ViewT expectedValue) {
+      return satisfies(new AssertIsEqualToRelation<ViewT>(), expectedValue);
     }
 
-    /**
-     * Applies an {@link AssertRelation} to check the provided relation against the
-     * value of this assert and the provided expected value.
-     *
-     * <p>Returns this {@code SingletonAssert}.
-     */
-    public SingletonAssert<T> satisfies(
-        AssertRelation<T, T> relation,
-        final T expectedValue) {
-      pipeline.apply(
-          "PAssert$" + (assertCount++),
-          new TwoSideInputAssert<T, T>(
-              createActual,
-              new CreateExpected<T, T>(Arrays.asList(expectedValue), coder, View.<T>asSingleton()),
-              relation));
+    @Override
+    public PCollectionViewAssert<ElemT, ViewT> notEqualTo(ViewT expectedValue) {
+      return satisfies(new AssertNotEqualToRelation<ViewT>(), expectedValue);
+    }
 
+    @Override
+    public PCollectionViewAssert<ElemT, ViewT> satisfies(
+        SerializableFunction<ViewT, Void> checkerFn) {
+      actual
+          .getPipeline()
+          .apply(
+              "PAssert$" + (assertCount++),
+              new OneSideInputAssert<ViewT>(CreateActual.from(actual, view), checkerFn));
       return this;
     }
 
     /**
-     * Checks that the value of this {@code SingletonAssert}'s view is equal
-     * to the expected value.
+     * Applies an {@link AssertRelation} to check the provided relation against the value of this
+     * assert and the provided expected value.
      *
      * <p>Returns this {@code SingletonAssert}.
      */
-    public SingletonAssert<T> isEqualTo(T expectedValue) {
-      return satisfies(new AssertIsEqualToRelation<T>(), expectedValue);
+    private PCollectionViewAssert<ElemT, ViewT> satisfies(
+        AssertRelation<ViewT, ViewT> relation, final ViewT expectedValue) {
+      return satisfies(new CheckRelationAgainstExpected<ViewT>(relation, expectedValue, coder));
     }
 
     /**
-     * Checks that the value of this {@code SingletonAssert}'s view is not equal
-     * to the expected value.
-     *
-     * <p>Returns this {@code SingletonAssert}.
+     * Always throws an {@link UnsupportedOperationException}: users are probably looking for
+     * {@link #isEqualTo}.
      */
-    public SingletonAssert<T> notEqualTo(T expectedValue) {
-      return satisfies(new AssertNotEqualToRelation<T>(), expectedValue);
+    @Deprecated
+    @Override
+    public boolean equals(Object o) {
+      throw new UnsupportedOperationException(
+          String.format(
+              "tests for Java equality of the %s object, not the PCollection in question. "
+                  + "Call a test method, such as isEqualTo.",
+              getClass().getSimpleName()));
     }
 
     /**
-     * Checks that the value of this {@code SingletonAssert}'s view is equal to
-     * the expected value.
-     *
-     * @deprecated replaced by {@link #isEqualTo}
+     * @throws UnsupportedOperationException always.
+     * @deprecated {@link Object#hashCode()} is not supported on {@link PAssert} objects.
      */
     @Deprecated
-    public SingletonAssert<T> is(T expectedValue) {
-      return isEqualTo(expectedValue);
+    @Override
+    public int hashCode() {
+      throw new UnsupportedOperationException(
+          String.format("%s.hashCode() is not supported.", SingletonAssert.class.getSimpleName()));
     }
-
   }
 
   ////////////////////////////////////////////////////////////////////////
@@ -504,8 +498,13 @@ public class PAssert {
     private final transient PCollection<T> actual;
     private final transient PTransform<PCollection<T>, PCollectionView<ActualT>> actualView;
 
-    private CreateActual(PCollection<T> actual,
-        PTransform<PCollection<T>, PCollectionView<ActualT>> actualView) {
+    public static <T, ActualT> CreateActual<T, ActualT> from(
+        PCollection<T> actual, PTransform<PCollection<T>, PCollectionView<ActualT>> actualView) {
+      return new CreateActual<>(actual, actualView);
+    }
+
+    private CreateActual(
+        PCollection<T> actual, PTransform<PCollection<T>, PCollectionView<ActualT>> actualView) {
       this.actual = actual;
       this.actualView = actualView;
     }
@@ -515,73 +514,198 @@ public class PAssert {
       final Coder<T> coder = actual.getCoder();
       return actual
           .apply(Window.<T>into(new GlobalWindows()))
-          .apply(ParDo.of(new DoFn<T, T>() {
-            @Override
-            public void processElement(ProcessContext context) throws CoderException {
-              context.output(CoderUtils.clone(coder, context.element()));
-            }
-          }))
+          .apply(
+              ParDo.of(
+                  new DoFn<T, T>() {
+                    @Override
+                    public void processElement(ProcessContext context) throws CoderException {
+                      context.output(CoderUtils.clone(coder, context.element()));
+                    }
+                  }))
           .apply(actualView);
     }
   }
 
-  private static class CreateExpected<T, ExpectedT>
-      extends PTransform<PBegin, PCollectionView<ExpectedT>> {
-
-    private final Iterable<T> elements;
-    private final Optional<Coder<T>> coder;
-    private final transient PTransform<PCollection<T>, PCollectionView<ExpectedT>> view;
+  /**
+   * A partially applied {@link AssertRelation}, where one value is provided along with a coder to
+   * serialize/deserialize them.
+   */
+  private static class CheckRelationAgainstExpected<T> implements SerializableFunction<T, Void> {
+    private final AssertRelation<T, T> relation;
+    private final byte[] encodedExpected;
+    private final Coder<T> coder;
 
-    private CreateExpected(Iterable<T> elements, Optional<Coder<T>> coder,
-        PTransform<PCollection<T>, PCollectionView<ExpectedT>> view) {
-      this.elements = elements;
+    public CheckRelationAgainstExpected(AssertRelation<T, T> relation, T expected, Coder<T> coder) {
+      this.relation = relation;
       this.coder = coder;
-      this.view = view;
+
+      try {
+        this.encodedExpected = CoderUtils.encodeToByteArray(coder, expected);
+      } catch (IOException coderException) {
+        throw new RuntimeException(coderException);
+      }
     }
 
     @Override
-    public PCollectionView<ExpectedT> apply(PBegin input) {
-      Create.Values<T> createTransform = Create.<T>of(elements);
-      if (coder.isPresent()) {
-        createTransform = createTransform.withCoder(coder.get());
+    public Void apply(T actual) {
+      try {
+        T expected = CoderUtils.decodeFromByteArray(coder, encodedExpected);
+        return relation.assertFor(expected).apply(actual);
+      } catch (IOException coderException) {
+        throw new RuntimeException(coderException);
       }
-      return input.apply(createTransform).apply(view);
     }
   }
 
-  private static class PreExisting<T> extends PTransform<PBegin, PCollectionView<T>> {
+  /**
+   * A transform that gathers the contents of a {@link PCollection} into a single main input
+   * iterable in the global window. This requires a runner to support {@link GroupByKey} in the
+   * global window, but not side inputs or other windowing or triggers.
+   *
+   * <p>If the {@link PCollection} is empty, this transform returns a {@link PCollection} containing
+   * a single empty iterable, even though in practice most runners will not produce any element.
+   */
+  private static class GroupGlobally<T> extends PTransform<PCollection<T>, PCollection<Iterable<T>>>
+      implements Serializable {
 
-    private final PCollectionView<T> view;
+    public GroupGlobally() {}
 
-    private PreExisting(PCollectionView<T> view) {
-      this.view = view;
+    @Override
+    public PCollection<Iterable<T>> apply(PCollection<T> input) {
+
+      final int contentsKey = 0;
+      final int dummyKey = 1;
+      final int combinedKey = 42;
+
+      // Group the contents by key. If it is empty, this PCollection will be empty, too.
+      // Then key it again with a dummy key.
+      PCollection<KV<Integer, KV<Integer, Iterable<T>>>> doubleKeyedGroupedInput =
+          input
+              .apply("GloballyWindow", Window.<T>into(new GlobalWindows()))
+              .apply("ContentsWithKeys", WithKeys.<Integer, T>of(contentsKey))
+              .apply(
+                  "NeverTriggerContents",
+                  Window.<KV<Integer, T>>triggering(Never.ever()).discardingFiredPanes())
+              .apply("ContentsGBK", GroupByKey.<Integer, T>create())
+              .apply(
+                  "DoubleKeyContents", WithKeys.<Integer, KV<Integer, Iterable<T>>>of(combinedKey));
+
+      // Create another non-empty PCollection that is keyed with a distinct dummy key
+      PCollection<KV<Integer, KV<Integer, Iterable<T>>>> doubleKeyedDummy =
+          input
+              .getPipeline()
+              .apply(
+                  Create.of(
+                          KV.of(
+                              combinedKey,
+                              KV.of(dummyKey, (Iterable<T>) Collections.<T>emptyList())))
+                      .withCoder(doubleKeyedGroupedInput.getCoder()))
+              .setWindowingStrategyInternal(doubleKeyedGroupedInput.getWindowingStrategy());
+
+      // Flatten them together and group by the combined key to get a single element
+      PCollection<KV<Integer, Iterable<KV<Integer, Iterable<T>>>>> dummyAndContents =
+          PCollectionList.<KV<Integer, KV<Integer, Iterable<T>>>>of(doubleKeyedGroupedInput)
+              .and(doubleKeyedDummy)
+              .apply(
+                  "FlattenDummyAndContents",
+                  Flatten.<KV<Integer, KV<Integer, Iterable<T>>>>pCollections())
+              .apply(
+                  "GroupDummyAndContents", GroupByKey.<Integer, KV<Integer, Iterable<T>>>create());
+
+      // Extract the contents if they exist else empty contents.
+      return dummyAndContents
+          .apply(
+              "GetContents",
+              ParDo.of(
+                  new DoFn<KV<Integer, Iterable<KV<Integer, Iterable<T>>>>, Iterable<T>>() {
+                    @Override
+                    public void processElement(ProcessContext ctx) {
+                      Iterable<KV<Integer, Iterable<T>>> groupedDummyAndContents =
+                          ctx.element().getValue();
+
+                      if (Iterables.size(groupedDummyAndContents) == 1) {
+                        // Only the dummy value, so just output empty
+                        ctx.output(Collections.<T>emptyList());
+                      } else {
+                        checkState(
+                            Iterables.size(groupedDummyAndContents) == 2,
+                            "Internal error: PAssert grouped contents with a"
+                                + " dummy value resulted in more than 2 groupings: %s",
+                                groupedDummyAndContents);
+
+                        if (Iterables.get(groupedDummyAndContents, 0).getKey() == contentsKey) {
+                          // The first iterable in the group holds the real contents
+                          ctx.output(Iterables.get(groupedDummyAndContents, 0).getValue());
+                        } else {
+                          // The second iterable holds the real contents
+                          ctx.output(Iterables.get(groupedDummyAndContents, 1).getValue());
+                        }
+                      }
+                    }
+                  }));
+    }
+  }
+
+  /**
+   * A transform that applies an assertion-checking function over iterables of {@code ActualT} to
+   * the entirety of the contents of its input.
+   */
+  public static class GroupThenAssert<T> extends PTransform<PCollection<T>, PDone>
+      implements Serializable {
+    private final SerializableFunction<Iterable<T>, Void> checkerFn;
+
+    private GroupThenAssert(SerializableFunction<Iterable<T>, Void> checkerFn) {
+      this.checkerFn = checkerFn;
+    }
+
+    @Override
+    public PDone apply(PCollection<T> input) {
+      input
+          .apply("GroupGlobally", new GroupGlobally<T>())
+          .apply("RunChecks", ParDo.of(new GroupedValuesCheckerDoFn<>(checkerFn)));
+
+      return PDone.in(input.getPipeline());
+    }
+  }
+
+  /**
+   * A transform that applies an assertion-checking function to a single iterable contained as the
+   * sole element of a {@link PCollection}.
+   */
+  public static class GroupThenAssertForSingleton<T>
+      extends PTransform<PCollection<Iterable<T>>, PDone> implements Serializable {
+    private final SerializableFunction<Iterable<T>, Void> checkerFn;
+
+    private GroupThenAssertForSingleton(SerializableFunction<Iterable<T>, Void> checkerFn) {
+      this.checkerFn = checkerFn;
     }
 
     @Override
-    public PCollectionView<T> apply(PBegin input) {
-      return view;
+    public PDone apply(PCollection<Iterable<T>> input) {
+      input
+          .apply("GroupGlobally", new GroupGlobally<Iterable<T>>())
+          .apply("RunChecks", ParDo.of(new SingletonCheckerDoFn<>(checkerFn)));
+
+      return PDone.in(input.getPipeline());
     }
   }
 
   /**
-   * An assertion checker that takes a single
-   * {@link PCollectionView PCollectionView&lt;ActualT&gt;}
-   * and an assertion over {@code ActualT}, and checks it within a dataflow
-   * pipeline.
+   * An assertion checker that takes a single {@link PCollectionView
+   * PCollectionView&lt;ActualT&gt;} and an assertion over {@code ActualT}, and checks it within a
+   * Beam pipeline.
    *
-   * <p>Note that the entire assertion must be serializable. If
-   * you need to make assertions involving multiple inputs
-   * that are each not serializable, use TwoSideInputAssert.
+   * <p>Note that the entire assertion must be serializable.
    *
-   * <p>This is generally useful for assertion functions that
-   * are serializable but whose underlying data may not have a coder.
+   * <p>This is generally useful for assertion functions that are serializable but whose underlying
+   * data may not have a coder.
    */
-  public static class OneSideInputAssert<ActualT>
-      extends PTransform<PBegin, PDone> implements Serializable {
+  public static class OneSideInputAssert<ActualT> extends PTransform<PBegin, PDone>
+      implements Serializable {
     private final transient PTransform<PBegin, PCollectionView<ActualT>> createActual;
     private final SerializableFunction<ActualT, Void> checkerFn;
 
-    public OneSideInputAssert(
+    private OneSideInputAssert(
         PTransform<PBegin, PCollectionView<ActualT>> createActual,
         SerializableFunction<ActualT, Void> checkerFn) {
       this.createActual = createActual;
@@ -594,21 +718,23 @@ public class PAssert {
 
       input
           .apply(Create.of(0).withCoder(VarIntCoder.of()))
-          .apply(ParDo.named("RunChecks").withSideInputs(actual)
-              .of(new CheckerDoFn<>(checkerFn, actual)));
+          .apply(
+              ParDo.named("RunChecks")
+                  .withSideInputs(actual)
+                  .of(new SideInputCheckerDoFn<>(checkerFn, actual)));
 
       return PDone.in(input.getPipeline());
     }
   }
 
   /**
-   * A {@link DoFn} that runs a checking {@link SerializableFunction} on the contents of
-   * a {@link PCollectionView}, and adjusts counters and thrown exceptions for use in testing.
+   * A {@link DoFn} that runs a checking {@link SerializableFunction} on the contents of a
+   * {@link PCollectionView}, and adjusts counters and thrown exceptions for use in testing.
    *
    * <p>The input is ignored, but is {@link Integer} to be usable on runners that do not support
    * null values.
    */
-  private static class CheckerDoFn<ActualT> extends DoFn<Integer, Void> {
+  private static class SideInputCheckerDoFn<ActualT> extends DoFn<Integer, Void> {
     private final SerializableFunction<ActualT, Void> checkerFn;
     private final Aggregator<Integer, Integer> success =
         createAggregator(SUCCESS_COUNTER, new Sum.SumIntegerFn());
@@ -616,9 +742,8 @@ public class PAssert {
         createAggregator(FAILURE_COUNTER, new Sum.SumIntegerFn());
     private final PCollectionView<ActualT> actual;
 
-    private CheckerDoFn(
-        SerializableFunction<ActualT, Void> checkerFn,
-        PCollectionView<ActualT> actual) {
+    private SideInputCheckerDoFn(
+        SerializableFunction<ActualT, Void> checkerFn, PCollectionView<ActualT> actual) {
       this.checkerFn = checkerFn;
       this.actual = actual;
     }
@@ -627,12 +752,9 @@ public class PAssert {
     public void processElement(ProcessContext c) {
       try {
         ActualT actualContents = c.sideInput(actual);
-        checkerFn.apply(actualContents);
-        success.addValue(1);
+        doChecks(actualContents, checkerFn, success, failure);
       } catch (Throwable t) {
-        LOG.error("PAssert failed expectations.", t);
-        failure.addValue(1);
-        // TODO: allow for metrics to propagate on failure when running a streaming pipeline
+        // Suppress exception in streaming
         if (!c.getPipelineOptions().as(StreamingOptions.class).isStreaming()) {
           throw t;
         }
@@ -641,87 +763,89 @@ public class PAssert {
   }
 
   /**
-   * An assertion checker that takes a {@link PCollectionView PCollectionView&lt;ActualT&gt;},
-   * a {@link PCollectionView PCollectionView&lt;ExpectedT&gt;}, a relation
-   * over {@code A} and {@code B}, and checks that the relation holds
-   * within a dataflow pipeline.
+   * A {@link DoFn} that runs a checking {@link SerializableFunction} on the contents of
+   * the single iterable element of the input {@link PCollection} and adjusts counters and
+   * thrown exceptions for use in testing.
    *
-   * <p>This is useful when either/both of {@code A} and {@code B}
-   * are not serializable, but have coders (provided
-   * by the underlying {@link PCollection}s).
+   * <p>The singleton property is presumed, not enforced.
    */
-  public static class TwoSideInputAssert<ActualT, ExpectedT>
-      extends PTransform<PBegin, PDone> implements Serializable {
-
-    private final transient PTransform<PBegin, PCollectionView<ActualT>> createActual;
-    private final transient PTransform<PBegin, PCollectionView<ExpectedT>> createExpected;
-    private final AssertRelation<ActualT, ExpectedT> relation;
+  private static class GroupedValuesCheckerDoFn<ActualT> extends DoFn<ActualT, Void> {
+    private final SerializableFunction<ActualT, Void> checkerFn;
+    private final Aggregator<Integer, Integer> success =
+        createAggregator(SUCCESS_COUNTER, new Sum.SumIntegerFn());
+    private final Aggregator<Integer, Integer> failure =
+        createAggregator(FAILURE_COUNTER, new Sum.SumIntegerFn());
 
-    protected TwoSideInputAssert(
-        PTransform<PBegin, PCollectionView<ActualT>> createActual,
-        PTransform<PBegin, PCollectionView<ExpectedT>> createExpected,
-        AssertRelation<ActualT, ExpectedT> relation) {
-      this.createActual = createActual;
-      this.createExpected = createExpected;
-      this.relation = relation;
+    private GroupedValuesCheckerDoFn(SerializableFunction<ActualT, Void> checkerFn) {
+      this.checkerFn = checkerFn;
     }
 
     @Override
-    public PDone apply(PBegin input) {
-      final PCollectionView<ActualT> actual = input.apply("CreateActual", createActual);
-      final PCollectionView<ExpectedT> expected = input.apply("CreateExpected", createExpected);
+    public void processElement(ProcessContext c) {
+      try {
+        doChecks(c.element(), checkerFn, success, failure);
+      } catch (Throwable t) {
+        // Suppress exception in streaming
+        if (!c.getPipelineOptions().as(StreamingOptions.class).isStreaming()) {
+          throw t;
+        }
+      }
+    }
+  }
 
-      input
-          .apply(Create.of(0).withCoder(VarIntCoder.of()))
-          .apply("RunChecks", ParDo.withSideInputs(actual, expected)
-              .of(new CheckerDoFn<>(relation, actual, expected)));
+  /**
+   * A {@link DoFn} that runs a checking {@link SerializableFunction} on the contents of
+   * the single item contained within the single iterable on input and
+   * adjusts counters and thrown exceptions for use in testing.
+   *
+   * <p>The singleton property of the input {@link PCollection} is presumed, not enforced. However,
+   * each input element must be a singleton iterable, or this will fail.
+   */
+  private static class SingletonCheckerDoFn<ActualT> extends DoFn<Iterable<ActualT>, Void> {
+    private final SerializableFunction<ActualT, Void> checkerFn;
+    private final Aggregator<Integer, Integer> success =
+        createAggregator(SUCCESS_COUNTER, new Sum.SumIntegerFn());
+    private final Aggregator<Integer, Integer> failure =
+        createAggregator(FAILURE_COUNTER, new Sum.SumIntegerFn());
 
-      return PDone.in(input.getPipeline());
+    private SingletonCheckerDoFn(SerializableFunction<ActualT, Void> checkerFn) {
+      this.checkerFn = checkerFn;
     }
 
-    /**
-     * Input is ignored, but is {@link Integer} for runners that do not support null values.
-     */
-    private static class CheckerDoFn<ActualT, ExpectedT> extends DoFn<Integer, Void> {
-      private final Aggregator<Integer, Integer> success =
-          createAggregator(SUCCESS_COUNTER, new Sum.SumIntegerFn());
-      private final Aggregator<Integer, Integer> failure =
-          createAggregator(FAILURE_COUNTER, new Sum.SumIntegerFn());
-      private final AssertRelation<ActualT, ExpectedT> relation;
-      private final PCollectionView<ActualT> actual;
-      private final PCollectionView<ExpectedT> expected;
-
-      private CheckerDoFn(AssertRelation<ActualT, ExpectedT> relation,
-          PCollectionView<ActualT> actual, PCollectionView<ExpectedT> expected) {
-        this.relation = relation;
-        this.actual = actual;
-        this.expected = expected;
-      }
-
-      @Override
-      public void processElement(ProcessContext c) {
-        try {
-          ActualT actualContents = c.sideInput(actual);
-          ExpectedT expectedContents = c.sideInput(expected);
-          relation.assertFor(expectedContents).apply(actualContents);
-          success.addValue(1);
-        } catch (Throwable t) {
-          LOG.error("PAssert failed expectations.", t);
-          failure.addValue(1);
-          // TODO: allow for metrics to propagate on failure when running a streaming pipeline
-          if (!c.getPipelineOptions().as(StreamingOptions.class).isStreaming()) {
-            throw t;
-          }
+    @Override
+    public void processElement(ProcessContext c) {
+      try {
+        ActualT actualContents = Iterables.getOnlyElement(c.element());
+        doChecks(actualContents, checkerFn, success, failure);
+      } catch (Throwable t) {
+        // Suppress exception in streaming
+        if (!c.getPipelineOptions().as(StreamingOptions.class).isStreaming()) {
+          throw t;
         }
       }
     }
   }
 
+  private static <ActualT> void doChecks(
+      ActualT actualContents,
+      SerializableFunction<ActualT, Void> checkerFn,
+      Aggregator<Integer, Integer> successAggregator,
+      Aggregator<Integer, Integer> failureAggregator) {
+    try {
+      checkerFn.apply(actualContents);
+      successAggregator.addValue(1);
+    } catch (Throwable t) {
+      LOG.error("PAssert failed expectations.", t);
+      failureAggregator.addValue(1);
+      throw t;
+    }
+  }
+
   /////////////////////////////////////////////////////////////////////////////
 
   /**
-   * A {@link SerializableFunction} that verifies that an actual value is equal to an
-   * expected value.
+   * A {@link SerializableFunction} that verifies that an actual value is equal to an expected
+   * value.
    */
   private static class AssertIsEqualTo<T> implements SerializableFunction<T, Void> {
     private T expected;
@@ -738,8 +862,8 @@ public class PAssert {
   }
 
   /**
-   * A {@link SerializableFunction} that verifies that an actual value is not equal to an
-   * expected value.
+   * A {@link SerializableFunction} that verifies that an actual value is not equal to an expected
+   * value.
    */
   private static class AssertNotEqualTo<T> implements SerializableFunction<T, Void> {
     private T expected;
@@ -756,8 +880,8 @@ public class PAssert {
   }
 
   /**
-   * A {@link SerializableFunction} that verifies that an {@code Iterable} contains
-   * expected items in any order.
+   * A {@link SerializableFunction} that verifies that an {@code Iterable} contains expected items
+   * in any order.
    */
   private static class AssertContainsInAnyOrder<T>
       implements SerializableFunction<Iterable<T>, Void> {
@@ -787,10 +911,9 @@ public class PAssert {
   ////////////////////////////////////////////////////////////
 
   /**
-   * A binary predicate between types {@code Actual} and {@code Expected}.
-   * Implemented as a method {@code assertFor(Expected)} which returns
-   * a {@code SerializableFunction<Actual, Void>}
-   * that should verify the assertion..
+   * A binary predicate between types {@code Actual} and {@code Expected}. Implemented as a method
+   * {@code assertFor(Expected)} which returns a {@code SerializableFunction<Actual, Void>} that
+   * should verify the assertion..
    */
   private static interface AssertRelation<ActualT, ExpectedT> extends Serializable {
     public SerializableFunction<ActualT, Void> assertFor(ExpectedT input);
@@ -799,8 +922,7 @@ public class PAssert {
   /**
    * An {@link AssertRelation} implementing the binary predicate that two objects are equal.
    */
-  private static class AssertIsEqualToRelation<T>
-      implements AssertRelation<T, T> {
+  private static class AssertIsEqualToRelation<T> implements AssertRelation<T, T> {
     @Override
     public SerializableFunction<T, Void> assertFor(T expected) {
       return new AssertIsEqualTo<T>(expected);
@@ -810,8 +932,7 @@ public class PAssert {
   /**
    * An {@link AssertRelation} implementing the binary predicate that two objects are not equal.
    */
-  private static class AssertNotEqualToRelation<T>
-      implements AssertRelation<T, T> {
+  private static class AssertNotEqualToRelation<T> implements AssertRelation<T, T> {
     @Override
     public SerializableFunction<T, Void> assertFor(T expected) {
       return new AssertNotEqualTo<T>(expected);

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/eba9ddf2/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/PAssertTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/PAssertTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/PAssertTest.java
index f540948..fdc8719 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/PAssertTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/PAssertTest.java
@@ -17,9 +17,6 @@
  */
 package org.apache.beam.sdk.testing;
 
-import static org.apache.beam.sdk.testing.SerializableMatchers.anything;
-import static org.apache.beam.sdk.testing.SerializableMatchers.not;
-
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
@@ -151,30 +148,6 @@ public class PAssertTest implements Serializable {
   }
 
   /**
-   * Basic test of succeeding {@link PAssert} using a {@link SerializableMatcher}.
-   */
-  @Test
-  @Category(RunnableOnService.class)
-  public void testBasicMatcherSuccess() throws Exception {
-    Pipeline pipeline = TestPipeline.create();
-    PCollection<Integer> pcollection = pipeline.apply(Create.of(42));
-    PAssert.that(pcollection).containsInAnyOrder(anything());
-    pipeline.run();
-  }
-
-  /**
-   * Basic test of failing {@link PAssert} using a {@link SerializableMatcher}.
-   */
-  @Test
-  @Category(RunnableOnService.class)
-  public void testBasicMatcherFailure() throws Exception {
-    Pipeline pipeline = TestPipeline.create();
-    PCollection<Integer> pcollection = pipeline.apply(Create.of(42));
-    PAssert.that(pcollection).containsInAnyOrder(not(anything()));
-    runExpectingAssertionFailure(pipeline);
-  }
-
-  /**
    * Test that we throw an error at pipeline construction time when the user mistakenly uses
    * {@code PAssert.thatSingleton().equals()} instead of the test method {@code .isEqualTo}.
    */


Mime
View raw message