From: kenn@apache.org
To: commits@beam.incubator.apache.org
Date: Thu, 09 Jun 2016 21:58:22 -0000
Subject: [4/4] incubator-beam git commit: Base PAssert on GBK instead of side inputs

Base PAssert on GBK instead of side inputs

Previously PAssert (and hence all RunnableOnService/NeedsRunner tests) required side input support. This created a very steep on-ramp for new runners.

GroupByKey is a bit more fundamental, and most backends will be able to group by key in the global window very quickly. So switching the primitive used to gather all the contents of a PCollection for assertions should make it a bit easier to get early feedback during runner development.
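The commit message's core idea (give every element the same key, group in the global window, then run the checker once on the resulting iterable) can be modeled without Beam. The sketch below is a plain-Java stand-in, not Beam code: `GroupGloballySketch`, `groupUnderDummyKey`, and `groupThenCheck` are hypothetical names, a `LinkedHashMap` plays the role of `WithKeys.of(0)` plus `GroupByKey`, and iterating over the map values plays the role of `Values` followed by the checking `ParDo` that `GroupGlobally` and `GroupThenAssert` use in the diff below.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

/**
 * Hypothetical plain-Java model (not Beam code) of "group globally, then assert":
 * every element gets the same dummy key, keyed elements are grouped, and the
 * checker runs once per resulting group.
 */
final class GroupGloballySketch {

  private GroupGloballySketch() {}

  /** Models WithKeys.of(0) followed by GroupByKey: all elements end up under key 0. */
  static <T> Map<Integer, List<T>> groupUnderDummyKey(List<T> elements) {
    Map<Integer, List<T>> groups = new LinkedHashMap<>();
    for (T element : elements) {
      groups.computeIfAbsent(0, key -> new ArrayList<>()).add(element);
    }
    return groups;
  }

  /**
   * Models Values followed by a checking ParDo: the checker runs once per group.
   * Returns how many times the checker ran. An empty input yields no group at all.
   */
  static <T> int groupThenCheck(List<T> elements, Consumer<List<T>> checker) {
    int invocations = 0;
    for (List<T> group : groupUnderDummyKey(elements).values()) {
      checker.accept(group);
      invocations++;
    }
    return invocations;
  }

  public static void main(String[] args) {
    List<String> actual = Arrays.asList("b", "c", "a");
    int runs =
        groupThenCheck(
            actual,
            group -> {
              // Order-insensitive comparison, in the spirit of containsInAnyOrder.
              List<String> sorted = new ArrayList<>(group);
              Collections.sort(sorted);
              if (!sorted.equals(Arrays.asList("a", "b", "c"))) {
                throw new AssertionError("unexpected contents: " + group);
              }
            });
    System.out.println("checker runs: " + runs);
  }
}
```

Running `main` prints `checker runs: 1`. One property of this shape is worth keeping in mind: in the model, an empty input produces no group, so the checker is never invoked. Since `GroupByKey` likewise emits nothing for keys that never appear, an assertion such as `empty()` routed through a group-then-check path may deserve a separate look on each runner.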
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/810ffeb2
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/810ffeb2
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/810ffeb2

Branch: refs/heads/master
Commit: 810ffeb2785bf996001c8fadb992410d1f9409c6
Parents: d6adbbf
Author: Kenneth Knowles
Authored: Wed Jun 8 15:07:52 2016 -0700
Committer: Kenneth Knowles
Committed: Thu Jun 9 14:41:09 2016 -0700

----------------------------------------------------------------------
 .../testing/TestDataflowPipelineRunner.java     |   3 +-
 .../org/apache/beam/sdk/testing/PAssert.java    | 737 +++++++++----------
 .../apache/beam/sdk/testing/PAssertTest.java    |  27 -
 3 files changed, 362 insertions(+), 405 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/810ffeb2/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/testing/TestDataflowPipelineRunner.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/testing/TestDataflowPipelineRunner.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/testing/TestDataflowPipelineRunner.java
index 3e8d903..c940e9a 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/testing/TestDataflowPipelineRunner.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/testing/TestDataflowPipelineRunner.java
@@ -166,7 +166,8 @@ public class TestDataflowPipelineRunner extends PipelineRunner OutputT apply(
       PTransform<InputT, OutputT> transform, InputT input) {
     if (transform instanceof PAssert.OneSideInputAssert
-        || transform instanceof PAssert.TwoSideInputAssert) {
+        || transform instanceof PAssert.GroupThenAssert
+        || transform instanceof PAssert.GroupThenAssertForSingleton) {
       expectedNumberOfAssertions += 1;
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/810ffeb2/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/PAssert.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/PAssert.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/PAssert.java
index c2cd598..b10c1cb 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/PAssert.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/PAssert.java
@@ -34,11 +34,14 @@ import org.apache.beam.sdk.runners.PipelineRunner;
 import org.apache.beam.sdk.transforms.Aggregator;
 import org.apache.beam.sdk.transforms.Create;
 import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.GroupByKey;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.transforms.SerializableFunction;
 import org.apache.beam.sdk.transforms.Sum;
+import org.apache.beam.sdk.transforms.Values;
 import org.apache.beam.sdk.transforms.View;
+import org.apache.beam.sdk.transforms.WithKeys;
 import org.apache.beam.sdk.transforms.windowing.GlobalWindows;
 import org.apache.beam.sdk.transforms.windowing.Window;
 import org.apache.beam.sdk.util.CoderUtils;
@@ -48,32 +51,27 @@ import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PCollectionView;
 import org.apache.beam.sdk.values.PDone;
 
-import com.google.common.base.Optional;
 import com.google.common.collect.Iterables;
 import com.google.common.collect.Lists;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.IOException;
 import java.io.Serializable;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
-import java.util.List;
 import java.util.Map;
 import java.util.NoSuchElementException;
 
 /**
- * An assertion on the contents of a {@link PCollection}
- * incorporated into the pipeline. Such an assertion
- * can be checked no matter what kind of {@link PipelineRunner} is
- * used.
+ * An assertion on the contents of a {@link PCollection} incorporated into the pipeline. Such an
+ * assertion can be checked no matter what kind of {@link PipelineRunner} is used.
  *
- * <p>Note that the {@code PAssert} call must precede the call
- * to {@link Pipeline#run}.
+ * <p>Note that the {@code PAssert} call must precede the call to {@link Pipeline#run}.
  *
- * <p>Examples of use:
- * <pre>{@code
+ * <p>Examples of use: <pre>{@code
  * Pipeline p = TestPipeline.create();
  * ...
  * PCollection<String> output =
@@ -107,30 +105,84 @@ public class PAssert {
   private PAssert() {}
 
   /**
-   * Constructs an {@link IterableAssert} for the elements of the provided
-   * {@link PCollection}.
+   * Builder interface for assertions applicable to iterables and PCollection contents.
+   */
+  public interface IterableAssert<T> {
+
+    /**
+     * Asserts that the iterable in question contains the provided elements.
+     *
+     * @return the same {@link IterableAssert} builder for further assertions
+     */
+    IterableAssert<T> containsInAnyOrder(T... expectedElements);
+
+    /**
+     * Asserts that the iterable in question contains the provided elements.
+     *
+     * @return the same {@link IterableAssert} builder for further assertions
+     */
+    IterableAssert<T> containsInAnyOrder(Iterable<T> expectedElements);
+
+    /**
+     * Asserts that the iterable in question is empty.
+     *
+     * @return the same {@link IterableAssert} builder for further assertions
+     */
+    IterableAssert<T> empty();
+
+    /**
+     * Applies the provided checking function (presumably containing assertions) to the
+     * iterable in question.
+     *
+     * @return the same {@link IterableAssert} builder for further assertions
+     */
+    IterableAssert<T> satisfies(SerializableFunction<Iterable<T>, Void> checkerFn);
+  }
+
+  /**
+   * Builder interface for assertions applicable to a single value.
+   */
+  public interface SingletonAssert<T> {
+    /**
+     * Asserts that the value in question is equal to the provided value, according to
+     * {@link Object#equals}.
+     *
+     * @return the same {@link SingletonAssert} builder for further assertions
+     */
+    SingletonAssert<T> isEqualTo(T expected);
+
+    /**
+     * Asserts that the value in question is not equal to the provided value, according
+     * to {@link Object#equals}.
+     *
+     * @return the same {@link SingletonAssert} builder for further assertions
+     */
+    SingletonAssert<T> notEqualTo(T notExpected);
+
+    /**
+     * Applies the provided checking function (presumably containing assertions) to the
+     * value in question.
+     *
+     * @return the same {@link SingletonAssert} builder for further assertions
+     */
+    SingletonAssert<T> satisfies(SerializableFunction<T, Void> checkerFn);
+  }
+
+  /**
+   * Constructs an {@link IterableAssert} for the elements of the provided {@link PCollection}.
    */
   public static <T> IterableAssert<T> that(PCollection<T> actual) {
-    return new IterableAssert<>(
-        new CreateActual<T, Iterable<T>>(actual, View.asIterable()),
-         actual.getPipeline())
-         .setCoder(actual.getCoder());
+    return new PCollectionContentsAssert<>(actual);
   }
 
   /**
-   * Constructs an {@link IterableAssert} for the value of the provided
-   * {@link PCollection} which must contain a single {@code Iterable}
-   * value.
+   * Constructs an {@link IterableAssert} for the value of the provided {@link PCollection} which
+   * must contain a single {@code Iterable} value.
    */
-  public static <T> IterableAssert<T>
-      thatSingletonIterable(PCollection<? extends Iterable<T>> actual) {
+  public static <T> IterableAssert<T> thatSingletonIterable(
+      PCollection<? extends Iterable<T>> actual) {
 
-    List<? extends Coder<?>> maybeElementCoder = actual.getCoder().getCoderArguments();
-    Coder<T> tCoder;
     try {
-      @SuppressWarnings("unchecked")
-      Coder<T> tCoderTmp = (Coder<T>) Iterables.getOnlyElement(maybeElementCoder);
-      tCoder = tCoderTmp;
     } catch (NoSuchElementException | IllegalArgumentException exc) {
       throw new IllegalArgumentException(
           "PAssert.thatSingletonIterable requires a PCollection<Iterable<T>>"
@@ -141,19 +193,7 @@ public class PAssert {
     @SuppressWarnings("unchecked") // Safe covariant cast
     PCollection<Iterable<T>> actualIterables = (PCollection<Iterable<T>>) actual;
 
-    return new IterableAssert<>(
-        new CreateActual<Iterable<T>, Iterable<T>>(
-            actualIterables, View.<Iterable<T>>asSingleton()),
-        actual.getPipeline())
-        .setCoder(tCoder);
-  }
-
-  /**
-   * Constructs an {@link IterableAssert} for the value of the provided
-   * {@code PCollectionView PCollectionView<Iterable<T>>}.
-   */
-  public static <T> IterableAssert<T> thatIterable(PCollectionView<Iterable<T>> actual) {
-    return new IterableAssert<>(new PreExisting<Iterable<T>>(actual), actual.getPipeline());
+    return new PCollectionSingletonIterableAssert<>(actualIterables);
   }
 
   /**
@@ -161,93 +201,95 @@ public class PAssert {
    * {@code PCollection PCollection<T>}, which must be a singleton.
    */
   public static <T> SingletonAssert<T> thatSingleton(PCollection<T> actual) {
-    return new SingletonAssert<>(
-        new CreateActual(actual, View.asSingleton()), actual.getPipeline())
-        .setCoder(actual.getCoder());
+    return new PCollectionViewAssert<>(actual, View.asSingleton(), actual.getCoder());
   }
 
   /**
    * Constructs a {@link SingletonAssert} for the value of the provided {@link PCollection}.
    *
-   * <p>Note that the actual value must be coded by a {@link KvCoder},
-   * not just any {@code Coder}.
+   * <p>Note that the actual value must be coded by a {@link KvCoder}, not just any
+   * {@code Coder}.
    */
-  public static <K, V> SingletonAssert<Map<K, Iterable<V>>>
-      thatMultimap(PCollection<KV<K, V>> actual) {
+  public static <K, V> SingletonAssert<Map<K, Iterable<V>>> thatMultimap(
+      PCollection<KV<K, V>> actual) {
     @SuppressWarnings("unchecked")
     KvCoder<K, V> kvCoder = (KvCoder<K, V>) actual.getCoder();
-
-    return new SingletonAssert<>(
-        new CreateActual<>(actual, View.asMultimap()), actual.getPipeline())
-        .setCoder(MapCoder.of(kvCoder.getKeyCoder(), IterableCoder.of(kvCoder.getValueCoder())));
+    return new PCollectionViewAssert<>(
+        actual,
+        View.asMultimap(),
+        MapCoder.of(kvCoder.getKeyCoder(), IterableCoder.of(kvCoder.getValueCoder())));
   }
 
   /**
-   * Constructs a {@link SingletonAssert} for the value of the provided {@link PCollection},
-   * which must have at most one value per key.
+   * Constructs a {@link SingletonAssert} for the value of the provided {@link PCollection}, which
+   * must have at most one value per key.
    *
-   * <p>Note that the actual value must be coded by a {@link KvCoder},
-   * not just any {@code Coder}.
+   * <p>Note that the actual value must be coded by a {@link KvCoder}, not just any
+   * {@code Coder}.
    */
   public static <K, V> SingletonAssert<Map<K, V>> thatMap(PCollection<KV<K, V>> actual) {
     @SuppressWarnings("unchecked")
     KvCoder<K, V> kvCoder = (KvCoder<K, V>) actual.getCoder();
-
-    return new SingletonAssert<>(
-        new CreateActual<>(actual, View.asMap()), actual.getPipeline())
-        .setCoder(MapCoder.of(kvCoder.getKeyCoder(), kvCoder.getValueCoder()));
+    return new PCollectionViewAssert<>(
+        actual, View.asMap(), MapCoder.of(kvCoder.getKeyCoder(), kvCoder.getValueCoder()));
   }
 
   ////////////////////////////////////////////////////////////
 
   /**
-   * An assertion about the contents of a {@link PCollectionView} yielding an {@code Iterable}.
+   * An {@link IterableAssert} about the contents of a {@link PCollection}. This does not require
+   * the runner to support side inputs.
    */
-  public static class IterableAssert<T> implements Serializable {
-    private final Pipeline pipeline;
-    private final PTransform<PBegin, PCollectionView<Iterable<T>>> createActual;
-    private Optional<Coder<T>> coder;
+  private static class PCollectionContentsAssert<T> implements IterableAssert<T> {
+    private final PCollection<T> actual;
 
-    protected IterableAssert(
-        PTransform<PBegin, PCollectionView<Iterable<T>>> createActual, Pipeline pipeline) {
-      this.createActual = createActual;
-      this.pipeline = pipeline;
-      this.coder = Optional.absent();
+    public PCollectionContentsAssert(PCollection<T> actual) {
+      this.actual = actual;
     }
 
     /**
-     * Sets the coder to use for elements of type {@code T}, as needed for internal purposes.
+     * Checks that the {@code Iterable} contains the expected elements, in any order.
      *
      * <p>Returns this {@code IterableAssert}.
      */
-    public IterableAssert<T> setCoder(Coder<T> coderOrNull) {
-      this.coder = Optional.fromNullable(coderOrNull);
-      return this;
+    @Override
+    @SafeVarargs
+    public final PCollectionContentsAssert<T> containsInAnyOrder(T... expectedElements) {
+      return containsInAnyOrder(Arrays.asList(expectedElements));
     }
 
     /**
-     * Gets the coder, which may yet be absent.
+     * Checks that the {@code Iterable} contains the expected elements, in any order.
+     *
+     * <p>Returns this {@code IterableAssert}.
      */
-    public Coder<T> getCoder() {
-      if (coder.isPresent()) {
-        return coder.get();
-      } else {
-        throw new IllegalStateException(
-            "Attempting to access the coder of an IterableAssert"
-                + " that has not been set yet.");
-      }
+    @Override
+    public PCollectionContentsAssert<T> containsInAnyOrder(Iterable<T> expectedElements) {
+      return satisfies(new AssertContainsInAnyOrderRelation(), expectedElements);
+    }
+
+    @Override
+    public PCollectionContentsAssert<T> empty() {
+      return containsInAnyOrder(Collections.emptyList());
+    }
+
+    @Override
+    public PCollectionContentsAssert<T> satisfies(
+        SerializableFunction<Iterable<T>, Void> checkerFn) {
+      actual.apply("PAssert$" + (assertCount++), new GroupThenAssert<>(checkerFn));
+      return this;
     }
 
     /**
-     * Applies a {@link SerializableFunction} to check the elements of the {@code Iterable}.
+     * Checks that the {@code Iterable} contains elements that match the provided matchers, in any
+     * order.
      *
      * <p>Returns this {@code IterableAssert}.
      */
-    public IterableAssert<T> satisfies(SerializableFunction<Iterable<T>, Void> checkerFn) {
-      pipeline.apply(
-          "PAssert$" + (assertCount++),
-          new OneSideInputAssert<Iterable<T>>(createActual, checkerFn));
-      return this;
+    @SafeVarargs
+    final PCollectionContentsAssert<T> containsInAnyOrder(
+        SerializableMatcher<? super T>... elementMatchers) {
+      return satisfies(SerializableMatchers.containsInAnyOrder(elementMatchers));
     }
 
     /**
@@ -255,17 +297,11 @@ public class PAssert {
      *
      * <p>Returns this {@code IterableAssert}.
      */
-    public IterableAssert<T> satisfies(
-        AssertRelation<Iterable<T>, Iterable<T>> relation,
-        final Iterable<T> expectedElements) {
-      pipeline.apply(
-          "PAssert$" + (assertCount++),
-          new TwoSideInputAssert<Iterable<T>, Iterable<T>>(
-              createActual,
-              new CreateExpected<T, Iterable<T>>(expectedElements, coder, View.asIterable()),
-              relation));
-
-      return this;
+    private PCollectionContentsAssert<T> satisfies(
+        AssertRelation<Iterable<T>, Iterable<T>> relation, Iterable<T> expectedElements) {
+      return satisfies(
+          new CheckRelationAgainstExpected<Iterable<T>>(
+              relation, expectedElements, IterableCoder.of(actual.getCoder())));
     }
 
     /**
@@ -273,15 +309,14 @@ public class PAssert {
      *
      * <p>Returns this {@code IterableAssert}.
      */
-    IterableAssert<T> satisfies(final SerializableMatcher<Iterable<? extends T>> matcher) {
+    PCollectionContentsAssert<T> satisfies(
+        final SerializableMatcher<Iterable<? extends T>> matcher) {
       // Safe covariant cast. Could be elided by changing a lot of this file to use
       // more flexible bounds.
       @SuppressWarnings({"rawtypes", "unchecked"})
       SerializableFunction<Iterable<T>, Void> checkerFn =
-          (SerializableFunction) new MatcherCheckerFn<>(matcher);
-      pipeline.apply(
-          "PAssert$" + (assertCount++),
-          new OneSideInputAssert<Iterable<T>>(createActual, checkerFn));
+          (SerializableFunction) new MatcherCheckerFn<>(matcher);
+      actual.apply("PAssert$" + (assertCount++), new GroupThenAssert<>(checkerFn));
       return this;
     }
 
@@ -300,19 +335,9 @@ public class PAssert {
     }
 
     /**
-     * Checks that the {@code Iterable} is empty.
-     *
-     * <p>Returns this {@code IterableAssert}.
-     */
-    public IterableAssert<T> empty() {
-      return satisfies(new AssertContainsInAnyOrderRelation(), Collections.emptyList());
-    }
-
-    /**
      * @throws UnsupportedOperationException always
-     * @deprecated {@link Object#equals(Object)} is not supported on PAssert objects.
-     * If you meant to test object equality, use a variant of {@link #containsInAnyOrder}
-     * instead.
+     * @deprecated {@link Object#equals(Object)} is not supported on PAssert objects. If you meant
+     * to test object equality, use a variant of {@link #containsInAnyOrder} instead.
      */
     @Deprecated
     @Override
@@ -331,169 +356,129 @@ public class PAssert {
       throw new UnsupportedOperationException(
           String.format("%s.hashCode() is not supported.", IterableAssert.class.getSimpleName()));
     }
+  }
 
-    /**
-     * Checks that the {@code Iterable} contains the expected elements, in any
-     * order.
-     *
-     * <p>Returns this {@code IterableAssert}.
-     */
-    public IterableAssert<T> containsInAnyOrder(Iterable<T> expectedElements) {
-      return satisfies(new AssertContainsInAnyOrderRelation(), expectedElements);
-    }
+  /**
+   * An {@link IterableAssert} for an iterable that is the sole element of a {@link PCollection}.
+   * This does not require the runner to support side inputs.
+   */
+  private static class PCollectionSingletonIterableAssert<T> implements IterableAssert<T> {
+    private final PCollection<Iterable<T>> actual;
+    private final Coder<T> elementCoder;
 
-    /**
-     * Checks that the {@code Iterable} contains the expected elements, in any
-     * order.
-     *
-     * <p>Returns this {@code IterableAssert}.
-     */
-    @SafeVarargs
-    public final IterableAssert<T> containsInAnyOrder(T... expectedElements) {
-      return satisfies(
-          new AssertContainsInAnyOrderRelation(),
-          Arrays.asList(expectedElements));
+    public PCollectionSingletonIterableAssert(PCollection<Iterable<T>> actual) {
+      this.actual = actual;
+
+      @SuppressWarnings("unchecked")
+      Coder<T> typedCoder = (Coder<T>) actual.getCoder().getCoderArguments().get(0);
+      this.elementCoder = typedCoder;
     }
 
-    /**
-     * Checks that the {@code Iterable} contains elements that match the provided matchers,
-     * in any order.
-     *
-     * <p>Returns this {@code IterableAssert}.
-     */
+    @Override
     @SafeVarargs
-    final IterableAssert<T> containsInAnyOrder(
-        SerializableMatcher<? super T>... elementMatchers) {
-      return satisfies(SerializableMatchers.containsInAnyOrder(elementMatchers));
+    public final PCollectionSingletonIterableAssert<T> containsInAnyOrder(T... expectedElements) {
+      return containsInAnyOrder(Arrays.asList(expectedElements));
     }
-  }
 
-  /**
-   * An assertion about the single value of type {@code T}
-   * associated with a {@link PCollectionView}.
-   */
-  public static class SingletonAssert<T> implements Serializable {
-    private final Pipeline pipeline;
-    private final CreateActual createActual;
-    private Optional<Coder<T>> coder;
-
-    protected SingletonAssert(
-        CreateActual createActual, Pipeline pipeline) {
-      this.pipeline = pipeline;
-      this.createActual = createActual;
-      this.coder = Optional.absent();
+    @Override
+    public PCollectionSingletonIterableAssert<T> empty() {
+      return containsInAnyOrder(Collections.emptyList());
     }
 
-    /**
-     * Always throws an {@link UnsupportedOperationException}: users are probably looking for
-     * {@link #isEqualTo}.
-     */
-    @Deprecated
     @Override
-    public boolean equals(Object o) {
-      throw new UnsupportedOperationException(
-          String.format(
-              "tests for Java equality of the %s object, not the PCollection in question. "
-                  + "Call a test method, such as isEqualTo.",
-              getClass().getSimpleName()));
+    public PCollectionSingletonIterableAssert<T> containsInAnyOrder(Iterable<T> expectedElements) {
+      return satisfies(new AssertContainsInAnyOrderRelation(), expectedElements);
     }
 
-    /**
-     * @throws UnsupportedOperationException always.
-     * @deprecated {@link Object#hashCode()} is not supported on PAssert objects.
-     */
-    @Deprecated
     @Override
-    public int hashCode() {
-      throw new UnsupportedOperationException(
-          String.format("%s.hashCode() is not supported.", SingletonAssert.class.getSimpleName()));
+    public PCollectionSingletonIterableAssert<T> satisfies(
+        SerializableFunction<Iterable<T>, Void> checkerFn) {
+      actual.apply("PAssert$" + (assertCount++), new GroupThenAssertForSingleton<>(checkerFn));
+      return this;
     }
 
-    /**
-     * Sets the coder to use for elements of type {@code T}, as needed
-     * for internal purposes.
-     *
-     * <p>Returns this {@code IterableAssert}.
-     */
-    public SingletonAssert<T> setCoder(Coder<T> coderOrNull) {
-      this.coder = Optional.fromNullable(coderOrNull);
-      return this;
+    private PCollectionSingletonIterableAssert<T> satisfies(
+        AssertRelation<Iterable<T>, Iterable<T>> relation, Iterable<T> expectedElements) {
+      return satisfies(
+          new CheckRelationAgainstExpected<Iterable<T>>(
+              relation, expectedElements, IterableCoder.of(elementCoder)));
     }
+  }
 
-    /**
-     * Gets the coder, which may yet be absent.
-     */
-    public Coder<T> getCoder() {
-      if (coder.isPresent()) {
-        return coder.get();
-      } else {
-        throw new IllegalStateException(
-            "Attempting to access the coder of an IterableAssert that has not been set yet.");
-      }
+  /**
+   * An assertion about the contents of a {@link PCollection} when it is viewed as a single value
+   * of type {@code ViewT}. This requires side input support from the runner.
+   */
+  private static class PCollectionViewAssert<ElemT, ViewT> implements SingletonAssert<ViewT> {
+    private final PCollection<ElemT> actual;
+    private final PTransform<PCollection<ElemT>, PCollectionView<ViewT>> view;
+    private final Coder<ViewT> coder;
+
+    protected PCollectionViewAssert(
+        PCollection<ElemT> actual,
+        PTransform<PCollection<ElemT>, PCollectionView<ViewT>> view,
+        Coder<ViewT> coder) {
+      this.actual = actual;
+      this.view = view;
+      this.coder = coder;
     }
 
-    /**
-     * Applies a {@link SerializableFunction} to check the value of this
-     * {@code SingletonAssert}'s view.
-     *
-     * <p>Returns this {@code SingletonAssert}.
-     */
-    public SingletonAssert<T> satisfies(SerializableFunction<T, Void> checkerFn) {
-      pipeline.apply(
-          "PAssert$" + (assertCount++), new OneSideInputAssert(createActual, checkerFn));
-      return this;
+    @Override
+    public PCollectionViewAssert<ElemT, ViewT> isEqualTo(ViewT expectedValue) {
+      return satisfies(new AssertIsEqualToRelation(), expectedValue);
     }
 
-    /**
-     * Applies an {@link AssertRelation} to check the provided relation against the
-     * value of this assert and the provided expected value.
-     *
-     * <p>Returns this {@code SingletonAssert}.
-     */
-    public SingletonAssert<T> satisfies(
-        AssertRelation<T, T> relation,
-        final T expectedValue) {
-      pipeline.apply(
-          "PAssert$" + (assertCount++),
-          new TwoSideInputAssert(
-              createActual,
-              new CreateExpected(Arrays.asList(expectedValue), coder, View.asSingleton()),
-              relation));
+    @Override
+    public PCollectionViewAssert<ElemT, ViewT> notEqualTo(ViewT expectedValue) {
+      return satisfies(new AssertNotEqualToRelation(), expectedValue);
+    }
 
+    @Override
+    public PCollectionViewAssert<ElemT, ViewT> satisfies(
+        SerializableFunction<ViewT, Void> checkerFn) {
+      actual
+          .getPipeline()
+          .apply(
+              "PAssert$" + (assertCount++),
+              new OneSideInputAssert(CreateActual.from(actual, view), checkerFn));
       return this;
     }
 
     /**
-     * Checks that the value of this {@code SingletonAssert}'s view is equal
-     * to the expected value.
+     * Applies an {@link AssertRelation} to check the provided relation against the value of this
+     * assert and the provided expected value.
      *
      * <p>Returns this {@code SingletonAssert}.
      */
-    public SingletonAssert<T> isEqualTo(T expectedValue) {
-      return satisfies(new AssertIsEqualToRelation(), expectedValue);
+    private PCollectionViewAssert<ElemT, ViewT> satisfies(
+        AssertRelation<ViewT, ViewT> relation, final ViewT expectedValue) {
+      return satisfies(new CheckRelationAgainstExpected(relation, expectedValue, coder));
     }
 
     /**
-     * Checks that the value of this {@code SingletonAssert}'s view is not equal
-     * to the expected value.
-     *
-     * <p>Returns this {@code SingletonAssert}.
+     * Always throws an {@link UnsupportedOperationException}: users are probably looking for
+     * {@link #isEqualTo}.
      */
-    public SingletonAssert<T> notEqualTo(T expectedValue) {
-      return satisfies(new AssertNotEqualToRelation(), expectedValue);
+    @Deprecated
+    @Override
+    public boolean equals(Object o) {
+      throw new UnsupportedOperationException(
+          String.format(
+              "tests for Java equality of the %s object, not the PCollection in question. "
+                  + "Call a test method, such as isEqualTo.",
+              getClass().getSimpleName()));
     }
 
     /**
-     * Checks that the value of this {@code SingletonAssert}'s view is equal to
-     * the expected value.
-     *
-     * @deprecated replaced by {@link #isEqualTo}
+     * @throws UnsupportedOperationException always.
+     * @deprecated {@link Object#hashCode()} is not supported on {@link PAssert} objects.
      */
     @Deprecated
-    public SingletonAssert<T> is(T expectedValue) {
-      return isEqualTo(expectedValue);
+    @Override
+    public int hashCode() {
+      throw new UnsupportedOperationException(
+          String.format("%s.hashCode() is not supported.", SingletonAssert.class.getSimpleName()));
     }
-
   }
 
   ////////////////////////////////////////////////////////////////////////
@@ -504,8 +489,13 @@ public class PAssert {
     private final transient PCollection<T> actual;
     private final transient PTransform<PCollection<T>, PCollectionView<ActualT>> actualView;
 
-    private CreateActual(PCollection<T> actual,
-        PTransform<PCollection<T>, PCollectionView<ActualT>> actualView) {
+    public static <T, ActualT> CreateActual<T, ActualT> from(
+        PCollection<T> actual, PTransform<PCollection<T>, PCollectionView<ActualT>> actualView) {
+      return new CreateActual<>(actual, actualView);
+    }
+
+    private CreateActual(
+        PCollection<T> actual, PTransform<PCollection<T>, PCollectionView<ActualT>> actualView) {
       this.actual = actual;
       this.actualView = actualView;
     }
@@ -515,73 +505,145 @@ public class PAssert {
       final Coder<T> coder = actual.getCoder();
       return actual
           .apply(Window.into(new GlobalWindows()))
-          .apply(ParDo.of(new DoFn<T, T>() {
-            @Override
-            public void processElement(ProcessContext context) throws CoderException {
-              context.output(CoderUtils.clone(coder, context.element()));
-            }
-          }))
+          .apply(
+              ParDo.of(
+                  new DoFn<T, T>() {
+                    @Override
+                    public void processElement(ProcessContext context) throws CoderException {
+                      context.output(CoderUtils.clone(coder, context.element()));
+                    }
+                  }))
           .apply(actualView);
     }
   }
 
-  private static class CreateExpected<T, ActualT>
-      extends PTransform<PBegin, PCollectionView<ActualT>> {
-
-    private final Iterable<T> elements;
-    private final Optional<Coder<T>> coder;
-    private final transient PTransform<PCollection<T>, PCollectionView<ActualT>> view;
+  /**
+   * A partially applied {@link AssertRelation}, where one value is provided along with a coder to
+   * serialize/deserialize them.
+   */
+  private static class CheckRelationAgainstExpected<T> implements SerializableFunction<T, Void> {
+    private final AssertRelation<T, T> relation;
+    private final byte[] encodedExpected;
+    private final Coder<T> coder;
 
-    private CreateExpected(Iterable<T> elements, Optional<Coder<T>> coder,
-        PTransform<PCollection<T>, PCollectionView<ActualT>> view) {
-      this.elements = elements;
+    public CheckRelationAgainstExpected(AssertRelation<T, T> relation, T expected, Coder<T> coder) {
+      this.relation = relation;
       this.coder = coder;
-      this.view = view;
+
+      try {
+        this.encodedExpected = CoderUtils.encodeToByteArray(coder, expected);
+      } catch (IOException coderException) {
+        throw new RuntimeException(coderException);
+      }
     }
 
     @Override
-    public PCollectionView<ActualT> apply(PBegin input) {
-      Create.Values<T> createTransform = Create.of(elements);
-      if (coder.isPresent()) {
-        createTransform = createTransform.withCoder(coder.get());
+    public Void apply(T actual) {
+      try {
+        T expected = CoderUtils.decodeFromByteArray(coder, encodedExpected);
+        return relation.assertFor(expected).apply(actual);
+      } catch (IOException coderException) {
+        throw new RuntimeException(coderException);
       }
-      return input.apply(createTransform).apply(view);
     }
   }
 
-  private static class PreExisting<T> extends PTransform<PBegin, PCollectionView<T>> {
+  /**
+   * A transform that gathers the contents of a {@link PCollection} into a single main input
+   * iterable in the global window. This requires a runner to support {@link GroupByKey} in the
+   * global window, but not side inputs or other windowing or triggers.
+   */
+  private static class GroupGlobally<T> extends PTransform<PCollection<T>, PCollection<Iterable<T>>>
+      implements Serializable {
 
-    private final PCollectionView<T> view;
+    public GroupGlobally() {}
 
-    private PreExisting(PCollectionView<T> view) {
-      this.view = view;
+    @Override
+    public PCollection<Iterable<T>> apply(PCollection<T> input) {
+      return input
+          .apply("GloballyWindow", Window.into(new GlobalWindows()))
+          .apply("DummyKey", WithKeys.of(0))
+          .apply("GroupByKey", GroupByKey.create())
+          .apply("GetOnlyValue", Values.<Iterable<T>>create());
+    }
+  }
+
+  /**
+   * A transform that applies an assertion-checking function over iterables of {@code ActualT} to
+   * the entirety of the contents of its input.
+   */
+  public static class GroupThenAssert<T> extends PTransform<PCollection<T>, PDone>
+      implements Serializable {
+    private final SerializableFunction<Iterable<T>, Void> checkerFn;
+
+    private GroupThenAssert(SerializableFunction<Iterable<T>, Void> checkerFn) {
+      this.checkerFn = checkerFn;
     }
 
     @Override
-    public PCollectionView<T> apply(PBegin input) {
-      return view;
+    public PDone apply(PCollection<T> input) {
+      input
+          .apply("GroupGlobally", new GroupGlobally<T>())
+          .apply(
+              "RunChecks",
+              ParDo.of(
+                  new DoFn<Iterable<T>, Void>() {
+                    @Override
+                    public void processElement(ProcessContext context) {
+                      checkerFn.apply(context.element());
+                    }
+                  }));
+
+      return PDone.in(input.getPipeline());
     }
   }
 
   /**
-   * An assertion checker that takes a single
-   * {@link PCollectionView PCollectionView<ActualT>}
-   * and an assertion over {@code ActualT}, and checks it within a dataflow
-   * pipeline.
+   * A transform that applies an assertion-checking function to a single iterable contained as the
+   * sole element of a {@link PCollection}.
+   */
+  public static class GroupThenAssertForSingleton<T>
+      extends PTransform<PCollection<Iterable<T>>, PDone> implements Serializable {
+    private final SerializableFunction<Iterable<T>, Void> checkerFn;
+
+    private GroupThenAssertForSingleton(SerializableFunction<Iterable<T>, Void> checkerFn) {
+      this.checkerFn = checkerFn;
+    }
+
+    @Override
+    public PDone apply(PCollection<Iterable<T>> input) {
+      input
+          .apply("GroupGlobally", new GroupGlobally<Iterable<T>>())
+          .apply(
+              "RunChecks",
+              ParDo.of(
+                  new DoFn<Iterable<Iterable<T>>, Void>() {
+                    @Override
+                    public void processElement(ProcessContext context) {
+                      checkerFn.apply(Iterables.getOnlyElement(context.element()));
+                    }
+                  }));
+
+      return PDone.in(input.getPipeline());
+    }
+  }
+
+  /**
+   * An assertion checker that takes a single {@link PCollectionView
+   * PCollectionView<ActualT>} and an assertion over {@code ActualT}, and checks it within a
+   * Beam pipeline.
    *
-   * <p>Note that the entire assertion must be serializable. If
-   * you need to make assertions involving multiple inputs
-   * that are each not serializable, use TwoSideInputAssert.
+   * <p>Note that the entire assertion must be serializable.
    *
-   * <p>This is generally useful for assertion functions that
-   * are serializable but whose underlying data may not have a coder.
+   * <p>This is generally useful for assertion functions that are serializable but whose underlying
+   * data may not have a coder.
    */
-  public static class OneSideInputAssert<ActualT>
-      extends PTransform<PBegin, PDone> implements Serializable {
+  public static class OneSideInputAssert<ActualT> extends PTransform<PBegin, PDone>
+      implements Serializable {
     private final transient PTransform<PBegin, PCollectionView<ActualT>> createActual;
     private final SerializableFunction<ActualT, Void> checkerFn;
 
-    public OneSideInputAssert(
+    private OneSideInputAssert(
         PTransform<PBegin, PCollectionView<ActualT>> createActual,
         SerializableFunction<ActualT, Void> checkerFn) {
       this.createActual = createActual;
@@ -594,16 +656,18 @@ public class PAssert {
 
       input
           .apply(Create.of(0).withCoder(VarIntCoder.of()))
-          .apply(ParDo.named("RunChecks").withSideInputs(actual)
-              .of(new CheckerDoFn<>(checkerFn, actual)));
+          .apply(
+              ParDo.named("RunChecks")
+                  .withSideInputs(actual)
+                  .of(new CheckerDoFn<>(checkerFn, actual)));
 
       return PDone.in(input.getPipeline());
     }
   }
 
   /**
-   * A {@link DoFn} that runs a checking {@link SerializableFunction} on the contents of
-   * a {@link PCollectionView}, and adjusts counters and thrown exceptions for use in testing.
+   * A {@link DoFn} that runs a checking {@link SerializableFunction} on the contents of a
+   * {@link PCollectionView}, and adjusts counters and thrown exceptions for use in testing.
    *
    * <p>The input is ignored, but is {@link Integer} to be usable on runners that do not support
    * null values.
@@ -617,8 +681,7 @@ public class PAssert {
     private final PCollectionView<ActualT> actual;
 
     private CheckerDoFn(
-        SerializableFunction<ActualT, Void> checkerFn,
-        PCollectionView<ActualT> actual) {
+        SerializableFunction<ActualT, Void> checkerFn, PCollectionView<ActualT> actual) {
       this.checkerFn = checkerFn;
       this.actual = actual;
     }
@@ -640,88 +703,11 @@ public class PAssert {
     }
   }
 
-  /**
-   * An assertion checker that takes a {@link PCollectionView PCollectionView<ActualT>},
-   * a {@link PCollectionView PCollectionView<ExpectedT>}, a relation
-   * over {@code A} and {@code B}, and checks that the relation holds
-   * within a dataflow pipeline.
-   *
-   * <p>This is useful when either/both of {@code A} and {@code B}
-   * are not serializable, but have coders (provided
-   * by the underlying {@link PCollection}s).
-   */
-  public static class TwoSideInputAssert<ActualT, ExpectedT>
-      extends PTransform<PBegin, PDone> implements Serializable {
-
-    private final transient PTransform<PBegin, PCollectionView<ActualT>> createActual;
-    private final transient PTransform<PBegin, PCollectionView<ExpectedT>> createExpected;
-    private final AssertRelation<ActualT, ExpectedT> relation;
-
-    protected TwoSideInputAssert(
-        PTransform<PBegin, PCollectionView<ActualT>> createActual,
-        PTransform<PBegin, PCollectionView<ExpectedT>> createExpected,
-        AssertRelation<ActualT, ExpectedT> relation) {
-      this.createActual = createActual;
-      this.createExpected = createExpected;
-      this.relation = relation;
-    }
-
-    @Override
-    public PDone apply(PBegin input) {
-      final PCollectionView<ActualT> actual = input.apply("CreateActual", createActual);
-      final PCollectionView<ExpectedT> expected = input.apply("CreateExpected", createExpected);
-
-      input
-          .apply(Create.of(0).withCoder(VarIntCoder.of()))
-          .apply("RunChecks", ParDo.withSideInputs(actual, expected)
-              .of(new CheckerDoFn<>(relation, actual, expected)));
-
-      return PDone.in(input.getPipeline());
-    }
-
-    /**
-     * Input is ignored, but is {@link Integer} for runners that do not support null values.
-     */
-    private static class CheckerDoFn<ActualT, ExpectedT> extends DoFn<Integer, Void> {
-      private final Aggregator<Integer, Integer> success =
-          createAggregator(SUCCESS_COUNTER, new Sum.SumIntegerFn());
-      private final Aggregator<Integer, Integer> failure =
-          createAggregator(FAILURE_COUNTER, new Sum.SumIntegerFn());
-      private final AssertRelation<ActualT, ExpectedT> relation;
-      private final PCollectionView<ActualT> actual;
-      private final PCollectionView<ExpectedT> expected;
-
-      private CheckerDoFn(AssertRelation<ActualT, ExpectedT> relation,
-          PCollectionView<ActualT> actual, PCollectionView<ExpectedT> expected) {
-        this.relation = relation;
-        this.actual = actual;
-        this.expected = expected;
-      }
-
-      @Override
-      public void processElement(ProcessContext c) {
-        try {
-          ActualT actualContents = c.sideInput(actual);
-          ExpectedT expectedContents = c.sideInput(expected);
-          relation.assertFor(expectedContents).apply(actualContents);
-          success.addValue(1);
-        } catch (Throwable t) {
-          LOG.error("PAssert failed expectations.", t);
-          failure.addValue(1);
-          // TODO: allow for metrics to propagate on failure when running a streaming pipeline
-          if (!c.getPipelineOptions().as(StreamingOptions.class).isStreaming()) {
-            throw t;
-          }
-        }
-      }
-    }
-  }
-
   /////////////////////////////////////////////////////////////////////////////
 
   /**
-   * A {@link SerializableFunction} that verifies that an actual value is equal to an
-   * expected value.
+   * A {@link SerializableFunction} that verifies that an actual value is equal to an expected
+   * value.
    */
   private static class AssertIsEqualTo<T> implements SerializableFunction<T, Void> {
     private T expected;
@@ -738,8 +724,8 @@ public class PAssert {
   }
 
   /**
-   * A {@link SerializableFunction} that verifies that an actual value is not equal to an
-   * expected value.
+   * A {@link SerializableFunction} that verifies that an actual value is not equal to an expected
+   * value.
    */
   private static class AssertNotEqualTo<T> implements SerializableFunction<T, Void> {
     private T expected;
@@ -756,8 +742,8 @@ public class PAssert {
   }
 
   /**
-   * A {@link SerializableFunction} that verifies that an {@code Iterable} contains
-   * expected items in any order.
+   * A {@link SerializableFunction} that verifies that an {@code Iterable} contains expected items
+   * in any order.
    */
   private static class AssertContainsInAnyOrder<T>
       implements SerializableFunction<Iterable<T>, Void> {
@@ -787,10 +773,9 @@ public class PAssert {
   ////////////////////////////////////////////////////////////
 
   /**
-   * A binary predicate between types {@code Actual} and {@code Expected}.
-   * Implemented as a method {@code assertFor(Expected)} which returns
-   * a {@code SerializableFunction<Actual, Void>}
-   * that should verify the assertion..
+   * A binary predicate between types {@code Actual} and {@code Expected}. Implemented as a method
+   * {@code assertFor(Expected)} which returns a {@code SerializableFunction<Actual, Void>} that
+   * should verify the assertion..
    */
   private static interface AssertRelation<ActualT, ExpectedT> extends Serializable {
     public SerializableFunction<ActualT, Void> assertFor(ExpectedT input);
@@ -799,8 +784,7 @@ public class PAssert {
   /**
    * An {@link AssertRelation} implementing the binary predicate that two objects are equal.
    */
-  private static class AssertIsEqualToRelation<T>
-      implements AssertRelation<T, T> {
+  private static class AssertIsEqualToRelation<T> implements AssertRelation<T, T> {
     @Override
     public SerializableFunction<T, Void> assertFor(T expected) {
       return new AssertIsEqualTo<T>(expected);
@@ -810,8 +794,7 @@ public class PAssert {
   /**
    * An {@link AssertRelation} implementing the binary predicate that two objects are not equal.
    */
-  private static class AssertNotEqualToRelation<T>
-      implements AssertRelation<T, T> {
+  private static class AssertNotEqualToRelation<T> implements AssertRelation<T, T> {
     @Override
     public SerializableFunction<T, Void> assertFor(T expected) {
       return new AssertNotEqualTo<T>(expected);

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/810ffeb2/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/PAssertTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/PAssertTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/PAssertTest.java
index f540948..fdc8719 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/PAssertTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/PAssertTest.java
@@ -17,9 +17,6 @@
  */
 package org.apache.beam.sdk.testing;
 
-import static org.apache.beam.sdk.testing.SerializableMatchers.anything;
-import static org.apache.beam.sdk.testing.SerializableMatchers.not;
-
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
@@ -151,30 +148,6 @@ public class PAssertTest implements Serializable {
   }
 
   /**
-   * Basic test of succeeding {@link PAssert} using a {@link SerializableMatcher}.
-   */
-  @Test
-  @Category(RunnableOnService.class)
-  public void testBasicMatcherSuccess() throws Exception {
-    Pipeline pipeline = TestPipeline.create();
-    PCollection<Integer> pcollection = pipeline.apply(Create.of(42));
-    PAssert.that(pcollection).containsInAnyOrder(anything());
-    pipeline.run();
-  }
-
-  /**
-   * Basic test of failing {@link PAssert} using a {@link SerializableMatcher}.
-   */
-  @Test
-  @Category(RunnableOnService.class)
-  public void testBasicMatcherFailure() throws Exception {
-    Pipeline pipeline = TestPipeline.create();
-    PCollection<Integer> pcollection = pipeline.apply(Create.of(42));
-    PAssert.that(pcollection).containsInAnyOrder(not(anything()));
-    runExpectingAssertionFailure(pipeline);
-  }
-
-  /**
    * Test that we throw an error at pipeline construction time when the user mistakenly uses
    * {@code PAssert.thatSingleton().equals()} instead of the test method {@code .isEqualTo}.
    */