Return-Path: X-Original-To: apmail-beam-commits-archive@minotaur.apache.org Delivered-To: apmail-beam-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 57C8419183 for ; Thu, 14 Apr 2016 22:23:09 +0000 (UTC) Received: (qmail 95496 invoked by uid 500); 14 Apr 2016 22:23:09 -0000 Delivered-To: apmail-beam-commits-archive@beam.apache.org Received: (qmail 95448 invoked by uid 500); 14 Apr 2016 22:23:09 -0000 Mailing-List: contact commits-help@beam.incubator.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@beam.incubator.apache.org Delivered-To: mailing list commits@beam.incubator.apache.org Received: (qmail 95439 invoked by uid 99); 14 Apr 2016 22:23:09 -0000 Received: from pnap-us-west-generic-nat.apache.org (HELO spamd3-us-west.apache.org) (209.188.14.142) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 14 Apr 2016 22:23:09 +0000 Received: from localhost (localhost [127.0.0.1]) by spamd3-us-west.apache.org (ASF Mail Server at spamd3-us-west.apache.org) with ESMTP id BA6741801DA for ; Thu, 14 Apr 2016 22:23:08 +0000 (UTC) X-Virus-Scanned: Debian amavisd-new at spamd3-us-west.apache.org X-Spam-Flag: NO X-Spam-Score: -3.221 X-Spam-Level: X-Spam-Status: No, score=-3.221 tagged_above=-999 required=6.31 tests=[KAM_ASCII_DIVIDERS=0.8, KAM_LAZY_DOMAIN_SECURITY=1, RCVD_IN_DNSWL_HI=-5, RCVD_IN_MSPIKE_H3=-0.01, RCVD_IN_MSPIKE_WL=-0.01, RP_MATCHES_RCVD=-0.001] autolearn=disabled Received: from mx1-lw-us.apache.org ([10.40.0.8]) by localhost (spamd3-us-west.apache.org [10.40.0.10]) (amavisd-new, port 10024) with ESMTP id 3X-e_kHaOWAv for ; Thu, 14 Apr 2016 22:23:06 +0000 (UTC) Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by mx1-lw-us.apache.org (ASF Mail Server at mx1-lw-us.apache.org) with SMTP id 667C25F1E7 for ; Thu, 14 Apr 2016 22:23:05 +0000 (UTC) Received: (qmail 95343 invoked by uid 99); 14 Apr 2016 
22:23:04 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 14 Apr 2016 22:23:04 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id AAE0DDFF41; Thu, 14 Apr 2016 22:23:04 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: bchambers@apache.org To: commits@beam.incubator.apache.org Date: Thu, 14 Apr 2016 22:23:04 -0000 Message-Id: X-Mailer: ASF-Git Admin Mailer Subject: [1/2] incubator-beam git commit: Make IdentityWindowFn and NeverTrigger available Repository: incubator-beam Updated Branches: refs/heads/master 6511ba28e -> 5bdea1e2b Make IdentityWindowFn and NeverTrigger available This will be used as part of the new PAssert. IdentityWindowFn remains package-private to restrict usage. Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/c9a1efae Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/c9a1efae Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/c9a1efae Branch: refs/heads/master Commit: c9a1efae25455af1e47675938e296c716ec0fa0f Parents: 6511ba2 Author: Thomas Groh Authored: Thu Mar 31 14:34:06 2016 -0700 Committer: bchambers Committed: Thu Apr 14 15:09:47 2016 -0700 ---------------------------------------------------------------------- .../transforms/windowing/AfterWatermark.java | 46 +------- .../beam/sdk/transforms/windowing/Never.java | 76 ++++++++++++ .../apache/beam/sdk/util/IdentityWindowFn.java | 116 +++++++++++++++++++ .../org/apache/beam/sdk/util/Reshuffle.java | 80 ++----------- .../sdk/transforms/windowing/NeverTest.java | 56 +++++++++ 5 files changed, 259 insertions(+), 115 deletions(-) ---------------------------------------------------------------------- 
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/c9a1efae/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterWatermark.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterWatermark.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterWatermark.java index 5aca093..05c6eb8 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterWatermark.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterWatermark.java @@ -24,7 +24,6 @@ import org.apache.beam.sdk.transforms.windowing.Trigger.OnceTrigger; import org.apache.beam.sdk.util.ExecutableTrigger; import org.apache.beam.sdk.util.TimeDomain; -import com.google.common.base.Preconditions; import com.google.common.collect.ImmutableList; import org.joda.time.Instant; @@ -96,43 +95,6 @@ public class AfterWatermark { TriggerBuilder withEarlyFirings(OnceTrigger earlyTrigger); } - /** - * A trigger which never fires. Used for the "early" trigger when only a late trigger was - * specified. 
- */ - private static class NeverTrigger extends OnceTrigger { - - protected NeverTrigger() { - super(null); - } - - @Override - public void onElement(OnElementContext c) throws Exception { } - - @Override - public void onMerge(OnMergeContext c) throws Exception { } - - @Override - protected Trigger getContinuationTrigger(List continuationTriggers) { - return this; - } - - @Override - public Instant getWatermarkThatGuaranteesFiring(BoundedWindow window) { - return BoundedWindow.TIMESTAMP_MAX_VALUE; - } - - @Override - public boolean shouldFire(Trigger.TriggerContext context) throws Exception { - return false; - } - - @Override - protected void onOnlyFiring(Trigger.TriggerContext context) throws Exception { - throw new UnsupportedOperationException( - String.format("%s should never fire", getClass().getSimpleName())); - } - } private static class AfterWatermarkEarlyAndLate extends Trigger @@ -314,8 +276,7 @@ public class AfterWatermark { * the given {@code Trigger} fires before the watermark has passed the end of the window. */ public AfterWatermarkEarly withEarlyFirings(OnceTrigger earlyFirings) { - Preconditions.checkNotNull(earlyFirings, - "Must specify the trigger to use for early firings"); + checkNotNull(earlyFirings, "Must specify the trigger to use for early firings"); return new AfterWatermarkEarlyAndLate(earlyFirings, null); } @@ -324,9 +285,8 @@ public class AfterWatermark { * the given {@code Trigger} fires after the watermark has passed the end of the window. 
*/ public AfterWatermarkLate withLateFirings(OnceTrigger lateFirings) { - Preconditions.checkNotNull(lateFirings, - "Must specify the trigger to use for late firings"); - return new AfterWatermarkEarlyAndLate(new NeverTrigger(), lateFirings); + checkNotNull(lateFirings, "Must specify the trigger to use for late firings"); + return new AfterWatermarkEarlyAndLate(Never.ever(), lateFirings); } @Override http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/c9a1efae/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Never.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Never.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Never.java new file mode 100644 index 0000000..809e841 --- /dev/null +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Never.java @@ -0,0 +1,76 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.sdk.transforms.windowing; + +import org.apache.beam.sdk.transforms.GroupByKey; +import org.apache.beam.sdk.transforms.windowing.Trigger.OnceTrigger; +import org.joda.time.Instant; + +import java.util.List; + +/** + * A trigger which never fires. + * + *

+ * Using this trigger will only produce output when the watermark passes the end of the + * {@link BoundedWindow window} plus the {@link Window#withAllowedLateness() allowed + * lateness}. + */ +public final class Never { + /** + * Returns a trigger which never fires. Output will be produced from the using {@link GroupByKey} + * when the {@link BoundedWindow} closes. + */ + public static OnceTrigger ever() { + // NeverTrigger ignores all inputs and is Window-type independent. + return new NeverTrigger(); + } + + private static class NeverTrigger extends OnceTrigger { + protected NeverTrigger() { + super(null); + } + + @Override + public void onElement(OnElementContext c) {} + + @Override + public void onMerge(OnMergeContext c) {} + + @Override + protected Trigger getContinuationTrigger(List continuationTriggers) { + return this; + } + + @Override + public Instant getWatermarkThatGuaranteesFiring(BoundedWindow window) { + return BoundedWindow.TIMESTAMP_MAX_VALUE; + } + + @Override + public boolean shouldFire(Trigger.TriggerContext context) { + return false; + } + + @Override + protected void onOnlyFiring(Trigger.TriggerContext context) { + throw new UnsupportedOperationException( + String.format("%s should never fire", getClass().getSimpleName())); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/c9a1efae/sdks/java/core/src/main/java/org/apache/beam/sdk/util/IdentityWindowFn.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/IdentityWindowFn.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/IdentityWindowFn.java new file mode 100644 index 0000000..91e5609 --- /dev/null +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/IdentityWindowFn.java @@ -0,0 +1,116 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.util; + +import org.apache.beam.sdk.Pipeline; +import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.transforms.GroupByKey; +import org.apache.beam.sdk.transforms.windowing.BoundedWindow; +import org.apache.beam.sdk.transforms.windowing.InvalidWindows; +import org.apache.beam.sdk.transforms.windowing.NonMergingWindowFn; +import org.apache.beam.sdk.transforms.windowing.Window; +import org.apache.beam.sdk.transforms.windowing.WindowFn; +import org.apache.beam.sdk.values.PCollection; + +import org.joda.time.Instant; + +import java.util.Collection; + +/** + * A {@link WindowFn} that leaves all associations between elements and windows unchanged. + * + *

This {@link WindowFn} is applied when elements must be passed through a {@link GroupByKey}, + * but should maintain their existing {@link Window} assignments. Because windows may have been + * merged, the earlier {@link WindowFn} may not appropriately maintain the existing window + * assignments. For example, if the earlier {@link WindowFn} merges windows, after a + * {@link GroupByKey} the {@link WindowingStrategy} uses {@link InvalidWindows}, and no further + * {@link GroupByKey} can be applied without applying a new {@link WindowFn}. This {@link WindowFn} + * allows existing window assignments to be maintained across a single group by key, at which point + * the earlier {@link WindowingStrategy} should be restored. + * + *

This {@link WindowFn} is an internal implementation detail of sdk-provided utilities, and + * should not be used by {@link Pipeline} writers. + */ +class IdentityWindowFn extends NonMergingWindowFn { + + /** + * The coder of the type of windows of the input {@link PCollection}. This is not an arbitrary + * {@link BoundedWindow} {@link Coder}, but is safe to use for all windows assigned by this + * transform, as it should be the same coder used by the {@link WindowFn} that initially assigned + * these windows. + */ + private final Coder coder; + private final boolean assignsToSingleWindow; + + public IdentityWindowFn(Coder coder, boolean assignsToSingleWindow) { + // Safe because it is only used privately here. + // At every point where a window is returned or accepted, it has been provided + // by priorWindowFn, so it is of the expected type. + @SuppressWarnings("unchecked") + Coder windowCoder = (Coder) coder; + this.coder = windowCoder; + this.assignsToSingleWindow = assignsToSingleWindow; + } + + @Override + public Collection assignWindows(WindowFn.AssignContext c) + throws Exception { + // The windows are provided by priorWindowFn, which also provides the coder for them + @SuppressWarnings("unchecked") + Collection priorWindows = (Collection) c.windows(); + return priorWindows; + } + + @Override + public boolean isCompatible(WindowFn other) { + throw new UnsupportedOperationException( + String.format( + "%s.isCompatible() should never be called." + + " It is a private implementation detail of sdk utilities." + + " This message indicates a bug in the Beam SDK.", + getClass().getCanonicalName())); + } + + @Override + public Coder windowCoder() { + // Safe because the previous WindowFn provides both the windows and the coder. + // The Coder is _not_ actually a coder for an arbitrary BoundedWindow. 
+ return coder; + } + + @Override + public boolean assignsToSingleWindow() { + return assignsToSingleWindow; + } + + @Override + public BoundedWindow getSideInputWindow(BoundedWindow window) { + throw new UnsupportedOperationException( + String.format( + "%s.getSideInputWindow() should never be called." + + " It is a private implementation detail of sdk utilities." + + " This message indicates a bug in the Beam SDK.", + getClass().getCanonicalName())); + } + + @Deprecated + @Override + public Instant getOutputTime(Instant inputTimestamp, BoundedWindow window) { + return inputTimestamp; + } +} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/c9a1efae/sdks/java/core/src/main/java/org/apache/beam/sdk/util/Reshuffle.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/Reshuffle.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/Reshuffle.java index 09b2222..5c91326 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/Reshuffle.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/Reshuffle.java @@ -17,22 +17,15 @@ */ package org.apache.beam.sdk.util; -import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.GroupByKey; import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; -import org.apache.beam.sdk.transforms.windowing.NonMergingWindowFn; import org.apache.beam.sdk.transforms.windowing.Window; -import org.apache.beam.sdk.transforms.windowing.WindowFn; import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.PCollection; - import org.joda.time.Duration; -import org.joda.time.Instant; - -import java.util.Collection; /** * A {@link PTransform} that returns a {@link PCollection} equivalent to its input but operationally @@ -62,11 +55,14 @@ public class 
Reshuffle extends PTransform>, PCollecti // If the input has already had its windows merged, then the GBK that performed the merge // will have set originalStrategy.getWindowFn() to InvalidWindows, causing the GBK contained // here to fail. Instead, we install a valid WindowFn that leaves all windows unchanged. - Window.Bound> rewindow = Window - .>into(new PassThroughWindowFn<>(originalStrategy.getWindowFn())) - .triggering(new ReshuffleTrigger<>()) - .discardingFiredPanes() - .withAllowedLateness(Duration.millis(BoundedWindow.TIMESTAMP_MAX_VALUE.getMillis())); + Window.Bound> rewindow = + Window.>into( + new IdentityWindowFn<>( + originalStrategy.getWindowFn().windowCoder(), + originalStrategy.getWindowFn().assignsToSingleWindow())) + .triggering(new ReshuffleTrigger<>()) + .discardingFiredPanes() + .withAllowedLateness(Duration.millis(BoundedWindow.TIMESTAMP_MAX_VALUE.getMillis())); return input.apply(rewindow) .apply(GroupByKey.create()) @@ -84,64 +80,4 @@ public class Reshuffle extends PTransform>, PCollecti } })); } - - /** - * A {@link WindowFn} that leaves all associations between elements and windows unchanged. - * - *

In order to implement all the abstract methods of {@link WindowFn}, this requires the - * prior {@link WindowFn}, to which all auxiliary functionality is delegated. - */ - private static class PassThroughWindowFn extends NonMergingWindowFn { - - /** The WindowFn prior to this. Used for its windowCoder, etc. */ - private final WindowFn priorWindowFn; - - public PassThroughWindowFn(WindowFn priorWindowFn) { - // Safe because it is only used privately here. - // At every point where a window is returned or accepted, it has been provided - // by priorWindowFn, so it is of the type expected. - @SuppressWarnings("unchecked") - WindowFn internalWindowFn = (WindowFn) priorWindowFn; - this.priorWindowFn = internalWindowFn; - } - - @Override - public Collection assignWindows(WindowFn.AssignContext c) - throws Exception { - // The windows are provided by priorWindowFn, which also provides the coder for them - @SuppressWarnings("unchecked") - Collection priorWindows = (Collection) c.windows(); - return priorWindows; - } - - @Override - public boolean isCompatible(WindowFn other) { - throw new UnsupportedOperationException( - String.format("%s.isCompatible() should never be called." - + " It is a private implementation detail of Reshuffle." - + " This message indicates a bug in the Dataflow SDK.", - getClass().getCanonicalName())); - } - - @Override - public Coder windowCoder() { - // Safe because priorWindowFn provides the windows also. - // The Coder is _not_ actually a coder for an arbitrary BoundedWindow. - return priorWindowFn.windowCoder(); - } - - @Override - public BoundedWindow getSideInputWindow(BoundedWindow window) { - throw new UnsupportedOperationException( - String.format("%s.getSideInputWindow() should never be called." - + " It is a private implementation detail of Reshuffle." 
- + " This message indicates a bug in the Dataflow SDK.", - getClass().getCanonicalName())); - } - - @Override - public Instant getOutputTime(Instant inputTimestamp, BoundedWindow window) { - return inputTimestamp; - } - } } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/c9a1efae/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/NeverTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/NeverTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/NeverTest.java new file mode 100644 index 0000000..222fe4e --- /dev/null +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/NeverTest.java @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.sdk.transforms.windowing; + +import static org.hamcrest.Matchers.is; +import static org.junit.Assert.assertThat; + +import org.apache.beam.sdk.util.TriggerTester; +import org.apache.beam.sdk.util.TriggerTester.SimpleTriggerTester; +import org.apache.beam.sdk.values.TimestampedValue; +import org.joda.time.Duration; +import org.joda.time.Instant; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +/** + * Tests for {@link Never}. + */ +@RunWith(JUnit4.class) +public class NeverTest { + private SimpleTriggerTester triggerTester; + + @Before + public void setup() throws Exception { + triggerTester = + TriggerTester.forTrigger( + Never.ever(), FixedWindows.of(Duration.standardMinutes(5))); + } + + @Test + public void falseAfterEndOfWindow() throws Exception { + triggerTester.injectElements(TimestampedValue.of(1, new Instant(1))); + IntervalWindow window = + new IntervalWindow(new Instant(0), new Instant(0).plus(Duration.standardMinutes(5))); + assertThat(triggerTester.shouldFire(window), is(false)); + triggerTester.advanceInputWatermark(BoundedWindow.TIMESTAMP_MAX_VALUE); + assertThat(triggerTester.shouldFire(window), is(false)); + } +}