Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id 8BD60200BAE for ; Fri, 14 Oct 2016 00:23:05 +0200 (CEST) Received: by cust-asf.ponee.io (Postfix) id 88E11160AF6; Thu, 13 Oct 2016 22:23:05 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id 3543D160AE4 for ; Fri, 14 Oct 2016 00:23:03 +0200 (CEST) Received: (qmail 96585 invoked by uid 500); 13 Oct 2016 22:23:01 -0000 Mailing-List: contact commits-help@beam.incubator.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@beam.incubator.apache.org Delivered-To: mailing list commits@beam.incubator.apache.org Received: (qmail 96501 invoked by uid 99); 13 Oct 2016 22:23:01 -0000 Received: from pnap-us-west-generic-nat.apache.org (HELO spamd2-us-west.apache.org) (209.188.14.142) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 13 Oct 2016 22:23:01 +0000 Received: from localhost (localhost [127.0.0.1]) by spamd2-us-west.apache.org (ASF Mail Server at spamd2-us-west.apache.org) with ESMTP id AD4091A00B0 for ; Thu, 13 Oct 2016 22:23:00 +0000 (UTC) X-Virus-Scanned: Debian amavisd-new at spamd2-us-west.apache.org X-Spam-Flag: NO X-Spam-Score: -6.219 X-Spam-Level: X-Spam-Status: No, score=-6.219 tagged_above=-999 required=6.31 tests=[KAM_ASCII_DIVIDERS=0.8, KAM_LAZY_DOMAIN_SECURITY=1, RCVD_IN_DNSWL_HI=-5, RCVD_IN_MSPIKE_H3=-0.01, RCVD_IN_MSPIKE_WL=-0.01, RP_MATCHES_RCVD=-2.999] autolearn=disabled Received: from mx1-lw-eu.apache.org ([10.40.0.8]) by localhost (spamd2-us-west.apache.org [10.40.0.9]) (amavisd-new, port 10024) with ESMTP id q3mUvpxCTorX for ; Thu, 13 Oct 2016 22:22:48 +0000 (UTC) Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by 
mx1-lw-eu.apache.org (ASF Mail Server at mx1-lw-eu.apache.org) with SMTP id F259160D32 for ; Thu, 13 Oct 2016 22:22:37 +0000 (UTC) Received: (qmail 93233 invoked by uid 99); 13 Oct 2016 22:22:36 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 13 Oct 2016 22:22:36 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 3802DE0D5C; Thu, 13 Oct 2016 22:22:36 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: kenn@apache.org To: commits@beam.incubator.apache.org Date: Thu, 13 Oct 2016 22:22:50 -0000 Message-Id: <7b90729278d641179c94364498c4cf65@git.apache.org> In-Reply-To: References: X-Mailer: ASF-Git Admin Mailer Subject: [16/17] incubator-beam git commit: Restore prior trigger files, for temporary compatibility archived-at: Thu, 13 Oct 2016 22:23:05 -0000 Restore prior trigger files, for temporary compatibility Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/a64acb2f Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/a64acb2f Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/a64acb2f Branch: refs/heads/master Commit: a64acb2f84ac26bd1a3f297085477f13b0252570 Parents: 69b1efd Author: Kenneth Knowles Authored: Tue Oct 11 21:35:02 2016 -0700 Committer: Kenneth Knowles Committed: Thu Oct 13 14:34:35 2016 -0700 ---------------------------------------------------------------------- .../apache/beam/runners/core/TriggerRunner.java | 247 +++++++++ .../beam/sdk/transforms/windowing/AfterAll.java | 122 +++++ .../windowing/AfterDelayFromFirstElement.java | 335 ++++++++++++ .../sdk/transforms/windowing/AfterEach.java | 141 +++++ .../sdk/transforms/windowing/AfterFirst.java | 124 +++++ .../sdk/transforms/windowing/AfterPane.java | 144 +++++ 
.../windowing/AfterProcessingTime.java | 102 ++++ .../AfterSynchronizedProcessingTime.java | 73 +++ .../transforms/windowing/AfterWatermark.java | 355 +++++++++++++ .../transforms/windowing/DefaultTrigger.java | 92 ++++ .../beam/sdk/transforms/windowing/Never.java | 75 +++ .../transforms/windowing/OrFinallyTrigger.java | 105 ++++ .../sdk/transforms/windowing/Repeatedly.java | 101 ++++ .../beam/sdk/transforms/windowing/Trigger.java | 527 +++++++++++++++++++ .../apache/beam/sdk/util/ExecutableTrigger.java | 159 ++++++ .../apache/beam/sdk/util/FinishedTriggers.java | 44 ++ .../beam/sdk/util/FinishedTriggersBitSet.java | 67 +++ .../beam/sdk/util/FinishedTriggersSet.java | 72 +++ .../apache/beam/sdk/util/ReshuffleTrigger.java | 66 +++ .../beam/sdk/util/TriggerContextFactory.java | 507 ++++++++++++++++++ .../sdk/transforms/windowing/AfterAllTest.java | 156 ++++++ .../sdk/transforms/windowing/AfterEachTest.java | 132 +++++ .../transforms/windowing/AfterFirstTest.java | 181 +++++++ .../sdk/transforms/windowing/AfterPaneTest.java | 132 +++++ .../windowing/AfterProcessingTimeTest.java | 187 +++++++ .../AfterSynchronizedProcessingTimeTest.java | 121 +++++ .../windowing/AfterWatermarkTest.java | 380 +++++++++++++ .../windowing/DefaultTriggerTest.java | 176 +++++++ .../sdk/transforms/windowing/NeverTest.java | 56 ++ .../windowing/OrFinallyTriggerTest.java | 215 ++++++++ .../transforms/windowing/RepeatedlyTest.java | 224 ++++++++ .../sdk/transforms/windowing/StubTrigger.java | 70 +++ .../sdk/transforms/windowing/TriggerTest.java | 118 +++++ .../org/apache/beam/sdk/util/TriggerTester.java | 410 +++++++++++++++ 34 files changed, 6016 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64acb2f/runners/core-java/src/main/java/org/apache/beam/runners/core/TriggerRunner.java ---------------------------------------------------------------------- diff --git 
a/runners/core-java/src/main/java/org/apache/beam/runners/core/TriggerRunner.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/TriggerRunner.java new file mode 100644 index 0000000..8d0f322 --- /dev/null +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/TriggerRunner.java @@ -0,0 +1,247 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.runners.core; + +import static com.google.common.base.Preconditions.checkState; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.collect.ImmutableMap; +import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; +import java.util.BitSet; +import java.util.Collection; +import java.util.Map; +import org.apache.beam.sdk.transforms.windowing.BoundedWindow; +import org.apache.beam.sdk.transforms.windowing.DefaultTrigger; +import org.apache.beam.sdk.transforms.windowing.Trigger; +import org.apache.beam.sdk.util.BitSetCoder; +import org.apache.beam.sdk.util.ExecutableTrigger; +import org.apache.beam.sdk.util.FinishedTriggers; +import org.apache.beam.sdk.util.FinishedTriggersBitSet; +import org.apache.beam.sdk.util.Timers; +import org.apache.beam.sdk.util.TriggerContextFactory; +import org.apache.beam.sdk.util.state.MergingStateAccessor; +import org.apache.beam.sdk.util.state.StateAccessor; +import org.apache.beam.sdk.util.state.StateTag; +import org.apache.beam.sdk.util.state.StateTags; +import org.apache.beam.sdk.util.state.ValueState; +import org.joda.time.Instant; + +/** + * Executes a trigger while managing persistence of information about which subtriggers are + * finished. Subtriggers include all recursive trigger expressions as well as the entire trigger. + * + *

Specifically, the responsibilities are: + * + *

    + *
  • Invoking the trigger's methods via its {@link ExecutableTrigger} wrapper by + * constructing the appropriate trigger contexts.
  • + *
  • Committing a record of which subtriggers are finished to persistent state.
  • + *
  • Restoring the record of which subtriggers are finished from persistent state.
  • + *
  • Clearing out the persisted finished set when a caller indicates + * (via {#link #clearFinished}) that it is no longer needed.
  • + *
+ * + *

These responsibilities are intertwined: trigger contexts include mutable information about + * which subtriggers are finished. This class provides the information when building the contexts + * and commits the information when the method of the {@link ExecutableTrigger} returns. + * + * @param The kind of windows being processed. + */ +public class TriggerRunner { + @VisibleForTesting + static final StateTag> FINISHED_BITS_TAG = + StateTags.makeSystemTagInternal(StateTags.value("closed", BitSetCoder.of())); + + private final ExecutableTrigger rootTrigger; + private final TriggerContextFactory contextFactory; + + public TriggerRunner(ExecutableTrigger rootTrigger, TriggerContextFactory contextFactory) { + checkState(rootTrigger.getTriggerIndex() == 0); + this.rootTrigger = rootTrigger; + this.contextFactory = contextFactory; + } + + private FinishedTriggersBitSet readFinishedBits(ValueState state) { + if (!isFinishedSetNeeded()) { + // If no trigger in the tree will ever have finished bits, then we don't need to read them. + // So that the code can be agnostic to that fact, we create a BitSet that is all 0 (not + // finished) for each trigger in the tree. + return FinishedTriggersBitSet.emptyWithCapacity(rootTrigger.getFirstIndexAfterSubtree()); + } + + BitSet bitSet = state.read(); + return bitSet == null + ? FinishedTriggersBitSet.emptyWithCapacity(rootTrigger.getFirstIndexAfterSubtree()) + : FinishedTriggersBitSet.fromBitSet(bitSet); + } + + + private void clearFinishedBits(ValueState state) { + if (!isFinishedSetNeeded()) { + // Nothing to clear. + return; + } + state.clear(); + } + + /** Return true if the trigger is closed in the window corresponding to the specified state. 
*/ + public boolean isClosed(StateAccessor state) { + return readFinishedBits(state.access(FINISHED_BITS_TAG)).isFinished(rootTrigger); + } + + @SuppressFBWarnings(value = "RV_RETURN_VALUE_IGNORED_NO_SIDE_EFFECT", + justification = "prefetch side effect") + public void prefetchForValue(W window, StateAccessor state) { + if (isFinishedSetNeeded()) { + state.access(FINISHED_BITS_TAG).readLater(); + } + rootTrigger.getSpec().prefetchOnElement( + contextFactory.createStateAccessor(window, rootTrigger)); + } + + @SuppressFBWarnings(value = "RV_RETURN_VALUE_IGNORED_NO_SIDE_EFFECT", + justification = "prefetch side effect") + public void prefetchOnFire(W window, StateAccessor state) { + if (isFinishedSetNeeded()) { + state.access(FINISHED_BITS_TAG).readLater(); + } + rootTrigger.getSpec().prefetchOnFire(contextFactory.createStateAccessor(window, rootTrigger)); + } + + @SuppressFBWarnings(value = "RV_RETURN_VALUE_IGNORED_NO_SIDE_EFFECT", + justification = "prefetch side effect") + public void prefetchShouldFire(W window, StateAccessor state) { + if (isFinishedSetNeeded()) { + state.access(FINISHED_BITS_TAG).readLater(); + } + rootTrigger.getSpec().prefetchShouldFire( + contextFactory.createStateAccessor(window, rootTrigger)); + } + + /** + * Run the trigger logic to deal with a new value. + */ + public void processValue(W window, Instant timestamp, Timers timers, StateAccessor state) + throws Exception { + // Clone so that we can detect changes and so that changes here don't pollute merging. 
+ FinishedTriggersBitSet finishedSet = + readFinishedBits(state.access(FINISHED_BITS_TAG)).copy(); + Trigger.OnElementContext triggerContext = contextFactory.createOnElementContext( + window, timers, timestamp, rootTrigger, finishedSet); + rootTrigger.invokeOnElement(triggerContext); + persistFinishedSet(state, finishedSet); + } + + @SuppressFBWarnings(value = "RV_RETURN_VALUE_IGNORED_NO_SIDE_EFFECT", + justification = "prefetch side effect") + public void prefetchForMerge( + W window, Collection mergingWindows, MergingStateAccessor state) { + if (isFinishedSetNeeded()) { + for (ValueState value : state.accessInEachMergingWindow(FINISHED_BITS_TAG).values()) { + value.readLater(); + } + } + rootTrigger.getSpec().prefetchOnMerge(contextFactory.createMergingStateAccessor( + window, mergingWindows, rootTrigger)); + } + + /** + * Run the trigger merging logic as part of executing the specified merge. + */ + public void onMerge(W window, Timers timers, MergingStateAccessor state) throws Exception { + // Clone so that we can detect changes and so that changes here don't pollute merging. + FinishedTriggersBitSet finishedSet = + readFinishedBits(state.access(FINISHED_BITS_TAG)).copy(); + + // And read the finished bits in each merging window. + ImmutableMap.Builder builder = ImmutableMap.builder(); + for (Map.Entry> entry : + state.accessInEachMergingWindow(FINISHED_BITS_TAG).entrySet()) { + // Don't need to clone these, since the trigger context doesn't allow modification + builder.put(entry.getKey(), readFinishedBits(entry.getValue())); + // Clear the underlying finished bits. 
+ clearFinishedBits(entry.getValue()); + } + ImmutableMap mergingFinishedSets = builder.build(); + + Trigger.OnMergeContext mergeContext = contextFactory.createOnMergeContext( + window, timers, rootTrigger, finishedSet, mergingFinishedSets); + + // Run the merge from the trigger + rootTrigger.invokeOnMerge(mergeContext); + + persistFinishedSet(state, finishedSet); + } + + public boolean shouldFire(W window, Timers timers, StateAccessor state) throws Exception { + FinishedTriggers finishedSet = readFinishedBits(state.access(FINISHED_BITS_TAG)).copy(); + Trigger.TriggerContext context = contextFactory.base(window, timers, + rootTrigger, finishedSet); + return rootTrigger.invokeShouldFire(context); + } + + public void onFire(W window, Timers timers, StateAccessor state) throws Exception { + // shouldFire should be false. + // However it is too expensive to assert. + FinishedTriggersBitSet finishedSet = + readFinishedBits(state.access(FINISHED_BITS_TAG)).copy(); + Trigger.TriggerContext context = contextFactory.base(window, timers, + rootTrigger, finishedSet); + rootTrigger.invokeOnFire(context); + persistFinishedSet(state, finishedSet); + } + + private void persistFinishedSet( + StateAccessor state, FinishedTriggersBitSet modifiedFinishedSet) { + if (!isFinishedSetNeeded()) { + return; + } + + ValueState finishedSetState = state.access(FINISHED_BITS_TAG); + if (!readFinishedBits(finishedSetState).equals(modifiedFinishedSet)) { + if (modifiedFinishedSet.getBitSet().isEmpty()) { + finishedSetState.clear(); + } else { + finishedSetState.write(modifiedFinishedSet.getBitSet()); + } + } + } + + /** + * Clear the finished bits. + */ + public void clearFinished(StateAccessor state) { + clearFinishedBits(state.access(FINISHED_BITS_TAG)); + } + + /** + * Clear the state used for executing triggers, but leave the finished set to indicate + * the window is closed. 
+ */ + public void clearState(W window, Timers timers, StateAccessor state) throws Exception { + // Don't need to clone, because we'll be clearing the finished bits anyways. + FinishedTriggers finishedSet = readFinishedBits(state.access(FINISHED_BITS_TAG)); + rootTrigger.invokeClear(contextFactory.base(window, timers, rootTrigger, finishedSet)); + } + + private boolean isFinishedSetNeeded() { + // TODO: If we know that no trigger in the tree will ever finish, we don't need to do the + // lookup. Right now, we special case this for the DefaultTrigger. + return !(rootTrigger.getSpec() instanceof DefaultTrigger); + } +} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64acb2f/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterAll.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterAll.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterAll.java new file mode 100644 index 0000000..cc8c97f --- /dev/null +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterAll.java @@ -0,0 +1,122 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.transforms.windowing; + +import static com.google.common.base.Preconditions.checkArgument; + +import com.google.common.base.Joiner; +import java.util.Arrays; +import java.util.List; +import org.apache.beam.sdk.annotations.Experimental; +import org.apache.beam.sdk.transforms.windowing.Trigger.OnceTrigger; +import org.apache.beam.sdk.util.ExecutableTrigger; +import org.joda.time.Instant; + +/** + * Create a {@link Trigger} that fires and finishes once after all of its sub-triggers have fired. + */ +@Experimental(Experimental.Kind.TRIGGER) +public class AfterAll extends OnceTrigger { + + private AfterAll(List subTriggers) { + super(subTriggers); + checkArgument(subTriggers.size() > 1); + } + + /** + * Returns an {@code AfterAll} {@code Trigger} with the given subtriggers. + */ + public static OnceTrigger of(OnceTrigger... triggers) { + return new AfterAll(Arrays.asList(triggers)); + } + + @Override + public void onElement(OnElementContext c) throws Exception { + for (ExecutableTrigger subTrigger : c.trigger().unfinishedSubTriggers()) { + // Since subTriggers are all OnceTriggers, they must either CONTINUE or FIRE_AND_FINISH. + // invokeElement will automatically mark the finish bit if they return FIRE_AND_FINISH. + subTrigger.invokeOnElement(c); + } + } + + @Override + public void onMerge(OnMergeContext c) throws Exception { + for (ExecutableTrigger subTrigger : c.trigger().subTriggers()) { + subTrigger.invokeOnMerge(c); + } + boolean allFinished = true; + for (ExecutableTrigger subTrigger1 : c.trigger().subTriggers()) { + allFinished &= c.forTrigger(subTrigger1).trigger().isFinished(); + } + c.trigger().setFinished(allFinished); + } + + @Override + public Instant getWatermarkThatGuaranteesFiring(BoundedWindow window) { + // This trigger will fire after the latest of its sub-triggers. 
+ Instant deadline = BoundedWindow.TIMESTAMP_MIN_VALUE; + for (Trigger subTrigger : subTriggers) { + Instant subDeadline = subTrigger.getWatermarkThatGuaranteesFiring(window); + if (deadline.isBefore(subDeadline)) { + deadline = subDeadline; + } + } + return deadline; + } + + @Override + public OnceTrigger getContinuationTrigger(List continuationTriggers) { + return new AfterAll(continuationTriggers); + } + + /** + * {@inheritDoc} + * + * @return {@code true} if all subtriggers return {@code true}. + */ + @Override + public boolean shouldFire(TriggerContext context) throws Exception { + for (ExecutableTrigger subtrigger : context.trigger().subTriggers()) { + if (!context.forTrigger(subtrigger).trigger().isFinished() + && !subtrigger.invokeShouldFire(context)) { + return false; + } + } + return true; + } + + /** + * Invokes {@link #onFire} for all subtriggers, eliding redundant calls to {@link #shouldFire} + * because they all must be ready to fire. + */ + @Override + public void onOnlyFiring(TriggerContext context) throws Exception { + for (ExecutableTrigger subtrigger : context.trigger().subTriggers()) { + subtrigger.invokeOnFire(context); + } + } + + @Override + public String toString() { + StringBuilder builder = new StringBuilder("AfterAll.of("); + Joiner.on(", ").appendTo(builder, subTriggers); + builder.append(")"); + + return builder.toString(); + } +} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64acb2f/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterDelayFromFirstElement.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterDelayFromFirstElement.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterDelayFromFirstElement.java new file mode 100644 index 0000000..c4bc946 --- /dev/null +++ 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterDelayFromFirstElement.java @@ -0,0 +1,335 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.transforms.windowing; + +import com.google.common.collect.ImmutableList; +import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; +import java.util.List; +import java.util.Locale; +import java.util.Objects; +import javax.annotation.Nullable; +import org.apache.beam.sdk.annotations.Experimental; +import org.apache.beam.sdk.coders.InstantCoder; +import org.apache.beam.sdk.transforms.Combine; +import org.apache.beam.sdk.transforms.Min; +import org.apache.beam.sdk.transforms.SerializableFunction; +import org.apache.beam.sdk.transforms.windowing.Trigger.OnceTrigger; +import org.apache.beam.sdk.util.TimeDomain; +import org.apache.beam.sdk.util.state.AccumulatorCombiningState; +import org.apache.beam.sdk.util.state.CombiningState; +import org.apache.beam.sdk.util.state.MergingStateAccessor; +import org.apache.beam.sdk.util.state.StateAccessor; +import org.apache.beam.sdk.util.state.StateMerging; +import org.apache.beam.sdk.util.state.StateTag; +import org.apache.beam.sdk.util.state.StateTags; +import 
org.joda.time.Duration; +import org.joda.time.Instant; +import org.joda.time.format.PeriodFormat; +import org.joda.time.format.PeriodFormatter; + +/** + * A base class for triggers that happen after a processing time delay from the arrival + * of the first element in a pane. + * + *

This class is for internal use only and may change at any time. + */ +@Experimental(Experimental.Kind.TRIGGER) +public abstract class AfterDelayFromFirstElement extends OnceTrigger { + + protected static final List> IDENTITY = + ImmutableList.>of(); + + protected static final StateTag, Instant>> DELAYED_UNTIL_TAG = + StateTags.makeSystemTagInternal(StateTags.combiningValueFromInputInternal( + "delayed", InstantCoder.of(), Min.MinFn.naturalOrder())); + + private static final PeriodFormatter PERIOD_FORMATTER = PeriodFormat.wordBased(Locale.ENGLISH); + + /** + * To complete an implementation, return the desired time from the TriggerContext. + */ + @Nullable + public abstract Instant getCurrentTime(Trigger.TriggerContext context); + + /** + * To complete an implementation, return a new instance like this one, but incorporating + * the provided timestamp mapping functions. Generally should be used by calling the + * constructor of this class from the constructor of the subclass. + */ + protected abstract AfterDelayFromFirstElement newWith( + List> transform); + + /** + * A list of timestampMappers m1, m2, m3, ... m_n considered to be composed in sequence. The + * overall mapping for an instance `instance` is `m_n(... m3(m2(m1(instant))`, + * implemented via #computeTargetTimestamp + */ + protected final List> timestampMappers; + + private final TimeDomain timeDomain; + + public AfterDelayFromFirstElement( + TimeDomain timeDomain, + List> timestampMappers) { + super(null); + this.timestampMappers = timestampMappers; + this.timeDomain = timeDomain; + } + + private Instant getTargetTimestamp(OnElementContext c) { + return computeTargetTimestamp(c.currentProcessingTime()); + } + + /** + * Aligns timestamps to the smallest multiple of {@code size} since the {@code offset} greater + * than the timestamp. + * + *

TODO: Consider sharing this with FixedWindows, and bring over the equivalent of + * CalendarWindows. + */ + public AfterDelayFromFirstElement alignedTo(final Duration size, final Instant offset) { + return newWith(new AlignFn(size, offset)); + } + + /** + * Aligns the time to be the smallest multiple of {@code size} greater than the timestamp + * since the epoch. + */ + public AfterDelayFromFirstElement alignedTo(final Duration size) { + return alignedTo(size, new Instant(0)); + } + + /** + * Adds some delay to the original target time. + * + * @param delay the delay to add + * @return An updated time trigger that will wait the additional time before firing. + */ + public AfterDelayFromFirstElement plusDelayOf(final Duration delay) { + return newWith(new DelayFn(delay)); + } + + /** + * @deprecated This will be removed in the next major version. Please use only + * {@link #plusDelayOf} and {@link #alignedTo}. + */ + @Deprecated + public OnceTrigger mappedTo(SerializableFunction timestampMapper) { + return newWith(timestampMapper); + } + + @Override + public boolean isCompatible(Trigger other) { + if (!getClass().equals(other.getClass())) { + return false; + } + + AfterDelayFromFirstElement that = (AfterDelayFromFirstElement) other; + return this.timestampMappers.equals(that.timestampMappers); + } + + + private AfterDelayFromFirstElement newWith( + SerializableFunction timestampMapper) { + return newWith( + ImmutableList.>builder() + .addAll(timestampMappers) + .add(timestampMapper) + .build()); + } + + @Override + @SuppressFBWarnings(value = "RV_RETURN_VALUE_IGNORED_NO_SIDE_EFFECT", justification = + "prefetch side effect") + public void prefetchOnElement(StateAccessor state) { + state.access(DELAYED_UNTIL_TAG).readLater(); + } + + @Override + public void onElement(OnElementContext c) throws Exception { + CombiningState delayUntilState = c.state().access(DELAYED_UNTIL_TAG); + Instant oldDelayUntil = delayUntilState.read(); + + // Since processing time can only 
advance, resulting in target wake-up times we would + // ignore anyhow, we don't bother with it if it is already set. + if (oldDelayUntil != null) { + return; + } + + Instant targetTimestamp = getTargetTimestamp(c); + delayUntilState.add(targetTimestamp); + c.setTimer(targetTimestamp, timeDomain); + } + + @Override + public void prefetchOnMerge(MergingStateAccessor state) { + super.prefetchOnMerge(state); + StateMerging.prefetchCombiningValues(state, DELAYED_UNTIL_TAG); + } + + @Override + public void onMerge(OnMergeContext c) throws Exception { + // NOTE: We could try to delete all timers which are still active, but we would + // need access to a timer context for each merging window. + // for (CombiningValueStateInternal, Instant> state : + // c.state().accessInEachMergingWindow(DELAYED_UNTIL_TAG).values()) { + // Instant timestamp = state.get().read(); + // if (timestamp != null) { + // .deleteTimer(timestamp, timeDomain); + // } + // } + // Instead let them fire and be ignored. + + // If the trigger is already finished, there is no way it will become re-activated + if (c.trigger().isFinished()) { + StateMerging.clear(c.state(), DELAYED_UNTIL_TAG); + // NOTE: We do not attempt to delete the timers. + return; + } + + // Determine the earliest point across all the windows, and delay to that. 
+ StateMerging.mergeCombiningValues(c.state(), DELAYED_UNTIL_TAG); + + Instant earliestTargetTime = c.state().access(DELAYED_UNTIL_TAG).read(); + if (earliestTargetTime != null) { + c.setTimer(earliestTargetTime, timeDomain); + } + } + + @Override + @SuppressFBWarnings(value = "RV_RETURN_VALUE_IGNORED_NO_SIDE_EFFECT", justification = + "prefetch side effect") + public void prefetchShouldFire(StateAccessor state) { + state.access(DELAYED_UNTIL_TAG).readLater(); + } + + @Override + public void clear(TriggerContext c) throws Exception { + c.state().access(DELAYED_UNTIL_TAG).clear(); + } + + @Override + public Instant getWatermarkThatGuaranteesFiring(BoundedWindow window) { + return BoundedWindow.TIMESTAMP_MAX_VALUE; + } + + @Override + public boolean shouldFire(Trigger.TriggerContext context) throws Exception { + Instant delayedUntil = context.state().access(DELAYED_UNTIL_TAG).read(); + return delayedUntil != null + && getCurrentTime(context) != null + && getCurrentTime(context).isAfter(delayedUntil); + } + + @Override + protected void onOnlyFiring(Trigger.TriggerContext context) throws Exception { + clear(context); + } + + protected Instant computeTargetTimestamp(Instant time) { + Instant result = time; + for (SerializableFunction timestampMapper : timestampMappers) { + result = timestampMapper.apply(result); + } + return result; + } + + /** + * A {@link SerializableFunction} to delay the timestamp at which this triggers fires. 
+ */ + private static final class DelayFn implements SerializableFunction { + private final Duration delay; + + public DelayFn(Duration delay) { + this.delay = delay; + } + + @Override + public Instant apply(Instant input) { + return input.plus(delay); + } + + @Override + public boolean equals(Object object) { + if (object == this) { + return true; + } + + if (!(object instanceof DelayFn)) { + return false; + } + + return this.delay.equals(((DelayFn) object).delay); + } + + @Override + public int hashCode() { + return Objects.hash(delay); + } + + @Override + public String toString() { + return PERIOD_FORMATTER.print(delay.toPeriod()); + } + } + + /** + * A {@link SerializableFunction} to align an instant to the nearest interval boundary. + */ + static final class AlignFn implements SerializableFunction { + private final Duration size; + private final Instant offset; + + + /** + * Aligns timestamps to the smallest multiple of {@code size} since the {@code offset} greater + * than the timestamp. + */ + public AlignFn(Duration size, Instant offset) { + this.size = size; + this.offset = offset; + } + + @Override + public Instant apply(Instant point) { + long millisSinceStart = new Duration(offset, point).getMillis() % size.getMillis(); + return millisSinceStart == 0 ? 
point : point.plus(size).minus(millisSinceStart); + } + + @Override + public boolean equals(Object object) { + if (object == this) { + return true; + } + + if (!(object instanceof AlignFn)) { + return false; + } + + AlignFn other = (AlignFn) object; + return other.size.equals(this.size) + && other.offset.equals(this.offset); + } + + @Override + public int hashCode() { + return Objects.hash(size, offset); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64acb2f/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterEach.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterEach.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterEach.java new file mode 100644 index 0000000..629c640 --- /dev/null +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterEach.java @@ -0,0 +1,141 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.sdk.transforms.windowing; + +import static com.google.common.base.Preconditions.checkArgument; + +import com.google.common.base.Joiner; +import java.util.Arrays; +import java.util.List; +import org.apache.beam.sdk.annotations.Experimental; +import org.apache.beam.sdk.util.ExecutableTrigger; +import org.joda.time.Instant; + +/** + * A composite {@link Trigger} that executes its sub-triggers in order. + * Only one sub-trigger is executing at a time, + * and any time it fires the {@code AfterEach} fires. When the currently executing + * sub-trigger finishes, the {@code AfterEach} starts executing the next sub-trigger. + * + *

{@code AfterEach.inOrder(t1, t2, ...)} finishes when all of the sub-triggers have finished. + * + *

The following properties hold: + *

    + *
  • {@code AfterEach.inOrder(AfterEach.inOrder(a, b), c)} behaves the same as + * {@code AfterEach.inOrder(a, b, c)} and {@code AfterEach.inOrder(a, AfterEach.inOrder(b, c)}. + *
  • {@code AfterEach.inOrder(Repeatedly.forever(a), b)} behaves the same as + * {@code Repeatedly.forever(a)}, since the repeated trigger never finishes. + *
+ */ +@Experimental(Experimental.Kind.TRIGGER) +public class AfterEach extends Trigger { + + private AfterEach(List subTriggers) { + super(subTriggers); + checkArgument(subTriggers.size() > 1); + } + + /** + * Returns an {@code AfterEach} {@code Trigger} with the given subtriggers. + */ + @SafeVarargs + public static Trigger inOrder(Trigger... triggers) { + return new AfterEach(Arrays.asList(triggers)); + } + + @Override + public void onElement(OnElementContext c) throws Exception { + if (!c.trigger().isMerging()) { + // If merges are not possible, we need only run the first unfinished subtrigger + c.trigger().firstUnfinishedSubTrigger().invokeOnElement(c); + } else { + // If merges are possible, we need to run all subtriggers in parallel + for (ExecutableTrigger subTrigger : c.trigger().subTriggers()) { + // Even if the subTrigger is done, it may be revived via merging and must have + // adequate state. + subTrigger.invokeOnElement(c); + } + } + } + + @Override + public void onMerge(OnMergeContext context) throws Exception { + // If merging makes a subtrigger no-longer-finished, it will automatically + // begin participating in shouldFire and onFire appropriately. + + // All the following triggers are retroactively "not started" but that is + // also automatic because they are cleared whenever this trigger + // fires. + boolean priorTriggersAllFinished = true; + for (ExecutableTrigger subTrigger : context.trigger().subTriggers()) { + if (priorTriggersAllFinished) { + subTrigger.invokeOnMerge(context); + priorTriggersAllFinished &= context.forTrigger(subTrigger).trigger().isFinished(); + } else { + subTrigger.invokeClear(context); + } + } + updateFinishedState(context); + } + + @Override + public Instant getWatermarkThatGuaranteesFiring(BoundedWindow window) { + // This trigger will fire at least once when the first trigger in the sequence + // fires at least once. 
+ return subTriggers.get(0).getWatermarkThatGuaranteesFiring(window); + } + + @Override + public Trigger getContinuationTrigger(List continuationTriggers) { + return Repeatedly.forever(new AfterFirst(continuationTriggers)); + } + + @Override + public boolean shouldFire(Trigger.TriggerContext context) throws Exception { + ExecutableTrigger firstUnfinished = context.trigger().firstUnfinishedSubTrigger(); + return firstUnfinished.invokeShouldFire(context); + } + + @Override + public void onFire(Trigger.TriggerContext context) throws Exception { + context.trigger().firstUnfinishedSubTrigger().invokeOnFire(context); + + // Reset all subtriggers if in a merging context; any may be revived by merging so they are + // all run in parallel for each pending pane. + if (context.trigger().isMerging()) { + for (ExecutableTrigger subTrigger : context.trigger().subTriggers()) { + subTrigger.invokeClear(context); + } + } + + updateFinishedState(context); + } + + @Override + public String toString() { + StringBuilder builder = new StringBuilder("AfterEach.inOrder("); + Joiner.on(", ").appendTo(builder, subTriggers); + builder.append(")"); + + return builder.toString(); + } + + private void updateFinishedState(TriggerContext context) { + context.trigger().setFinished(context.trigger().firstUnfinishedSubTrigger() == null); + } +} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64acb2f/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterFirst.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterFirst.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterFirst.java new file mode 100644 index 0000000..6b06cfa --- /dev/null +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterFirst.java @@ -0,0 +1,124 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more 
contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.transforms.windowing; + +import static com.google.common.base.Preconditions.checkArgument; + +import com.google.common.base.Joiner; +import java.util.Arrays; +import java.util.List; +import org.apache.beam.sdk.annotations.Experimental; +import org.apache.beam.sdk.transforms.windowing.Trigger.OnceTrigger; +import org.apache.beam.sdk.util.ExecutableTrigger; +import org.joda.time.Instant; + +/** + * Create a composite {@link Trigger} that fires once after at least one of its sub-triggers have + * fired. + */ +@Experimental(Experimental.Kind.TRIGGER) +public class AfterFirst extends OnceTrigger { + + AfterFirst(List subTriggers) { + super(subTriggers); + checkArgument(subTriggers.size() > 1); + } + + /** + * Returns an {@code AfterFirst} {@code Trigger} with the given subtriggers. + */ + public static OnceTrigger of(OnceTrigger... 
triggers) { + return new AfterFirst(Arrays.asList(triggers)); + } + + @Override + public void onElement(OnElementContext c) throws Exception { + for (ExecutableTrigger subTrigger : c.trigger().subTriggers()) { + subTrigger.invokeOnElement(c); + } + } + + @Override + public void onMerge(OnMergeContext c) throws Exception { + for (ExecutableTrigger subTrigger : c.trigger().subTriggers()) { + subTrigger.invokeOnMerge(c); + } + updateFinishedStatus(c); + } + + @Override + public Instant getWatermarkThatGuaranteesFiring(BoundedWindow window) { + // This trigger will fire after the earliest of its sub-triggers. + Instant deadline = BoundedWindow.TIMESTAMP_MAX_VALUE; + for (Trigger subTrigger : subTriggers) { + Instant subDeadline = subTrigger.getWatermarkThatGuaranteesFiring(window); + if (deadline.isAfter(subDeadline)) { + deadline = subDeadline; + } + } + return deadline; + } + + @Override + public OnceTrigger getContinuationTrigger(List continuationTriggers) { + return new AfterFirst(continuationTriggers); + } + + @Override + public boolean shouldFire(Trigger.TriggerContext context) throws Exception { + for (ExecutableTrigger subtrigger : context.trigger().subTriggers()) { + if (context.forTrigger(subtrigger).trigger().isFinished() + || subtrigger.invokeShouldFire(context)) { + return true; + } + } + return false; + } + + @Override + protected void onOnlyFiring(TriggerContext context) throws Exception { + for (ExecutableTrigger subtrigger : context.trigger().subTriggers()) { + TriggerContext subContext = context.forTrigger(subtrigger); + if (subtrigger.invokeShouldFire(subContext)) { + // If the trigger is ready to fire, then do whatever it needs to do. + subtrigger.invokeOnFire(subContext); + } else { + // If the trigger is not ready to fire, it is nonetheless true that whatever + // pending pane it was tracking is now gone. 
+ subtrigger.invokeClear(subContext); + } + } + } + + @Override + public String toString() { + StringBuilder builder = new StringBuilder("AfterFirst.of("); + Joiner.on(", ").appendTo(builder, subTriggers); + builder.append(")"); + + return builder.toString(); + } + + private void updateFinishedStatus(TriggerContext c) { + boolean anyFinished = false; + for (ExecutableTrigger subTrigger : c.trigger().subTriggers()) { + anyFinished |= c.forTrigger(subTrigger).trigger().isFinished(); + } + c.trigger().setFinished(anyFinished); + } +} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64acb2f/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterPane.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterPane.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterPane.java new file mode 100644 index 0000000..8c128dd --- /dev/null +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterPane.java @@ -0,0 +1,144 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.sdk.transforms.windowing; + +import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; +import java.util.List; +import java.util.Objects; +import org.apache.beam.sdk.annotations.Experimental; +import org.apache.beam.sdk.coders.VarLongCoder; +import org.apache.beam.sdk.transforms.Sum; +import org.apache.beam.sdk.transforms.windowing.Trigger.OnceTrigger; +import org.apache.beam.sdk.util.state.AccumulatorCombiningState; +import org.apache.beam.sdk.util.state.MergingStateAccessor; +import org.apache.beam.sdk.util.state.StateAccessor; +import org.apache.beam.sdk.util.state.StateMerging; +import org.apache.beam.sdk.util.state.StateTag; +import org.apache.beam.sdk.util.state.StateTags; +import org.joda.time.Instant; + +/** + * {@link Trigger}s that fire based on properties of the elements in the current pane. + */ +@Experimental(Experimental.Kind.TRIGGER) +public class AfterPane extends OnceTrigger { + +private static final StateTag> + ELEMENTS_IN_PANE_TAG = + StateTags.makeSystemTagInternal(StateTags.combiningValueFromInputInternal( + "count", VarLongCoder.of(), new Sum.SumLongFn())); + + private final int countElems; + + private AfterPane(int countElems) { + super(null); + this.countElems = countElems; + } + + /** + * Creates a trigger that fires when the pane contains at least {@code countElems} elements. + */ + public static AfterPane elementCountAtLeast(int countElems) { + return new AfterPane(countElems); + } + + @Override + public void onElement(OnElementContext c) throws Exception { + c.state().access(ELEMENTS_IN_PANE_TAG).add(1L); + } + + @Override + public void prefetchOnMerge(MergingStateAccessor state) { + super.prefetchOnMerge(state); + StateMerging.prefetchCombiningValues(state, ELEMENTS_IN_PANE_TAG); + } + + @Override + public void onMerge(OnMergeContext context) throws Exception { + // If we've already received enough elements and finished in some window, + // then this trigger is just finished. 
+ if (context.trigger().finishedInAnyMergingWindow()) { + context.trigger().setFinished(true); + StateMerging.clear(context.state(), ELEMENTS_IN_PANE_TAG); + return; + } + + // Otherwise, compute the sum of elements in all the active panes. + StateMerging.mergeCombiningValues(context.state(), ELEMENTS_IN_PANE_TAG); + } + + @Override + @SuppressFBWarnings(value = "RV_RETURN_VALUE_IGNORED_NO_SIDE_EFFECT", justification = + "prefetch side effect") + public void prefetchShouldFire(StateAccessor state) { + state.access(ELEMENTS_IN_PANE_TAG).readLater(); + } + + @Override + public boolean shouldFire(Trigger.TriggerContext context) throws Exception { + long count = context.state().access(ELEMENTS_IN_PANE_TAG).read(); + return count >= countElems; + } + + @Override + public void clear(TriggerContext c) throws Exception { + c.state().access(ELEMENTS_IN_PANE_TAG).clear(); + } + + @Override + public boolean isCompatible(Trigger other) { + return this.equals(other); + } + + @Override + public Instant getWatermarkThatGuaranteesFiring(BoundedWindow window) { + return BoundedWindow.TIMESTAMP_MAX_VALUE; + } + + @Override + public OnceTrigger getContinuationTrigger(List continuationTriggers) { + return AfterPane.elementCountAtLeast(1); + } + + @Override + public String toString() { + return "AfterPane.elementCountAtLeast(" + countElems + ")"; + } + + @Override + public boolean equals(Object obj) { + if (this == obj) { + return true; + } + if (!(obj instanceof AfterPane)) { + return false; + } + AfterPane that = (AfterPane) obj; + return this.countElems == that.countElems; + } + + @Override + public int hashCode() { + return Objects.hash(countElems); + } + + @Override + protected void onOnlyFiring(Trigger.TriggerContext context) throws Exception { + clear(context); + } +} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64acb2f/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterProcessingTime.java 
---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterProcessingTime.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterProcessingTime.java new file mode 100644 index 0000000..f551118 --- /dev/null +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterProcessingTime.java @@ -0,0 +1,102 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.transforms.windowing; + +import java.util.List; +import java.util.Objects; +import javax.annotation.Nullable; +import org.apache.beam.sdk.annotations.Experimental; +import org.apache.beam.sdk.transforms.SerializableFunction; +import org.apache.beam.sdk.util.TimeDomain; +import org.joda.time.Instant; + +/** + * {@code AfterProcessingTime} triggers fire based on the current processing time. They operate in + * the real-time domain. + * + *

The time at which to fire the timer can be adjusted via the methods in + * {@link AfterDelayFromFirstElement}, such as {@link AfterDelayFromFirstElement#plusDelayOf} or + * {@link AfterDelayFromFirstElement#alignedTo}. + */ +@Experimental(Experimental.Kind.TRIGGER) +public class AfterProcessingTime extends AfterDelayFromFirstElement { + + @Override + @Nullable + public Instant getCurrentTime(Trigger.TriggerContext context) { + return context.currentProcessingTime(); + } + + private AfterProcessingTime(List> transforms) { + super(TimeDomain.PROCESSING_TIME, transforms); + } + + /** + * Creates a trigger that fires when the current processing time passes the processing time + * at which this trigger saw the first element in a pane. + */ + public static AfterProcessingTime pastFirstElementInPane() { + return new AfterProcessingTime(IDENTITY); + } + + @Override + protected AfterProcessingTime newWith( + List> transforms) { + return new AfterProcessingTime(transforms); + } + + @Override + public Instant getWatermarkThatGuaranteesFiring(BoundedWindow window) { + return BoundedWindow.TIMESTAMP_MAX_VALUE; + } + + @Override + protected Trigger getContinuationTrigger(List continuationTriggers) { + return new AfterSynchronizedProcessingTime(); + } + + @Override + public String toString() { + StringBuilder builder = new StringBuilder("AfterProcessingTime.pastFirstElementInPane()"); + for (SerializableFunction delayFn : timestampMappers) { + builder + .append(".plusDelayOf(") + .append(delayFn) + .append(")"); + } + + return builder.toString(); + } + + @Override + public boolean equals(Object obj) { + if (this == obj) { + return true; + } + if (!(obj instanceof AfterProcessingTime)) { + return false; + } + AfterProcessingTime that = (AfterProcessingTime) obj; + return Objects.equals(this.timestampMappers, that.timestampMappers); + } + + @Override + public int hashCode() { + return Objects.hash(getClass(), this.timestampMappers); + } +} 
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64acb2f/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterSynchronizedProcessingTime.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterSynchronizedProcessingTime.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterSynchronizedProcessingTime.java new file mode 100644 index 0000000..59ece10 --- /dev/null +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterSynchronizedProcessingTime.java @@ -0,0 +1,73 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.sdk.transforms.windowing; + +import com.google.common.base.Objects; +import java.util.Collections; +import java.util.List; +import javax.annotation.Nullable; +import org.apache.beam.sdk.transforms.SerializableFunction; +import org.apache.beam.sdk.util.TimeDomain; +import org.joda.time.Instant; + +class AfterSynchronizedProcessingTime extends AfterDelayFromFirstElement { + + @Override + @Nullable + public Instant getCurrentTime(Trigger.TriggerContext context) { + return context.currentSynchronizedProcessingTime(); + } + + public AfterSynchronizedProcessingTime() { + super(TimeDomain.SYNCHRONIZED_PROCESSING_TIME, + Collections.>emptyList()); + } + + @Override + public Instant getWatermarkThatGuaranteesFiring(BoundedWindow window) { + return BoundedWindow.TIMESTAMP_MAX_VALUE; + } + + @Override + protected Trigger getContinuationTrigger(List continuationTriggers) { + return this; + } + + @Override + public String toString() { + return "AfterSynchronizedProcessingTime.pastFirstElementInPane()"; + } + + @Override + public boolean equals(Object obj) { + return this == obj || obj instanceof AfterSynchronizedProcessingTime; + } + + @Override + public int hashCode() { + return Objects.hashCode(AfterSynchronizedProcessingTime.class); + } + + @Override + protected AfterSynchronizedProcessingTime + newWith(List> transforms) { + // ignore transforms + return this; + } + +} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64acb2f/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterWatermark.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterWatermark.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterWatermark.java new file mode 100644 index 0000000..e2463d8 --- /dev/null +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterWatermark.java @@ -0,0 
+1,355 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.transforms.windowing; + +import static com.google.common.base.Preconditions.checkNotNull; + +import com.google.common.collect.ImmutableList; +import java.util.List; +import java.util.Objects; +import javax.annotation.Nullable; +import org.apache.beam.sdk.annotations.Experimental; +import org.apache.beam.sdk.transforms.windowing.Trigger.OnceTrigger; +import org.apache.beam.sdk.util.ExecutableTrigger; +import org.apache.beam.sdk.util.TimeDomain; +import org.joda.time.Instant; + +/** + * {@code AfterWatermark} triggers fire based on progress of the system watermark. This time is a + * lower-bound, sometimes heuristically established, on event times that have been fully processed + * by the pipeline. + * + *

For sources that provide non-heuristic watermarks (e.g. + * {@link org.apache.beam.sdk.io.PubsubIO} when using arrival times as event times), the + * watermark is a strict guarantee that no data with an event time earlier than + * that watermark will ever be observed in the pipeline. In this case, it's safe to assume that any + * pane triggered by an {@code AfterWatermark} trigger with a reference point at or beyond the end + * of the window will be the last pane ever for that window. + * + *

For sources that provide heuristic watermarks (e.g. + * {@link org.apache.beam.sdk.io.PubsubIO} when using user-supplied event times), the + * watermark itself becomes an estimate that no data with an event time earlier than that + * watermark (i.e. "late data") will ever be observed in the pipeline. These heuristics can + * often be quite accurate, but the chance of seeing late data for any given window is non-zero. + * Thus, if absolute correctness over time is important to your use case, you may want to consider + * using a trigger that accounts for late data. The default trigger, + * {@code Repeatedly.forever(AfterWatermark.pastEndOfWindow())}, which fires + * once when the watermark passes the end of the window and then immediately therafter when any + * late data arrives, is one such example. + * + *

The watermark is the clock that defines {@link TimeDomain#EVENT_TIME}. + * + *

Additionaly firings before or after the watermark can be requested by calling + * {@code AfterWatermark.pastEndOfWindow.withEarlyFirings(OnceTrigger)} or + * {@code AfterWatermark.pastEndOfWindow.withEarlyFirings(OnceTrigger)}. + */ +@Experimental(Experimental.Kind.TRIGGER) +public class AfterWatermark { + + private static final String TO_STRING = "AfterWatermark.pastEndOfWindow()"; + + // Static factory class. + private AfterWatermark() {} + + /** + * Creates a trigger that fires when the watermark passes the end of the window. + */ + public static FromEndOfWindow pastEndOfWindow() { + return new FromEndOfWindow(); + } + + /** + * @see AfterWatermark + */ + public static class AfterWatermarkEarlyAndLate extends Trigger { + + private static final int EARLY_INDEX = 0; + private static final int LATE_INDEX = 1; + + private final OnceTrigger earlyTrigger; + @Nullable + private final OnceTrigger lateTrigger; + + @SuppressWarnings("unchecked") + public AfterWatermarkEarlyAndLate(OnceTrigger earlyTrigger, OnceTrigger lateTrigger) { + super(lateTrigger == null + ? 
ImmutableList.of(earlyTrigger) + : ImmutableList.of(earlyTrigger, lateTrigger)); + this.earlyTrigger = checkNotNull(earlyTrigger, "earlyTrigger should not be null"); + this.lateTrigger = lateTrigger; + } + + public Trigger withEarlyFirings(OnceTrigger earlyTrigger) { + return new AfterWatermarkEarlyAndLate(earlyTrigger, lateTrigger); + } + + public Trigger withLateFirings(OnceTrigger lateTrigger) { + return new AfterWatermarkEarlyAndLate(earlyTrigger, lateTrigger); + } + + @Override + public void onElement(OnElementContext c) throws Exception { + if (!c.trigger().isMerging()) { + // If merges can never happen, we just run the unfinished subtrigger + c.trigger().firstUnfinishedSubTrigger().invokeOnElement(c); + } else { + // If merges can happen, we run for all subtriggers because they might be + // de-activated or re-activated + for (ExecutableTrigger subTrigger : c.trigger().subTriggers()) { + subTrigger.invokeOnElement(c); + } + } + } + + @Override + public void onMerge(OnMergeContext c) throws Exception { + // NOTE that the ReduceFnRunner will delete all end-of-window timers for the + // merged-away windows. + + ExecutableTrigger earlySubtrigger = c.trigger().subTrigger(EARLY_INDEX); + // We check the early trigger to determine if we are still processing it or + // if the end of window has transitioned us to the late trigger + OnMergeContext earlyContext = c.forTrigger(earlySubtrigger); + + // If the early trigger is still active in any merging window then it is still active in + // the new merged window, because even if the merged window is "done" some pending elements + // haven't had a chance to fire. 
+ if (!earlyContext.trigger().finishedInAllMergingWindows() || !endOfWindowReached(c)) { + earlyContext.trigger().setFinished(false); + if (lateTrigger != null) { + ExecutableTrigger lateSubtrigger = c.trigger().subTrigger(LATE_INDEX); + OnMergeContext lateContext = c.forTrigger(lateSubtrigger); + lateContext.trigger().setFinished(false); + lateSubtrigger.invokeClear(lateContext); + } + } else { + // Otherwise the early trigger and end-of-window bit is done for good. + earlyContext.trigger().setFinished(true); + if (lateTrigger != null) { + c.trigger().subTrigger(LATE_INDEX).invokeOnMerge(c); + } + } + } + + @Override + public Trigger getContinuationTrigger() { + return new AfterWatermarkEarlyAndLate( + earlyTrigger.getContinuationTrigger(), + lateTrigger == null ? null : lateTrigger.getContinuationTrigger()); + } + + @Override + protected Trigger getContinuationTrigger(List continuationTriggers) { + throw new UnsupportedOperationException( + "Should not call getContinuationTrigger(List)"); + } + + @Override + public Instant getWatermarkThatGuaranteesFiring(BoundedWindow window) { + // Even without an early or late trigger, we'll still produce a firing at the watermark. + return window.maxTimestamp(); + } + + private boolean endOfWindowReached(Trigger.TriggerContext context) { + return context.currentEventTime() != null + && context.currentEventTime().isAfter(context.window().maxTimestamp()); + } + + @Override + public boolean shouldFire(Trigger.TriggerContext context) throws Exception { + if (!context.trigger().isFinished(EARLY_INDEX)) { + // We have not yet transitioned to late firings. + // We should fire if either the trigger is ready or we reach the end of the window. 
+        return context.trigger().subTrigger(EARLY_INDEX).invokeShouldFire(context)
+            || endOfWindowReached(context);
+      } else if (lateTrigger == null) {
+        return false;
+      } else {
+        // We are running the late trigger
+        return context.trigger().subTrigger(LATE_INDEX).invokeShouldFire(context);
+      }
+    }
+
+    /**
+     * Routes a firing to {@code onNonLateFiring} while the early sub-trigger is still
+     * unfinished, otherwise to {@code onLateFiring} when a late trigger is configured. With
+     * neither, this trigger is marked finished.
+     */
+    @Override
+    public void onFire(Trigger.TriggerContext context) throws Exception {
+      if (!context.forTrigger(context.trigger().subTrigger(EARLY_INDEX)).trigger().isFinished()) {
+        onNonLateFiring(context);
+      } else if (lateTrigger != null) {
+        onLateFiring(context);
+      } else {
+        // all done
+        context.trigger().setFinished(true);
+      }
+    }
+
+    /**
+     * Renders as {@code TO_STRING}, followed by {@code .withEarlyFirings(...)} and/or
+     * {@code .withLateFirings(...)} for each configured sub-trigger that is not a
+     * {@code Never} trigger.
+     */
+    @Override
+    public String toString() {
+      StringBuilder builder = new StringBuilder(TO_STRING);
+
+      if (!(earlyTrigger instanceof Never.NeverTrigger)) {
+        builder
+            .append(".withEarlyFirings(")
+            .append(earlyTrigger)
+            .append(")");
+      }
+
+      if (lateTrigger != null && !(lateTrigger instanceof Never.NeverTrigger)) {
+        builder
+            .append(".withLateFirings(")
+            .append(lateTrigger)
+            .append(")");
+      }
+
+      return builder.toString();
+    }
+
+    /**
+     * Handles a firing that happens before the switch to late firings.
+     *
+     * <p>Before the end of the window this is an early firing. The early sub-trigger fires and
+     * is then reset (cleared and marked unfinished) so that it can fire again. Once the end of
+     * the window has been reached, the early sub-trigger fires one last time if it is ready,
+     * and is then marked finished and cleared. After that, either this trigger finishes (no late
+     * trigger), or the late sub-trigger's state is cleared before late firings begin.
+     */
+    private void onNonLateFiring(Trigger.TriggerContext context) throws Exception {
+      // We have not yet transitioned to late firings.
+      ExecutableTrigger earlySubtrigger = context.trigger().subTrigger(EARLY_INDEX);
+      Trigger.TriggerContext earlyContext = context.forTrigger(earlySubtrigger);
+
+      if (!endOfWindowReached(context)) {
+        // This is an early firing, since we have not arrived at the end of the window.
+        // Implicitly repeats: clear the early sub-trigger and mark it unfinished again.
+        earlySubtrigger.invokeOnFire(context);
+        earlySubtrigger.invokeClear(context);
+        earlyContext.trigger().setFinished(false);
+      } else {
+        // We have arrived at the end of the window; terminate the early trigger
+        // and clear out the late trigger's state
+        if (earlySubtrigger.invokeShouldFire(context)) {
+          earlySubtrigger.invokeOnFire(context);
+        }
+        earlyContext.trigger().setFinished(true);
+        earlySubtrigger.invokeClear(context);
+
+        if (lateTrigger == null) {
+          // Done if there is no late trigger.
+          context.trigger().setFinished(true);
+        } else {
+          // If there is a late trigger, we transition to it, and need to clear its state
+          // because it was run in parallel.
+          context.trigger().subTrigger(LATE_INDEX).invokeClear(context);
+        }
+      }
+
+    }
+
+    /**
+     * Handles a late firing. The late sub-trigger fires and is then cleared and marked
+     * unfinished, so that it implicitly repeats.
+     */
+    private void onLateFiring(Trigger.TriggerContext context) throws Exception {
+      // We are firing the late trigger, with implicit repeat
+      ExecutableTrigger lateSubtrigger = context.trigger().subTrigger(LATE_INDEX);
+      lateSubtrigger.invokeOnFire(context);
+      // It is a OnceTrigger, so it must have finished. Clear its state, then mark it
+      // unfinished again so that it can fire on a later pane.
+      lateSubtrigger.invokeClear(context);
+      context.forTrigger(lateSubtrigger).trigger().setFinished(false);
+    }
+  }
+
+  /**
+   * A watermark trigger targeted relative to the end of the window.
+   *
+   * <p>It is ready to fire once the current event time is known and strictly after the
+   * window's max timestamp.
+   */
+  public static class FromEndOfWindow extends OnceTrigger {
+
+    private FromEndOfWindow() {
+      super(null);
+    }
+
+    /**
+     * Creates a new {@code Trigger} like this one, except that it fires repeatedly whenever
+     * the given {@code Trigger} fires before the watermark has passed the end of the window.
+     */
+    public AfterWatermarkEarlyAndLate withEarlyFirings(OnceTrigger earlyFirings) {
+      checkNotNull(earlyFirings, "Must specify the trigger to use for early firings");
+      return new AfterWatermarkEarlyAndLate(earlyFirings, null);
+    }
+
+    /**
+     * Creates a new {@code Trigger} like this one, except that it fires repeatedly whenever
+     * the given {@code Trigger} fires after the watermark has passed the end of the window.
+     * No early firings are configured: {@code Never.ever()} is used as the early trigger.
+     */
+    public AfterWatermarkEarlyAndLate withLateFirings(OnceTrigger lateFirings) {
+      checkNotNull(lateFirings, "Must specify the trigger to use for late firings");
+      return new AfterWatermarkEarlyAndLate(Never.ever(), lateFirings);
+    }
+
+    @Override
+    public void onElement(OnElementContext c) throws Exception {
+      // We're interested in knowing when the input watermark passes the end of the window.
+      // (It is possible this has already happened, in which case the timer will be fired
+      // almost immediately).
+      c.setTimer(c.window().maxTimestamp(), TimeDomain.EVENT_TIME);
+    }
+
+    @Override
+    public void onMerge(OnMergeContext c) throws Exception {
+      // NOTE that the ReduceFnRunner will delete all end-of-window timers for the
+      // merged-away windows.
+
+      if (!c.trigger().finishedInAllMergingWindows()) {
+        // If the trigger is still active in any merging window then it is still active in the new
+        // merged window, because even if the merged window is "done" some pending elements haven't
+        // had a chance to fire
+        c.trigger().setFinished(false);
+      } else if (!endOfWindowReached(c)) {
+        // If the end of the new window has not been reached, then the trigger is active again.
+        c.trigger().setFinished(false);
+      } else {
+        // Otherwise it is done for good
+        c.trigger().setFinished(true);
+      }
+    }
+
+    @Override
+    public Instant getWatermarkThatGuaranteesFiring(BoundedWindow window) {
+      return window.maxTimestamp();
+    }
+
+    @Override
+    public FromEndOfWindow getContinuationTrigger(List continuationTriggers) {
+      return this;
+    }
+
+    @Override
+    public String toString() {
+      return TO_STRING;
+    }
+
+    // FromEndOfWindow declares no fields, so all instances are interchangeable:
+    // equality is a type check, and the hash depends only on the class.
+    @Override
+    public boolean equals(Object obj) {
+      return obj instanceof FromEndOfWindow;
+    }
+
+    @Override
+    public int hashCode() {
+      return Objects.hash(getClass());
+    }
+
+    @Override
+    public boolean shouldFire(Trigger.TriggerContext context) throws Exception {
+      return endOfWindowReached(context);
+    }
+
+    /**
+     * Returns true once the current event time is non-null and strictly after the window's
+     * max timestamp.
+     */
+    private boolean endOfWindowReached(Trigger.TriggerContext context) {
+      return context.currentEventTime() != null
+          && context.currentEventTime().isAfter(context.window().maxTimestamp());
+    }
+
+    // Nothing to record when firing: readiness depends only on the event time and the window.
+    @Override
+    protected void onOnlyFiring(Trigger.TriggerContext context) throws Exception { }
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64acb2f/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/DefaultTrigger.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/DefaultTrigger.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/DefaultTrigger.java
new file mode 100644
index 0000000..d6b72ef
--- /dev/null
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/DefaultTrigger.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.transforms.windowing; + +import java.util.List; +import org.apache.beam.sdk.annotations.Experimental; +import org.apache.beam.sdk.util.TimeDomain; +import org.joda.time.Instant; + +/** + * A trigger that is equivalent to {@code Repeatedly.forever(AfterWatermark.pastEndOfWindow())}. + * See {@link Repeatedly#forever} and {@link AfterWatermark#pastEndOfWindow} for more details. + */ +@Experimental(Experimental.Kind.TRIGGER) +public class DefaultTrigger extends Trigger{ + + private DefaultTrigger() { + super(null); + } + + /** + * Returns the default trigger. + */ + public static DefaultTrigger of() { + return new DefaultTrigger(); + } + + @Override + public void onElement(OnElementContext c) throws Exception { + // If the end of the window has already been reached, then we are already ready to fire + // and do not need to set a wake-up timer. + if (!endOfWindowReached(c)) { + c.setTimer(c.window().maxTimestamp(), TimeDomain.EVENT_TIME); + } + } + + @Override + public void onMerge(OnMergeContext c) throws Exception { + // If the end of the window has already been reached, then we are already ready to fire + // and do not need to set a wake-up timer. 
+ if (!endOfWindowReached(c)) { + c.setTimer(c.window().maxTimestamp(), TimeDomain.EVENT_TIME); + } + } + + @Override + public void clear(TriggerContext c) throws Exception { } + + @Override + public Instant getWatermarkThatGuaranteesFiring(BoundedWindow window) { + return window.maxTimestamp(); + } + + @Override + public boolean isCompatible(Trigger other) { + // Semantically, all default triggers are identical + return other instanceof DefaultTrigger; + } + + @Override + public Trigger getContinuationTrigger(List continuationTriggers) { + return this; + } + + @Override + public boolean shouldFire(Trigger.TriggerContext context) throws Exception { + return endOfWindowReached(context); + } + + private boolean endOfWindowReached(Trigger.TriggerContext context) { + return context.currentEventTime() != null + && context.currentEventTime().isAfter(context.window().maxTimestamp()); + } + + @Override + public void onFire(Trigger.TriggerContext context) throws Exception { } +} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64acb2f/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Never.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Never.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Never.java new file mode 100644 index 0000000..5f20465 --- /dev/null +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Never.java @@ -0,0 +1,75 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.transforms.windowing; + +import java.util.List; +import org.apache.beam.sdk.transforms.GroupByKey; +import org.apache.beam.sdk.transforms.windowing.Trigger.OnceTrigger; +import org.joda.time.Instant; + +/** + * A trigger which never fires. + * + *

Using this trigger will only produce output when the watermark passes the end of the + * {@link BoundedWindow window} plus the {@link Window#withAllowedLateness allowed + * lateness}. + */ +public final class Never { + /** + * Returns a trigger which never fires. Output will be produced from the using {@link GroupByKey} + * when the {@link BoundedWindow} closes. + */ + public static OnceTrigger ever() { + // NeverTrigger ignores all inputs and is Window-type independent. + return new NeverTrigger(); + } + + // package-private in order to check identity for string formatting. + static class NeverTrigger extends OnceTrigger { + protected NeverTrigger() { + super(null); + } + + @Override + public void onElement(OnElementContext c) {} + + @Override + public void onMerge(OnMergeContext c) {} + + @Override + protected Trigger getContinuationTrigger(List continuationTriggers) { + return this; + } + + @Override + public Instant getWatermarkThatGuaranteesFiring(BoundedWindow window) { + return BoundedWindow.TIMESTAMP_MAX_VALUE; + } + + @Override + public boolean shouldFire(Trigger.TriggerContext context) { + return false; + } + + @Override + protected void onOnlyFiring(Trigger.TriggerContext context) { + throw new UnsupportedOperationException( + String.format("%s should never fire", getClass().getSimpleName())); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64acb2f/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/OrFinallyTrigger.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/OrFinallyTrigger.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/OrFinallyTrigger.java new file mode 100644 index 0000000..25b7b34 --- /dev/null +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/OrFinallyTrigger.java @@ -0,0 +1,105 @@ +/* + * Licensed to the Apache Software Foundation (ASF) 
under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.transforms.windowing;
+
+import com.google.common.annotations.VisibleForTesting;
+import java.util.Arrays;
+import java.util.List;
+import org.apache.beam.sdk.util.ExecutableTrigger;
+import org.joda.time.Instant;
+
+/**
+ * Executes the {@code actual} trigger until it finishes or until the {@code until} trigger fires.
+ *
+ * <p>Sub-trigger {@code ACTUAL} (index 0) is the trigger being run. Sub-trigger {@code UNTIL}
+ * (index 1) is a {@link Trigger.OnceTrigger}. This trigger counts as finished as soon as
+ * either sub-trigger is finished.
+ */
+class OrFinallyTrigger extends Trigger {
+
+  private static final int ACTUAL = 0;
+  private static final int UNTIL = 1;
+
+  @VisibleForTesting OrFinallyTrigger(Trigger actual, Trigger.OnceTrigger until) {
+    super(Arrays.asList(actual, until));
+  }
+
+  @Override
+  public void onElement(OnElementContext c) throws Exception {
+    // Both sub-triggers observe every element.
+    c.trigger().subTrigger(ACTUAL).invokeOnElement(c);
+    c.trigger().subTrigger(UNTIL).invokeOnElement(c);
+  }
+
+  @Override
+  public void onMerge(OnMergeContext c) throws Exception {
+    for (ExecutableTrigger subTrigger : c.trigger().subTriggers()) {
+      subTrigger.invokeOnMerge(c);
+    }
+    // Recompute this trigger's finished bit from the merged sub-trigger states.
+    updateFinishedState(c);
+  }
+
+  @Override
+  public Instant getWatermarkThatGuaranteesFiring(BoundedWindow window) {
+    // This trigger fires once either the actual trigger or the until trigger fires, so the
+    // guarantee is the earlier of the two sub-trigger watermarks.
+    Instant actualDeadline = subTriggers.get(ACTUAL).getWatermarkThatGuaranteesFiring(window);
+    Instant untilDeadline = subTriggers.get(UNTIL).getWatermarkThatGuaranteesFiring(window);
+    return actualDeadline.isBefore(untilDeadline) ? actualDeadline : untilDeadline;
+  }
+
+  @Override
+  public Trigger getContinuationTrigger(List continuationTriggers) {
+    // Use OrFinallyTrigger instead of AfterFirst because the continuation of ACTUAL
+    // may not be a OnceTrigger.
+    // NOTE(review): the cast assumes the continuation of the UNTIL OnceTrigger is itself a
+    // OnceTrigger. Confirm this holds for every OnceTrigger implementation.
+    return Repeatedly.forever(
+        new OrFinallyTrigger(
+            continuationTriggers.get(ACTUAL),
+            (Trigger.OnceTrigger) continuationTriggers.get(UNTIL)));
+  }
+
+  @Override
+  public boolean shouldFire(Trigger.TriggerContext context) throws Exception {
+    return context.trigger().subTrigger(ACTUAL).invokeShouldFire(context)
+        || context.trigger().subTrigger(UNTIL).invokeShouldFire(context);
+  }
+
+  @Override
+  public void onFire(Trigger.TriggerContext context) throws Exception {
+    ExecutableTrigger actualSubtrigger = context.trigger().subTrigger(ACTUAL);
+    ExecutableTrigger untilSubtrigger = context.trigger().subTrigger(UNTIL);
+
+    if (untilSubtrigger.invokeShouldFire(context)) {
+      // The until trigger takes precedence: fire it and discard the actual trigger's state.
+      untilSubtrigger.invokeOnFire(context);
+      actualSubtrigger.invokeClear(context);
+    } else {
+      // If until didn't fire, then the actual must have (or it is forbidden to call
+      // onFire) so we are done only if actual is done.
+      actualSubtrigger.invokeOnFire(context);
+      // Do not clear the until trigger, because it tracks data across firings.
+    }
+    updateFinishedState(context);
+  }
+
+  @Override
+  public String toString() {
+    return String.format("%s.orFinally(%s)", subTriggers.get(ACTUAL), subTriggers.get(UNTIL));
+  }
+
+  /**
+   * Marks this trigger finished if ANY sub-trigger is finished, and unfinished otherwise.
+   * Despite its name, {@code anyStillFinished} simply means "some sub-trigger is finished".
+   */
+  private void updateFinishedState(TriggerContext c) throws Exception {
+    boolean anyStillFinished = false;
+    for (ExecutableTrigger subTrigger : c.trigger().subTriggers()) {
+      anyStillFinished |= c.forTrigger(subTrigger).trigger().isFinished();
+    }
+    c.trigger().setFinished(anyStillFinished);
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64acb2f/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Repeatedly.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Repeatedly.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Repeatedly.java
new file mode 100644
index 0000000..8858798
--- /dev/null
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Repeatedly.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */ +package org.apache.beam.sdk.transforms.windowing; + +import java.util.Arrays; +import java.util.List; +import org.apache.beam.sdk.util.ExecutableTrigger; +import org.joda.time.Instant; + +/** + * Repeat a trigger, either until some condition is met or forever. + * + *

For example, to fire after the end of the window, and every time late data arrives: + *

 {@code
+ *     Repeatedly.forever(AfterWatermark.pastEndOfWindow());
+ * } 
+ * + *

{@code Repeatedly.forever(someTrigger)} behaves like an infinite + * {@code AfterEach.inOrder(someTrigger, someTrigger, someTrigger, ...)}. + */ +public class Repeatedly extends Trigger { + + private static final int REPEATED = 0; + + /** + * Create a composite trigger that repeatedly executes the trigger {@code repeated}, firing each + * time it fires and ignoring any indications to finish. + * + *

Unless used with {@link Trigger#orFinally} the composite trigger will never finish. + * + * @param repeated the trigger to execute repeatedly. + */ + public static Repeatedly forever(Trigger repeated) { + return new Repeatedly(repeated); + } + + private Repeatedly(Trigger repeated) { + super(Arrays.asList(repeated)); + } + + + @Override + public void onElement(OnElementContext c) throws Exception { + getRepeated(c).invokeOnElement(c); + } + + @Override + public void onMerge(OnMergeContext c) throws Exception { + getRepeated(c).invokeOnMerge(c); + } + + @Override + public Instant getWatermarkThatGuaranteesFiring(BoundedWindow window) { + // This trigger fires once the repeated trigger fires. + return subTriggers.get(REPEATED).getWatermarkThatGuaranteesFiring(window); + } + + @Override + public Trigger getContinuationTrigger(List continuationTriggers) { + return new Repeatedly(continuationTriggers.get(REPEATED)); + } + + @Override + public boolean shouldFire(Trigger.TriggerContext context) throws Exception { + return getRepeated(context).invokeShouldFire(context); + } + + @Override + public void onFire(TriggerContext context) throws Exception { + getRepeated(context).invokeOnFire(context); + + if (context.trigger().isFinished(REPEATED)) { + // Reset tree will recursively clear the finished bits, and invoke clear. + context.forTrigger(getRepeated(context)).trigger().resetTree(); + } + } + + @Override + public String toString() { + return String.format("Repeatedly.forever(%s)", subTriggers.get(REPEATED)); + } + + private ExecutableTrigger getRepeated(TriggerContext context) { + return context.trigger().subTrigger(REPEATED); + } +}