Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id C1851200B36 for ; Wed, 6 Jul 2016 19:20:20 +0200 (CEST) Received: by cust-asf.ponee.io (Postfix) id C016F160A55; Wed, 6 Jul 2016 17:20:20 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id 4D381160A79 for ; Wed, 6 Jul 2016 19:20:19 +0200 (CEST) Received: (qmail 42803 invoked by uid 500); 6 Jul 2016 17:20:18 -0000 Mailing-List: contact commits-help@beam.incubator.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@beam.incubator.apache.org Delivered-To: mailing list commits@beam.incubator.apache.org Received: (qmail 42794 invoked by uid 99); 6 Jul 2016 17:20:18 -0000 Received: from pnap-us-west-generic-nat.apache.org (HELO spamd4-us-west.apache.org) (209.188.14.142) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 06 Jul 2016 17:20:18 +0000 Received: from localhost (localhost [127.0.0.1]) by spamd4-us-west.apache.org (ASF Mail Server at spamd4-us-west.apache.org) with ESMTP id 1EF9DC030A for ; Wed, 6 Jul 2016 17:20:18 +0000 (UTC) X-Virus-Scanned: Debian amavisd-new at spamd4-us-west.apache.org X-Spam-Flag: NO X-Spam-Score: -4.646 X-Spam-Level: X-Spam-Status: No, score=-4.646 tagged_above=-999 required=6.31 tests=[KAM_ASCII_DIVIDERS=0.8, KAM_LAZY_DOMAIN_SECURITY=1, RCVD_IN_DNSWL_HI=-5, RCVD_IN_MSPIKE_H3=-0.01, RCVD_IN_MSPIKE_WL=-0.01, RP_MATCHES_RCVD=-1.426] autolearn=disabled Received: from mx1-lw-eu.apache.org ([10.40.0.8]) by localhost (spamd4-us-west.apache.org [10.40.0.11]) (amavisd-new, port 10024) with ESMTP id 5Qy1-xX06vP5 for ; Wed, 6 Jul 2016 17:20:13 +0000 (UTC) Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by mx1-lw-eu.apache.org (ASF Mail Server at mx1-lw-eu.apache.org) with SMTP id B844B60CD2 for ; Wed, 6 Jul 2016 17:20:08 +0000 (UTC) Received: (qmail 38001 invoked by uid 99); 6 Jul 2016 17:20:07 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 06 Jul 2016 17:20:07 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 91E95E95B7; Wed, 6 Jul 2016 17:20:07 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: lcwik@apache.org To: commits@beam.incubator.apache.org Date: Wed, 06 Jul 2016 17:20:15 -0000 Message-Id: <7505e95661834970884264b4ee9e2b00@git.apache.org> In-Reply-To: <3b9c340d491b47249a4d3d4fe2df8fbf@git.apache.org> References: <3b9c340d491b47249a4d3d4fe2df8fbf@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [09/50] [abbrv] incubator-beam git commit: Use GatherAllPanes in PAssert archived-at: Wed, 06 Jul 2016 17:20:20 -0000 Use GatherAllPanes in PAssert Instead of explicitly grouping by key, gather all the panes across the input window. Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/8b0cbf9a Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/8b0cbf9a Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/8b0cbf9a Branch: refs/heads/runners-spark2 Commit: 8b0cbf9a0d32837260449a0b4ab38eb9712b5fd3 Parents: c8babc1 Author: Thomas Groh Authored: Mon Jun 20 14:38:11 2016 -0700 Committer: Luke Cwik Committed: Wed Jul 6 10:18:49 2016 -0700 ---------------------------------------------------------------------- .../org/apache/beam/sdk/testing/PAssert.java | 369 +++++++++++++++---- .../apache/beam/sdk/testing/PAssertTest.java | 116 ++++++ 2 files changed, 418 insertions(+), 67 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8b0cbf9a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/PAssert.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/PAssert.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/PAssert.java index 1a3d85d..883b2b3 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/PAssert.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/PAssert.java @@ -18,7 +18,6 @@ package org.apache.beam.sdk.testing; import static com.google.common.base.Preconditions.checkState; - import static org.hamcrest.Matchers.containsInAnyOrder; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.not; @@ -36,18 +35,24 @@ import org.apache.beam.sdk.runners.PipelineRunner; import org.apache.beam.sdk.transforms.Aggregator; import org.apache.beam.sdk.transforms.Create; import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.DoFn.RequiresWindowAccess; import org.apache.beam.sdk.transforms.Flatten; import org.apache.beam.sdk.transforms.GroupByKey; import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.transforms.SerializableFunction; import org.apache.beam.sdk.transforms.Sum; +import org.apache.beam.sdk.transforms.Values; import org.apache.beam.sdk.transforms.View; import org.apache.beam.sdk.transforms.WithKeys; +import org.apache.beam.sdk.transforms.windowing.BoundedWindow; +import org.apache.beam.sdk.transforms.windowing.GlobalWindow; import org.apache.beam.sdk.transforms.windowing.GlobalWindows; import org.apache.beam.sdk.transforms.windowing.Never; import org.apache.beam.sdk.transforms.windowing.Window; import org.apache.beam.sdk.util.CoderUtils; +import org.apache.beam.sdk.util.GatherAllPanes; +import org.apache.beam.sdk.util.WindowedValue; import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.PBegin; import org.apache.beam.sdk.values.PCollection; @@ -63,9 +68,11 @@ import org.slf4j.LoggerFactory; import java.io.IOException; import java.io.Serializable; +import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; import java.util.Collections; +import java.util.List; import java.util.Map; import java.util.NoSuchElementException; @@ -116,6 +123,14 @@ public class PAssert { * Builder interface for assertions applicable to iterables and PCollection contents. */ public interface IterableAssert { + /** + * Creates a new {@link IterableAssert} like this one, but with the assertion restricted to only + * run on the provided window. + * + * @return a new {@link IterableAssert} like this one but with the assertion only applied to the + * specified window. + */ + IterableAssert inWindow(BoundedWindow window); /** * Asserts that the iterable in question contains the provided elements. @@ -152,6 +167,15 @@ public class PAssert { */ public interface SingletonAssert { /** + * Creates a new {@link SingletonAssert} like this one, but with the assertion restricted to + * only run on the provided window. + * + * @return a new {@link SingletonAssert} like this one but with the assertion only applied to + * the specified window. + */ + SingletonAssert inWindow(BoundedWindow window); + + /** * Asserts that the value in question is equal to the provided value, according to * {@link Object#equals}. * @@ -250,9 +274,23 @@ public class PAssert { */ private static class PCollectionContentsAssert implements IterableAssert { private final PCollection actual; + private final AssertionWindows rewindowingStrategy; public PCollectionContentsAssert(PCollection actual) { + this(actual, IntoGlobalWindow.of()); + } + + public PCollectionContentsAssert(PCollection actual, AssertionWindows rewindowingStrategy) { this.actual = actual; + this.rewindowingStrategy = rewindowingStrategy; + } + + @Override + public PCollectionContentsAssert inWindow(BoundedWindow window) { + @SuppressWarnings({"unchecked", "rawtypes"}) + Coder windowCoder = + (Coder) actual.getWindowingStrategy().getWindowFn().windowCoder(); + return new PCollectionContentsAssert<>(actual, IntoStaticWindows.of(windowCoder, window)); } /** @@ -285,7 +323,7 @@ public class PAssert { @Override public PCollectionContentsAssert satisfies( SerializableFunction, Void> checkerFn) { - actual.apply(nextAssertionName(), new GroupThenAssert<>(checkerFn)); + actual.apply(nextAssertionName(), new GroupThenAssert<>(checkerFn, rewindowingStrategy)); return this; } @@ -325,7 +363,8 @@ public class PAssert { @SuppressWarnings({"rawtypes", "unchecked"}) SerializableFunction, Void> checkerFn = (SerializableFunction) new MatcherCheckerFn<>(matcher); - actual.apply("PAssert$" + (assertCount++), new GroupThenAssert<>(checkerFn)); + actual.apply( + "PAssert$" + (assertCount++), new GroupThenAssert<>(checkerFn, rewindowingStrategy)); return this; } @@ -374,13 +413,30 @@ public class PAssert { private static class PCollectionSingletonIterableAssert implements IterableAssert { private final PCollection> actual; private final Coder elementCoder; + private final AssertionWindows rewindowingStrategy; public PCollectionSingletonIterableAssert(PCollection> actual) { + this(actual, IntoGlobalWindow.>of()); + } + + public PCollectionSingletonIterableAssert( + PCollection> actual, AssertionWindows rewindowingStrategy) { this.actual = actual; @SuppressWarnings("unchecked") Coder typedCoder = (Coder) actual.getCoder().getCoderArguments().get(0); this.elementCoder = typedCoder; + + this.rewindowingStrategy = rewindowingStrategy; + } + + @Override + public PCollectionSingletonIterableAssert inWindow(BoundedWindow window) { + @SuppressWarnings({"unchecked", "rawtypes"}) + Coder windowCoder = + (Coder) actual.getWindowingStrategy().getWindowFn().windowCoder(); + return new PCollectionSingletonIterableAssert<>( + actual, IntoStaticWindows.>of(windowCoder, window)); } @Override @@ -402,7 +458,9 @@ public class PAssert { @Override public PCollectionSingletonIterableAssert satisfies( SerializableFunction, Void> checkerFn) { - actual.apply("PAssert$" + (assertCount++), new GroupThenAssertForSingleton<>(checkerFn)); + actual.apply( + "PAssert$" + (assertCount++), + new GroupThenAssertForSingleton<>(checkerFn, rewindowingStrategy)); return this; } @@ -421,18 +479,38 @@ public class PAssert { private static class PCollectionViewAssert implements SingletonAssert { private final PCollection actual; private final PTransform, PCollectionView> view; + private final AssertionWindows rewindowActuals; private final Coder coder; protected PCollectionViewAssert( PCollection actual, PTransform, PCollectionView> view, Coder coder) { + this(actual, view, IntoGlobalWindow.of(), coder); + } + + private PCollectionViewAssert( + PCollection actual, + PTransform, PCollectionView> view, + AssertionWindows rewindowActuals, + Coder coder) { this.actual = actual; this.view = view; + this.rewindowActuals = rewindowActuals; this.coder = coder; } @Override + public PCollectionViewAssert inWindow(BoundedWindow window) { + return new PCollectionViewAssert<>( + actual, + view, + IntoStaticWindows.of( + (Coder) actual.getWindowingStrategy().getWindowFn().windowCoder(), window), + coder); + } + + @Override public PCollectionViewAssert isEqualTo(ViewT expectedValue) { return satisfies(new AssertIsEqualToRelation(), expectedValue); } @@ -449,7 +527,10 @@ public class PAssert { .getPipeline() .apply( "PAssert$" + (assertCount++), - new OneSideInputAssert(CreateActual.from(actual, view), checkerFn)); + new OneSideInputAssert( + CreateActual.from(actual, rewindowActuals, view), + rewindowActuals.windowDummy(), + checkerFn)); return this; } @@ -496,16 +577,22 @@ public class PAssert { extends PTransform> { private final transient PCollection actual; + private final transient AssertionWindows rewindowActuals; private final transient PTransform, PCollectionView> actualView; public static CreateActual from( - PCollection actual, PTransform, PCollectionView> actualView) { - return new CreateActual<>(actual, actualView); + PCollection actual, + AssertionWindows rewindowActuals, + PTransform, PCollectionView> actualView) { + return new CreateActual<>(actual, rewindowActuals, actualView); } private CreateActual( - PCollection actual, PTransform, PCollectionView> actualView) { + PCollection actual, + AssertionWindows rewindowActuals, + PTransform, PCollectionView> actualView) { this.actual = actual; + this.rewindowActuals = rewindowActuals; this.actualView = actualView; } @@ -513,7 +600,8 @@ public class PAssert { public PCollectionView apply(PBegin input) { final Coder coder = actual.getCoder(); return actual - .apply(Window.into(new GlobalWindows())) + .apply("FilterActuals", rewindowActuals.prepareActuals()) + .apply("RewindowActuals", rewindowActuals.windowActuals()) .apply( ParDo.of( new DoFn() { @@ -565,84 +653,83 @@ public class PAssert { *

If the {@link PCollection} is empty, this transform returns a {@link PCollection} containing * a single empty iterable, even though in practice most runners will not produce any element. */ - private static class GroupGlobally extends PTransform, PCollection>> + private static class GroupGlobally + extends PTransform, PCollection>>> implements Serializable { + private final AssertionWindows rewindowingStrategy; - public GroupGlobally() {} + public GroupGlobally(AssertionWindows rewindowingStrategy) { + this.rewindowingStrategy = rewindowingStrategy; + } @Override - public PCollection> apply(PCollection input) { - - final int contentsKey = 0; - final int dummyKey = 1; + public PCollection>> apply(PCollection input) { final int combinedKey = 42; + // Remove the triggering on both + PTransform< + PCollection>>>, + PCollection>>>> + removeTriggering = + Window.>>>triggering(Never.ever()) + .discardingFiredPanes() + .withAllowedLateness(input.getWindowingStrategy().getAllowedLateness()); // Group the contents by key. If it is empty, this PCollection will be empty, too. // Then key it again with a dummy key. - PCollection>>> doubleKeyedGroupedInput = + PCollection>>> groupedContents = + // TODO: Split the filtering from the rewindowing, and apply filtering before the Gather + // if the grouping of extra records input - .apply("GloballyWindow", Window.into(new GlobalWindows())) - .apply("ContentsWithKeys", WithKeys.of(contentsKey)) + .apply(rewindowingStrategy.prepareActuals()) + .apply("GatherAllOutputs", GatherAllPanes.globally()) .apply( - "NeverTriggerContents", - Window.>triggering(Never.ever()).discardingFiredPanes()) - .apply("ContentsGBK", GroupByKey.create()) - .apply( - "DoubleKeyContents", WithKeys.>>of(combinedKey)); + "RewindowActuals", + rewindowingStrategy.>>windowActuals()) + .apply("KeyForDummy", WithKeys.>>of(combinedKey)) + .apply("RemoveActualsTriggering", removeTriggering); // Create another non-empty PCollection that is keyed with a distinct dummy key - PCollection>>> doubleKeyedDummy = + PCollection>>> keyedDummy = input .getPipeline() .apply( Create.of( KV.of( combinedKey, - KV.of(dummyKey, (Iterable) Collections.emptyList()))) - .withCoder(doubleKeyedGroupedInput.getCoder())) - .setWindowingStrategyInternal(doubleKeyedGroupedInput.getWindowingStrategy()); + (Iterable>) + Collections.>emptyList())) + .withCoder(groupedContents.getCoder())) + .apply( + "WindowIntoDummy", + rewindowingStrategy.>>>windowDummy()) + .apply("RemoveDummyTriggering", removeTriggering); // Flatten them together and group by the combined key to get a single element - PCollection>>>> dummyAndContents = - PCollectionList.>>>of(doubleKeyedGroupedInput) - .and(doubleKeyedDummy) + PCollection>>>> dummyAndContents = + PCollectionList.of(groupedContents) + .and(keyedDummy) .apply( "FlattenDummyAndContents", - Flatten.>>>pCollections()) + Flatten.>>>pCollections()) .apply( - "GroupDummyAndContents", GroupByKey.>>create()); + "NeverTrigger", + Window.>>>triggering(Never.ever()) + .withAllowedLateness(input.getWindowingStrategy().getAllowedLateness()) + .discardingFiredPanes()) + .apply( + "GroupDummyAndContents", + GroupByKey.>>create()); - // Extract the contents if they exist else empty contents. return dummyAndContents - .apply( - "GetContents", - ParDo.of( - new DoFn>>>, Iterable>() { - @Override - public void processElement(ProcessContext ctx) { - Iterable>> groupedDummyAndContents = - ctx.element().getValue(); - - if (Iterables.size(groupedDummyAndContents) == 1) { - // Only the dummy value, so just output empty - ctx.output(Collections.emptyList()); - } else { - checkState( - Iterables.size(groupedDummyAndContents) == 2, - "Internal error: PAssert grouped contents with a" - + " dummy value resulted in more than 2 groupings: %s", - groupedDummyAndContents); - - if (Iterables.get(groupedDummyAndContents, 0).getKey() == contentsKey) { - // The first iterable in the group holds the real contents - ctx.output(Iterables.get(groupedDummyAndContents, 0).getValue()); - } else { - // The second iterable holds the real contents - ctx.output(Iterables.get(groupedDummyAndContents, 1).getValue()); - } - } - } - })); + .apply(Values.>>>create()) + .apply(ParDo.of(new ConcatFn>())); + } + } + + private static final class ConcatFn extends DoFn>, Iterable> { + @Override + public void processElement(ProcessContext c) throws Exception { + c.output(Iterables.concat(c.element())); } } @@ -653,15 +740,20 @@ public class PAssert { public static class GroupThenAssert extends PTransform, PDone> implements Serializable { private final SerializableFunction, Void> checkerFn; + private final AssertionWindows rewindowingStrategy; - private GroupThenAssert(SerializableFunction, Void> checkerFn) { + private GroupThenAssert( + SerializableFunction, Void> checkerFn, AssertionWindows rewindowingStrategy) { this.checkerFn = checkerFn; + this.rewindowingStrategy = rewindowingStrategy; } @Override public PDone apply(PCollection input) { input - .apply("GroupGlobally", new GroupGlobally()) + .apply("GroupGlobally", new GroupGlobally(rewindowingStrategy)) + .apply("GetOnlyPane", ParDo.of(new ExtractOnlyPane())) + .setCoder(IterableCoder.of(input.getCoder())) .apply("RunChecks", ParDo.of(new GroupedValuesCheckerDoFn<>(checkerFn))); return PDone.in(input.getPipeline()); @@ -675,15 +767,20 @@ public class PAssert { public static class GroupThenAssertForSingleton extends PTransform>, PDone> implements Serializable { private final SerializableFunction, Void> checkerFn; + private final AssertionWindows rewindowingStrategy; - private GroupThenAssertForSingleton(SerializableFunction, Void> checkerFn) { + private GroupThenAssertForSingleton( + SerializableFunction, Void> checkerFn, AssertionWindows rewindowingStrategy) { this.checkerFn = checkerFn; + this.rewindowingStrategy = rewindowingStrategy; } @Override public PDone apply(PCollection> input) { input - .apply("GroupGlobally", new GroupGlobally>()) + .apply("GroupGlobally", new GroupGlobally>(rewindowingStrategy)) + .apply("GetOnlyPane", ParDo.of(new ExtractOnlyPane>())) + .setCoder(IterableCoder.of(input.getCoder())) .apply("RunChecks", ParDo.of(new SingletonCheckerDoFn<>(checkerFn))); return PDone.in(input.getPipeline()); @@ -703,12 +800,15 @@ public class PAssert { public static class OneSideInputAssert extends PTransform implements Serializable { private final transient PTransform> createActual; + private final transient PTransform, PCollection> windowToken; private final SerializableFunction checkerFn; private OneSideInputAssert( PTransform> createActual, + PTransform, PCollection> windowToken, SerializableFunction checkerFn) { this.createActual = createActual; + this.windowToken = windowToken; this.checkerFn = checkerFn; } @@ -718,7 +818,9 @@ public class PAssert { input .apply(Create.of(0).withCoder(VarIntCoder.of())) - .apply("RunChecks", + .apply("WindowToken", windowToken) + .apply( + "RunChecks", ParDo.withSideInputs(actual).of(new SideInputCheckerDoFn<>(checkerFn, actual))); return PDone.in(input.getPipeline()); @@ -760,6 +862,23 @@ public class PAssert { } } + private static class ExtractOnlyPane extends DoFn>, Iterable> { + @Override + public void processElement(ProcessContext c) throws Exception { + List outputs = new ArrayList<>(); + for (WindowedValue value : c.element()) { + checkState( + value.getPane().isFirst() && value.getPane().isLast(), + "Expected elements to be produced by a trigger that fires at most once, but got" + + "a value in a pane that is %s. Actual Pane Info: %s", + value.getPane().isFirst() ? "not the last pane" : "not the first pane", + value.getPane()); + outputs.add(value.getValue()); + } + c.output(outputs); + } + } + /** * A {@link DoFn} that runs a checking {@link SerializableFunction} on the contents of * the single iterable element of the input {@link PCollection} and adjusts counters and @@ -948,4 +1067,120 @@ public class PAssert { return new AssertContainsInAnyOrder(expectedElements); } } + + //////////////////////////////////////////////////////////////////////////////////////////////// + + /** + * A strategy for filtering and rewindowing the actual and dummy {@link PCollection PCollections} + * within a {@link PAssert}. + * + *

This must ensure that the windowing strategies of the output of {@link #windowActuals()} and + * {@link #windowDummy()} are compatible (and can be {@link Flatten Flattened}). + * + *

The {@link PCollection} produced by {@link #prepareActuals()} will be a parent (though not + * a direct parent) of the transform provided to {@link #windowActuals()}. + */ + private interface AssertionWindows { + /** + * Returns a transform that assigns the dummy element into the appropriate + * {@link BoundedWindow windows}. + */ + PTransform, PCollection> windowDummy(); + + /** + * Returns a transform that filters and reassigns windows of the actual elements if necessary. + */ + PTransform, PCollection> prepareActuals(); + + /** + * Returns a transform that assigns the actual elements into the appropriate + * {@link BoundedWindow windows}. Will be called after {@link #prepareActuals()}. + */ + PTransform, PCollection> windowActuals(); + } + + /** + * An {@link AssertionWindows} which assigns all elements to the {@link GlobalWindow}. + */ + private static class IntoGlobalWindow implements AssertionWindows, Serializable { + public static AssertionWindows of() { + return new IntoGlobalWindow(); + } + + private PTransform, PCollection> window() { + return Window.into(new GlobalWindows()); + } + + @Override + public PTransform, PCollection> windowDummy() { + return window(); + } + + /** + * Rewindows all input elements into the {@link GlobalWindow}. This ensures that the result + * PCollection will contain all of the elements of the PCollection when the window is not + * specified. + */ + @Override + public PTransform, PCollection> prepareActuals() { + return window(); + } + + @Override + public PTransform, PCollection> windowActuals() { + return window(); + } + } + + private static class IntoStaticWindows implements AssertionWindows { + private final StaticWindows windowFn; + + public static AssertionWindows of(Coder windowCoder, BoundedWindow window) { + return new IntoStaticWindows(StaticWindows.of(windowCoder, window)); + } + + private IntoStaticWindows(StaticWindows windowFn) { + this.windowFn = windowFn; + } + + @Override + public PTransform, PCollection> windowDummy() { + return Window.into(windowFn); + } + + @Override + public PTransform, PCollection> prepareActuals() { + return new FilterWindows<>(windowFn); + } + + @Override + public PTransform, PCollection> windowActuals() { + return Window.into(windowFn.intoOnlyExisting()); + } + } + + /** + * A DoFn that filters elements based on their presence in a static collection of windows. + */ + private static final class FilterWindows extends PTransform, PCollection> { + private final StaticWindows windows; + + public FilterWindows(StaticWindows windows) { + this.windows = windows; + } + + @Override + public PCollection apply(PCollection input) { + return input.apply("FilterWindows", ParDo.of(new Fn())); + } + + private class Fn extends DoFn implements RequiresWindowAccess { + @Override + public void processElement(ProcessContext c) throws Exception { + if (windows.getWindows().contains(c.window())) { + c.output(c.element()); + } + } + } + } } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8b0cbf9a/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/PAssertTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/PAssertTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/PAssertTest.java index fdc8719..bafd897 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/PAssertTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/PAssertTest.java @@ -17,6 +17,8 @@ */ package org.apache.beam.sdk.testing; +import static org.hamcrest.Matchers.is; +import static org.junit.Assert.assertThat; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; @@ -25,11 +27,21 @@ import org.apache.beam.sdk.coders.AtomicCoder; import org.apache.beam.sdk.coders.CoderException; import org.apache.beam.sdk.transforms.Create; import org.apache.beam.sdk.transforms.SerializableFunction; +import org.apache.beam.sdk.transforms.windowing.FixedWindows; +import org.apache.beam.sdk.transforms.windowing.GlobalWindow; +import org.apache.beam.sdk.transforms.windowing.IntervalWindow; +import org.apache.beam.sdk.transforms.windowing.SlidingWindows; +import org.apache.beam.sdk.transforms.windowing.Window; import org.apache.beam.sdk.util.common.ElementByteSizeObserver; import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.TimestampedValue; + +import com.google.common.collect.Iterables; import com.fasterxml.jackson.annotation.JsonCreator; +import org.joda.time.Duration; +import org.joda.time.Instant; import org.junit.Rule; import org.junit.Test; import org.junit.experimental.categories.Category; @@ -148,6 +160,45 @@ public class PAssertTest implements Serializable { } /** + * A {@link PAssert} about the contents of a {@link PCollection} + * is allows to be verified by an arbitrary {@link SerializableFunction}, + * though. + */ + @Test + @Category(RunnableOnService.class) + public void testWindowedSerializablePredicate() throws Exception { + Pipeline pipeline = TestPipeline.create(); + + PCollection pcollection = pipeline + .apply(Create.timestamped( + TimestampedValue.of(new NotSerializableObject(), new Instant(250L)), + TimestampedValue.of(new NotSerializableObject(), new Instant(500L))) + .withCoder(NotSerializableObjectCoder.of())) + .apply(Window.into(FixedWindows.of(Duration.millis(300L)))); + + PAssert.that(pcollection) + .inWindow(new IntervalWindow(new Instant(0L), new Instant(300L))) + .satisfies(new SerializableFunction, Void>() { + @Override + public Void apply(Iterable contents) { + assertThat(Iterables.isEmpty(contents), is(false)); + return null; // no problem! + } + }); + PAssert.that(pcollection) + .inWindow(new IntervalWindow(new Instant(300L), new Instant(600L))) + .satisfies(new SerializableFunction, Void>() { + @Override + public Void apply(Iterable contents) { + assertThat(Iterables.isEmpty(contents), is(false)); + return null; // no problem! + } + }); + + pipeline.run(); + } + + /** * Test that we throw an error at pipeline construction time when the user mistakenly uses * {@code PAssert.thatSingleton().equals()} instead of the test method {@code .isEqualTo}. */ @@ -220,6 +271,26 @@ public class PAssertTest implements Serializable { } /** + * Basic test for {@code isEqualTo}. + */ + @Test + @Category(RunnableOnService.class) + public void testWindowedIsEqualTo() throws Exception { + Pipeline pipeline = TestPipeline.create(); + PCollection pcollection = + pipeline.apply(Create.timestamped(TimestampedValue.of(43, new Instant(250L)), + TimestampedValue.of(22, new Instant(-250L)))) + .apply(Window.into(FixedWindows.of(Duration.millis(500L)))); + PAssert.thatSingleton(pcollection) + .inWindow(new IntervalWindow(new Instant(0L), new Instant(500L))) + .isEqualTo(43); + PAssert.thatSingleton(pcollection) + .inWindow(new IntervalWindow(new Instant(-500L), new Instant(0L))) + .isEqualTo(22); + pipeline.run(); + } + + /** * Basic test for {@code notEqualTo}. */ @Test @@ -244,6 +315,51 @@ public class PAssertTest implements Serializable { } /** + * Tests that {@code containsInAnyOrder} is actually order-independent. + */ + @Test + @Category(RunnableOnService.class) + public void testGlobalWindowContainsInAnyOrder() throws Exception { + Pipeline pipeline = TestPipeline.create(); + PCollection pcollection = pipeline.apply(Create.of(1, 2, 3, 4)); + PAssert.that(pcollection).inWindow(GlobalWindow.INSTANCE).containsInAnyOrder(2, 1, 4, 3); + pipeline.run(); + } + + /** + * Tests that windowed {@code containsInAnyOrder} is actually order-independent. + */ + @Test + @Category(RunnableOnService.class) + public void testWindowedContainsInAnyOrder() throws Exception { + Pipeline pipeline = TestPipeline.create(); + PCollection pcollection = + pipeline.apply(Create.timestamped(TimestampedValue.of(1, new Instant(100L)), + TimestampedValue.of(2, new Instant(200L)), + TimestampedValue.of(3, new Instant(300L)), + TimestampedValue.of(4, new Instant(400L)))) + .apply(Window.into(SlidingWindows.of(Duration.millis(200L)) + .every(Duration.millis(100L)) + .withOffset(Duration.millis(50L)))); + + PAssert.that(pcollection) + .inWindow(new IntervalWindow(new Instant(-50L), new Instant(150L))).containsInAnyOrder(1); + PAssert.that(pcollection) + .inWindow(new IntervalWindow(new Instant(50L), new Instant(250L))) + .containsInAnyOrder(2, 1); + PAssert.that(pcollection) + .inWindow(new IntervalWindow(new Instant(150L), new Instant(350L))) + .containsInAnyOrder(2, 3); + PAssert.that(pcollection) + .inWindow(new IntervalWindow(new Instant(250L), new Instant(450L))) + .containsInAnyOrder(4, 3); + PAssert.that(pcollection) + .inWindow(new IntervalWindow(new Instant(350L), new Instant(550L))) + .containsInAnyOrder(4); + pipeline.run(); + } + + /** * Tests that {@code containsInAnyOrder} fails when and how it should. */ @Test