Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id 1FD4D200BBB for ; Wed, 26 Oct 2016 18:44:00 +0200 (CEST) Received: by cust-asf.ponee.io (Postfix) id 1E7D2160B02; Wed, 26 Oct 2016 16:44:00 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id CA0B8160AE1 for ; Wed, 26 Oct 2016 18:43:57 +0200 (CEST) Received: (qmail 87400 invoked by uid 500); 26 Oct 2016 16:43:57 -0000 Mailing-List: contact commits-help@beam.incubator.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@beam.incubator.apache.org Delivered-To: mailing list commits@beam.incubator.apache.org Received: (qmail 87389 invoked by uid 99); 26 Oct 2016 16:43:57 -0000 Received: from pnap-us-west-generic-nat.apache.org (HELO spamd3-us-west.apache.org) (209.188.14.142) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 26 Oct 2016 16:43:57 +0000 Received: from localhost (localhost [127.0.0.1]) by spamd3-us-west.apache.org (ASF Mail Server at spamd3-us-west.apache.org) with ESMTP id 7026D1806F8 for ; Wed, 26 Oct 2016 16:43:56 +0000 (UTC) X-Virus-Scanned: Debian amavisd-new at spamd3-us-west.apache.org X-Spam-Flag: NO X-Spam-Score: -6.219 X-Spam-Level: X-Spam-Status: No, score=-6.219 tagged_above=-999 required=6.31 tests=[KAM_ASCII_DIVIDERS=0.8, KAM_LAZY_DOMAIN_SECURITY=1, RCVD_IN_DNSWL_HI=-5, RCVD_IN_MSPIKE_H3=-0.01, RCVD_IN_MSPIKE_WL=-0.01, RP_MATCHES_RCVD=-2.999] autolearn=disabled Received: from mx1-lw-eu.apache.org ([10.40.0.8]) by localhost (spamd3-us-west.apache.org [10.40.0.10]) (amavisd-new, port 10024) with ESMTP id nbuRfuV-MBZK for ; Wed, 26 Oct 2016 16:43:44 +0000 (UTC) Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by mx1-lw-eu.apache.org (ASF Mail Server at mx1-lw-eu.apache.org) with SMTP id 0D6545FC84 for ; Wed, 26 Oct 2016 16:43:41 +0000 (UTC) Received: (qmail 82226 invoked by uid 99); 26 Oct 2016 16:43:40 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 26 Oct 2016 16:43:40 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 5E1E9E97DD; Wed, 26 Oct 2016 16:43:40 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: dhalperi@apache.org To: commits@beam.incubator.apache.org Date: Wed, 26 Oct 2016 16:44:07 -0000 Message-Id: <2b5581a1f88f423283ec949eb3a730e3@git.apache.org> In-Reply-To: <35cd756b7a90424cbecde858fc5896b0@git.apache.org> References: <35cd756b7a90424cbecde858fc5896b0@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [29/50] [abbrv] incubator-beam git commit: Remove pieces of Trigger now owned by TriggerStateMachine archived-at: Wed, 26 Oct 2016 16:44:00 -0000 Remove pieces of Trigger now owned by TriggerStateMachine Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/1eff320d Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/1eff320d Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/1eff320d Branch: refs/heads/gearpump-runner Commit: 1eff320d5e7fb5510d13016e0826b14e5cf7f686 Parents: dfaf2a8 Author: Kenneth Knowles Authored: Mon Oct 24 12:57:37 2016 -0700 Committer: Kenneth Knowles Committed: Tue Oct 25 10:12:56 2016 -0700 ---------------------------------------------------------------------- .../beam/sdk/transforms/windowing/AfterAll.java | 49 -- .../windowing/AfterDelayFromFirstElement.java | 99 ---- .../sdk/transforms/windowing/AfterEach.java | 61 --- .../sdk/transforms/windowing/AfterFirst.java | 50 -- .../sdk/transforms/windowing/AfterPane.java | 52 -- .../windowing/AfterProcessingTime.java | 7 - .../AfterSynchronizedProcessingTime.java | 7 - .../transforms/windowing/AfterWatermark.java | 158 ------ .../transforms/windowing/DefaultTrigger.java | 35 -- .../beam/sdk/transforms/windowing/Never.java | 17 - .../transforms/windowing/OrFinallyTrigger.java | 46 -- .../sdk/transforms/windowing/Repeatedly.java | 30 -- .../beam/sdk/transforms/windowing/Trigger.java | 412 ++------------- .../apache/beam/sdk/util/ExecutableTrigger.java | 40 +- .../apache/beam/sdk/util/FinishedTriggers.java | 44 -- .../beam/sdk/util/FinishedTriggersBitSet.java | 67 --- .../beam/sdk/util/FinishedTriggersSet.java | 72 --- .../apache/beam/sdk/util/ReshuffleTrigger.java | 14 - .../beam/sdk/util/TriggerContextFactory.java | 507 ------------------- .../sdk/transforms/windowing/AfterAllTest.java | 98 ---- .../sdk/transforms/windowing/AfterEachTest.java | 64 --- .../transforms/windowing/AfterFirstTest.java | 120 ----- .../sdk/transforms/windowing/AfterPaneTest.java | 77 --- .../windowing/AfterProcessingTimeTest.java | 94 ---- .../AfterSynchronizedProcessingTimeTest.java | 75 --- .../windowing/AfterWatermarkTest.java | 308 ----------- .../windowing/DefaultTriggerTest.java | 130 ----- .../sdk/transforms/windowing/NeverTest.java | 34 +- .../windowing/OrFinallyTriggerTest.java | 136 ----- .../transforms/windowing/RepeatedlyTest.java | 161 +----- .../sdk/transforms/windowing/StubTrigger.java | 17 - .../sdk/transforms/windowing/TriggerTest.java | 28 - .../beam/sdk/util/ExecutableTriggerTest.java | 18 - .../sdk/util/FinishedTriggersBitSetTest.java | 55 -- .../sdk/util/FinishedTriggersProperties.java | 110 ---- .../beam/sdk/util/FinishedTriggersSetTest.java | 60 --- .../beam/sdk/util/ReshuffleTriggerTest.java | 23 - .../org/apache/beam/sdk/util/TriggerTester.java | 410 --------------- 38 files changed, 77 insertions(+), 3708 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/1eff320d/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterAll.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterAll.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterAll.java index 0e37d33..c3f0848 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterAll.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterAll.java @@ -24,7 +24,6 @@ import java.util.Arrays; import java.util.List; import org.apache.beam.sdk.annotations.Experimental; import org.apache.beam.sdk.transforms.windowing.Trigger.OnceTrigger; -import org.apache.beam.sdk.util.ExecutableTrigger; import org.joda.time.Instant; /** @@ -46,27 +45,6 @@ public class AfterAll extends OnceTrigger { } @Override - public void onElement(OnElementContext c) throws Exception { - for (ExecutableTrigger subTrigger : c.trigger().unfinishedSubTriggers()) { - // Since subTriggers are all OnceTriggers, they must either CONTINUE or FIRE_AND_FINISH. - // invokeElement will automatically mark the finish bit if they return FIRE_AND_FINISH. - subTrigger.invokeOnElement(c); - } - } - - @Override - public void onMerge(OnMergeContext c) throws Exception { - for (ExecutableTrigger subTrigger : c.trigger().subTriggers()) { - subTrigger.invokeOnMerge(c); - } - boolean allFinished = true; - for (ExecutableTrigger subTrigger1 : c.trigger().subTriggers()) { - allFinished &= c.forTrigger(subTrigger1).trigger().isFinished(); - } - c.trigger().setFinished(allFinished); - } - - @Override public Instant getWatermarkThatGuaranteesFiring(BoundedWindow window) { // This trigger will fire after the latest of its sub-triggers. Instant deadline = BoundedWindow.TIMESTAMP_MIN_VALUE; @@ -84,33 +62,6 @@ public class AfterAll extends OnceTrigger { return new AfterAll(continuationTriggers); } - /** - * {@inheritDoc} - * - * @return {@code true} if all subtriggers return {@code true}. - */ - @Override - public boolean shouldFire(TriggerContext context) throws Exception { - for (ExecutableTrigger subtrigger : context.trigger().subTriggers()) { - if (!context.forTrigger(subtrigger).trigger().isFinished() - && !subtrigger.invokeShouldFire(context)) { - return false; - } - } - return true; - } - - /** - * Invokes {@link #onFire} for all subtriggers, eliding redundant calls to {@link #shouldFire} - * because they all must be ready to fire. - */ - @Override - public void onOnlyFiring(TriggerContext context) throws Exception { - for (ExecutableTrigger subtrigger : context.trigger().subTriggers()) { - subtrigger.invokeOnFire(context); - } - } - @Override public String toString() { StringBuilder builder = new StringBuilder("AfterAll.of("); http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/1eff320d/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterDelayFromFirstElement.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterDelayFromFirstElement.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterDelayFromFirstElement.java index 6078b34..9daecb2 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterDelayFromFirstElement.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterDelayFromFirstElement.java @@ -18,11 +18,9 @@ package org.apache.beam.sdk.transforms.windowing; import com.google.common.collect.ImmutableList; -import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; import java.util.List; import java.util.Locale; import java.util.Objects; -import javax.annotation.Nullable; import org.apache.beam.sdk.annotations.Experimental; import org.apache.beam.sdk.coders.InstantCoder; import org.apache.beam.sdk.transforms.Combine; @@ -31,10 +29,6 @@ import org.apache.beam.sdk.transforms.SerializableFunction; import org.apache.beam.sdk.transforms.windowing.Trigger.OnceTrigger; import org.apache.beam.sdk.util.TimeDomain; import org.apache.beam.sdk.util.state.AccumulatorCombiningState; -import org.apache.beam.sdk.util.state.CombiningState; -import org.apache.beam.sdk.util.state.MergingStateAccessor; -import org.apache.beam.sdk.util.state.StateAccessor; -import org.apache.beam.sdk.util.state.StateMerging; import org.apache.beam.sdk.util.state.StateTag; import org.apache.beam.sdk.util.state.StateTags; import org.joda.time.Duration; @@ -62,12 +56,6 @@ public abstract class AfterDelayFromFirstElement extends OnceTrigger { private static final PeriodFormatter PERIOD_FORMATTER = PeriodFormat.wordBased(Locale.ENGLISH); /** - * To complete an implementation, return the desired time from the TriggerContext. - */ - @Nullable - public abstract Instant getCurrentTime(Trigger.TriggerContext context); - - /** * To complete an implementation, return a new instance like this one, but incorporating * the provided timestamp mapping functions. Generally should be used by calling the * constructor of this class from the constructor of the subclass. @@ -92,10 +80,6 @@ public abstract class AfterDelayFromFirstElement extends OnceTrigger { this.timeDomain = timeDomain; } - private Instant getTargetTimestamp(OnElementContext c) { - return computeTargetTimestamp(c.currentProcessingTime()); - } - /** * The time domain according for which this trigger sets timers. */ @@ -170,93 +154,10 @@ public abstract class AfterDelayFromFirstElement extends OnceTrigger { } @Override - @SuppressFBWarnings(value = "RV_RETURN_VALUE_IGNORED_NO_SIDE_EFFECT", justification = - "prefetch side effect") - public void prefetchOnElement(StateAccessor state) { - state.access(DELAYED_UNTIL_TAG).readLater(); - } - - @Override - public void onElement(OnElementContext c) throws Exception { - CombiningState delayUntilState = c.state().access(DELAYED_UNTIL_TAG); - Instant oldDelayUntil = delayUntilState.read(); - - // Since processing time can only advance, resulting in target wake-up times we would - // ignore anyhow, we don't bother with it if it is already set. - if (oldDelayUntil != null) { - return; - } - - Instant targetTimestamp = getTargetTimestamp(c); - delayUntilState.add(targetTimestamp); - c.setTimer(targetTimestamp, timeDomain); - } - - @Override - public void prefetchOnMerge(MergingStateAccessor state) { - super.prefetchOnMerge(state); - StateMerging.prefetchCombiningValues(state, DELAYED_UNTIL_TAG); - } - - @Override - public void onMerge(OnMergeContext c) throws Exception { - // NOTE: We could try to delete all timers which are still active, but we would - // need access to a timer context for each merging window. - // for (CombiningValueStateInternal, Instant> state : - // c.state().accessInEachMergingWindow(DELAYED_UNTIL_TAG).values()) { - // Instant timestamp = state.get().read(); - // if (timestamp != null) { - // .deleteTimer(timestamp, timeDomain); - // } - // } - // Instead let them fire and be ignored. - - // If the trigger is already finished, there is no way it will become re-activated - if (c.trigger().isFinished()) { - StateMerging.clear(c.state(), DELAYED_UNTIL_TAG); - // NOTE: We do not attempt to delete the timers. - return; - } - - // Determine the earliest point across all the windows, and delay to that. - StateMerging.mergeCombiningValues(c.state(), DELAYED_UNTIL_TAG); - - Instant earliestTargetTime = c.state().access(DELAYED_UNTIL_TAG).read(); - if (earliestTargetTime != null) { - c.setTimer(earliestTargetTime, timeDomain); - } - } - - @Override - @SuppressFBWarnings(value = "RV_RETURN_VALUE_IGNORED_NO_SIDE_EFFECT", justification = - "prefetch side effect") - public void prefetchShouldFire(StateAccessor state) { - state.access(DELAYED_UNTIL_TAG).readLater(); - } - - @Override - public void clear(TriggerContext c) throws Exception { - c.state().access(DELAYED_UNTIL_TAG).clear(); - } - - @Override public Instant getWatermarkThatGuaranteesFiring(BoundedWindow window) { return BoundedWindow.TIMESTAMP_MAX_VALUE; } - @Override - public boolean shouldFire(Trigger.TriggerContext context) throws Exception { - Instant delayedUntil = context.state().access(DELAYED_UNTIL_TAG).read(); - return delayedUntil != null - && getCurrentTime(context) != null - && getCurrentTime(context).isAfter(delayedUntil); - } - - @Override - protected void onOnlyFiring(Trigger.TriggerContext context) throws Exception { - clear(context); - } - protected Instant computeTargetTimestamp(Instant time) { Instant result = time; for (SerializableFunction timestampMapper : timestampMappers) { http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/1eff320d/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterEach.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterEach.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterEach.java index 961d97f..872ad46 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterEach.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterEach.java @@ -23,7 +23,6 @@ import com.google.common.base.Joiner; import java.util.Arrays; import java.util.List; import org.apache.beam.sdk.annotations.Experimental; -import org.apache.beam.sdk.util.ExecutableTrigger; import org.joda.time.Instant; /** @@ -59,41 +58,6 @@ public class AfterEach extends Trigger { } @Override - public void onElement(OnElementContext c) throws Exception { - if (!c.trigger().isMerging()) { - // If merges are not possible, we need only run the first unfinished subtrigger - c.trigger().firstUnfinishedSubTrigger().invokeOnElement(c); - } else { - // If merges are possible, we need to run all subtriggers in parallel - for (ExecutableTrigger subTrigger : c.trigger().subTriggers()) { - // Even if the subTrigger is done, it may be revived via merging and must have - // adequate state. - subTrigger.invokeOnElement(c); - } - } - } - - @Override - public void onMerge(OnMergeContext context) throws Exception { - // If merging makes a subtrigger no-longer-finished, it will automatically - // begin participating in shouldFire and onFire appropriately. - - // All the following triggers are retroactively "not started" but that is - // also automatic because they are cleared whenever this trigger - // fires. - boolean priorTriggersAllFinished = true; - for (ExecutableTrigger subTrigger : context.trigger().subTriggers()) { - if (priorTriggersAllFinished) { - subTrigger.invokeOnMerge(context); - priorTriggersAllFinished &= context.forTrigger(subTrigger).trigger().isFinished(); - } else { - subTrigger.invokeClear(context); - } - } - updateFinishedState(context); - } - - @Override public Instant getWatermarkThatGuaranteesFiring(BoundedWindow window) { // This trigger will fire at least once when the first trigger in the sequence // fires at least once. @@ -106,27 +70,6 @@ public class AfterEach extends Trigger { } @Override - public boolean shouldFire(Trigger.TriggerContext context) throws Exception { - ExecutableTrigger firstUnfinished = context.trigger().firstUnfinishedSubTrigger(); - return firstUnfinished.invokeShouldFire(context); - } - - @Override - public void onFire(Trigger.TriggerContext context) throws Exception { - context.trigger().firstUnfinishedSubTrigger().invokeOnFire(context); - - // Reset all subtriggers if in a merging context; any may be revived by merging so they are - // all run in parallel for each pending pane. - if (context.trigger().isMerging()) { - for (ExecutableTrigger subTrigger : context.trigger().subTriggers()) { - subTrigger.invokeClear(context); - } - } - - updateFinishedState(context); - } - - @Override public String toString() { StringBuilder builder = new StringBuilder("AfterEach.inOrder("); Joiner.on(", ").appendTo(builder, subTriggers); @@ -134,8 +77,4 @@ public class AfterEach extends Trigger { return builder.toString(); } - - private void updateFinishedState(TriggerContext context) { - context.trigger().setFinished(context.trigger().firstUnfinishedSubTrigger() == null); - } } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/1eff320d/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterFirst.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterFirst.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterFirst.java index 7840fc4..a742b43 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterFirst.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterFirst.java @@ -24,7 +24,6 @@ import java.util.Arrays; import java.util.List; import org.apache.beam.sdk.annotations.Experimental; import org.apache.beam.sdk.transforms.windowing.Trigger.OnceTrigger; -import org.apache.beam.sdk.util.ExecutableTrigger; import org.joda.time.Instant; /** @@ -47,21 +46,6 @@ public class AfterFirst extends OnceTrigger { } @Override - public void onElement(OnElementContext c) throws Exception { - for (ExecutableTrigger subTrigger : c.trigger().subTriggers()) { - subTrigger.invokeOnElement(c); - } - } - - @Override - public void onMerge(OnMergeContext c) throws Exception { - for (ExecutableTrigger subTrigger : c.trigger().subTriggers()) { - subTrigger.invokeOnMerge(c); - } - updateFinishedStatus(c); - } - - @Override public Instant getWatermarkThatGuaranteesFiring(BoundedWindow window) { // This trigger will fire after the earliest of its sub-triggers. Instant deadline = BoundedWindow.TIMESTAMP_MAX_VALUE; @@ -80,32 +64,6 @@ public class AfterFirst extends OnceTrigger { } @Override - public boolean shouldFire(Trigger.TriggerContext context) throws Exception { - for (ExecutableTrigger subtrigger : context.trigger().subTriggers()) { - if (context.forTrigger(subtrigger).trigger().isFinished() - || subtrigger.invokeShouldFire(context)) { - return true; - } - } - return false; - } - - @Override - protected void onOnlyFiring(TriggerContext context) throws Exception { - for (ExecutableTrigger subtrigger : context.trigger().subTriggers()) { - TriggerContext subContext = context.forTrigger(subtrigger); - if (subtrigger.invokeShouldFire(subContext)) { - // If the trigger is ready to fire, then do whatever it needs to do. - subtrigger.invokeOnFire(subContext); - } else { - // If the trigger is not ready to fire, it is nonetheless true that whatever - // pending pane it was tracking is now gone. - subtrigger.invokeClear(subContext); - } - } - } - - @Override public String toString() { StringBuilder builder = new StringBuilder("AfterFirst.of("); Joiner.on(", ").appendTo(builder, subTriggers); @@ -113,12 +71,4 @@ public class AfterFirst extends OnceTrigger { return builder.toString(); } - - private void updateFinishedStatus(TriggerContext c) { - boolean anyFinished = false; - for (ExecutableTrigger subTrigger : c.trigger().subTriggers()) { - anyFinished |= c.forTrigger(subTrigger).trigger().isFinished(); - } - c.trigger().setFinished(anyFinished); - } } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/1eff320d/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterPane.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterPane.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterPane.java index 4d59d58..4a706e6 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterPane.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterPane.java @@ -17,7 +17,6 @@ */ package org.apache.beam.sdk.transforms.windowing; -import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; import java.util.List; import java.util.Objects; import org.apache.beam.sdk.annotations.Experimental; @@ -25,9 +24,6 @@ import org.apache.beam.sdk.coders.VarLongCoder; import org.apache.beam.sdk.transforms.Sum; import org.apache.beam.sdk.transforms.windowing.Trigger.OnceTrigger; import org.apache.beam.sdk.util.state.AccumulatorCombiningState; -import org.apache.beam.sdk.util.state.MergingStateAccessor; -import org.apache.beam.sdk.util.state.StateAccessor; -import org.apache.beam.sdk.util.state.StateMerging; import org.apache.beam.sdk.util.state.StateTag; import org.apache.beam.sdk.util.state.StateTags; import org.joda.time.Instant; @@ -65,49 +61,6 @@ private static final StateTag state) { - super.prefetchOnMerge(state); - StateMerging.prefetchCombiningValues(state, ELEMENTS_IN_PANE_TAG); - } - - @Override - public void onMerge(OnMergeContext context) throws Exception { - // If we've already received enough elements and finished in some window, - // then this trigger is just finished. - if (context.trigger().finishedInAnyMergingWindow()) { - context.trigger().setFinished(true); - StateMerging.clear(context.state(), ELEMENTS_IN_PANE_TAG); - return; - } - - // Otherwise, compute the sum of elements in all the active panes. - StateMerging.mergeCombiningValues(context.state(), ELEMENTS_IN_PANE_TAG); - } - - @Override - @SuppressFBWarnings(value = "RV_RETURN_VALUE_IGNORED_NO_SIDE_EFFECT", justification = - "prefetch side effect") - public void prefetchShouldFire(StateAccessor state) { - state.access(ELEMENTS_IN_PANE_TAG).readLater(); - } - - @Override - public boolean shouldFire(Trigger.TriggerContext context) throws Exception { - long count = context.state().access(ELEMENTS_IN_PANE_TAG).read(); - return count >= countElems; - } - - @Override - public void clear(TriggerContext c) throws Exception { - c.state().access(ELEMENTS_IN_PANE_TAG).clear(); - } - - @Override public boolean isCompatible(Trigger other) { return this.equals(other); } @@ -143,9 +96,4 @@ private static final StateTag> transforms) { super(TimeDomain.PROCESSING_TIME, transforms); } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/1eff320d/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterSynchronizedProcessingTime.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterSynchronizedProcessingTime.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterSynchronizedProcessingTime.java index b96b293..b6258f8 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterSynchronizedProcessingTime.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterSynchronizedProcessingTime.java @@ -20,7 +20,6 @@ package org.apache.beam.sdk.transforms.windowing; import com.google.common.base.Objects; import java.util.Collections; import java.util.List; -import javax.annotation.Nullable; import org.apache.beam.sdk.transforms.SerializableFunction; import org.apache.beam.sdk.util.TimeDomain; import org.joda.time.Instant; @@ -31,12 +30,6 @@ import org.joda.time.Instant; */ public class AfterSynchronizedProcessingTime extends AfterDelayFromFirstElement { - @Override - @Nullable - public Instant getCurrentTime(Trigger.TriggerContext context) { - return context.currentSynchronizedProcessingTime(); - } - public AfterSynchronizedProcessingTime() { super(TimeDomain.SYNCHRONIZED_PROCESSING_TIME, Collections.>emptyList()); http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/1eff320d/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterWatermark.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterWatermark.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterWatermark.java index 89c1ba9..37b73a6 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterWatermark.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterWatermark.java @@ -25,7 +25,6 @@ import java.util.Objects; import javax.annotation.Nullable; import org.apache.beam.sdk.annotations.Experimental; import org.apache.beam.sdk.transforms.windowing.Trigger.OnceTrigger; -import org.apache.beam.sdk.util.ExecutableTrigger; import org.apache.beam.sdk.util.TimeDomain; import org.joda.time.Instant; @@ -111,50 +110,6 @@ public class AfterWatermark { } @Override - public void onElement(OnElementContext c) throws Exception { - if (!c.trigger().isMerging()) { - // If merges can never happen, we just run the unfinished subtrigger - c.trigger().firstUnfinishedSubTrigger().invokeOnElement(c); - } else { - // If merges can happen, we run for all subtriggers because they might be - // de-activated or re-activated - for (ExecutableTrigger subTrigger : c.trigger().subTriggers()) { - subTrigger.invokeOnElement(c); - } - } - } - - @Override - public void onMerge(OnMergeContext c) throws Exception { - // NOTE that the ReduceFnRunner will delete all end-of-window timers for the - // merged-away windows. - - ExecutableTrigger earlySubtrigger = c.trigger().subTrigger(EARLY_INDEX); - // We check the early trigger to determine if we are still processing it or - // if the end of window has transitioned us to the late trigger - OnMergeContext earlyContext = c.forTrigger(earlySubtrigger); - - // If the early trigger is still active in any merging window then it is still active in - // the new merged window, because even if the merged window is "done" some pending elements - // haven't had a chance to fire. - if (!earlyContext.trigger().finishedInAllMergingWindows() || !endOfWindowReached(c)) { - earlyContext.trigger().setFinished(false); - if (lateTrigger != null) { - ExecutableTrigger lateSubtrigger = c.trigger().subTrigger(LATE_INDEX); - OnMergeContext lateContext = c.forTrigger(lateSubtrigger); - lateContext.trigger().setFinished(false); - lateSubtrigger.invokeClear(lateContext); - } - } else { - // Otherwise the early trigger and end-of-window bit is done for good. - earlyContext.trigger().setFinished(true); - if (lateTrigger != null) { - c.trigger().subTrigger(LATE_INDEX).invokeOnMerge(c); - } - } - } - - @Override public Trigger getContinuationTrigger() { return new AfterWatermarkEarlyAndLate( earlyTrigger.getContinuationTrigger(), @@ -173,38 +128,6 @@ public class AfterWatermark { return window.maxTimestamp(); } - private boolean endOfWindowReached(Trigger.TriggerContext context) { - return context.currentEventTime() != null - && context.currentEventTime().isAfter(context.window().maxTimestamp()); - } - - @Override - public boolean shouldFire(Trigger.TriggerContext context) throws Exception { - if (!context.trigger().isFinished(EARLY_INDEX)) { - // We have not yet transitioned to late firings. - // We should fire if either the trigger is ready or we reach the end of the window. - return context.trigger().subTrigger(EARLY_INDEX).invokeShouldFire(context) - || endOfWindowReached(context); - } else if (lateTrigger == null) { - return false; - } else { - // We are running the late trigger - return context.trigger().subTrigger(LATE_INDEX).invokeShouldFire(context); - } - } - - @Override - public void onFire(Trigger.TriggerContext context) throws Exception { - if (!context.forTrigger(context.trigger().subTrigger(EARLY_INDEX)).trigger().isFinished()) { - onNonLateFiring(context); - } else if (lateTrigger != null) { - onLateFiring(context); - } else { - // all done - context.trigger().setFinished(true); - } - } - @Override public String toString() { StringBuilder builder = new StringBuilder(TO_STRING); @@ -225,47 +148,6 @@ public class AfterWatermark { return builder.toString(); } - - private void onNonLateFiring(Trigger.TriggerContext context) throws Exception { - // We have not yet transitioned to late firings. - ExecutableTrigger earlySubtrigger = context.trigger().subTrigger(EARLY_INDEX); - Trigger.TriggerContext earlyContext = context.forTrigger(earlySubtrigger); - - if (!endOfWindowReached(context)) { - // This is an early firing, since we have not arrived at the end of the window - // Implicitly repeats - earlySubtrigger.invokeOnFire(context); - earlySubtrigger.invokeClear(context); - earlyContext.trigger().setFinished(false); - } else { - // We have arrived at the end of the window; terminate the early trigger - // and clear out the late trigger's state - if (earlySubtrigger.invokeShouldFire(context)) { - earlySubtrigger.invokeOnFire(context); - } - earlyContext.trigger().setFinished(true); - earlySubtrigger.invokeClear(context); - - if (lateTrigger == null) { - // Done if there is no late trigger. - context.trigger().setFinished(true); - } else { - // If there is a late trigger, we transition to it, and need to clear its state - // because it was run in parallel. - context.trigger().subTrigger(LATE_INDEX).invokeClear(context); - } - } - - } - - private void onLateFiring(Trigger.TriggerContext context) throws Exception { - // We are firing the late trigger, with implicit repeat - ExecutableTrigger lateSubtrigger = context.trigger().subTrigger(LATE_INDEX); - lateSubtrigger.invokeOnFire(context); - // It is a OnceTrigger, so it must have finished; unfinished it and clear it - lateSubtrigger.invokeClear(context); - context.forTrigger(lateSubtrigger).trigger().setFinished(false); - } } /** @@ -296,33 +178,6 @@ public class AfterWatermark { } @Override - public void onElement(OnElementContext c) throws Exception { - // We're interested in knowing when the input watermark passes the end of the window. - // (It is possible this has already happened, in which case the timer will be fired - // almost immediately). - c.setTimer(c.window().maxTimestamp(), TimeDomain.EVENT_TIME); - } - - @Override - public void onMerge(OnMergeContext c) throws Exception { - // NOTE that the ReduceFnRunner will delete all end-of-window timers for the - // merged-away windows. - - if (!c.trigger().finishedInAllMergingWindows()) { - // If the trigger is still active in any merging window then it is still active in the new - // merged window, because even if the merged window is "done" some pending elements haven't - // had a chance to fire - c.trigger().setFinished(false); - } else if (!endOfWindowReached(c)) { - // If the end of the new window has not been reached, then the trigger is active again. - c.trigger().setFinished(false); - } else { - // Otherwise it is done for good - c.trigger().setFinished(true); - } - } - - @Override public Instant getWatermarkThatGuaranteesFiring(BoundedWindow window) { return window.maxTimestamp(); } @@ -346,18 +201,5 @@ public class AfterWatermark { public int hashCode() { return Objects.hash(getClass()); } - - @Override - public boolean shouldFire(Trigger.TriggerContext context) throws Exception { - return endOfWindowReached(context); - } - - private boolean endOfWindowReached(Trigger.TriggerContext context) { - return context.currentEventTime() != null - && context.currentEventTime().isAfter(context.window().maxTimestamp()); - } - - @Override - protected void onOnlyFiring(Trigger.TriggerContext context) throws Exception { } } } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/1eff320d/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/DefaultTrigger.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/DefaultTrigger.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/DefaultTrigger.java index fee7cdf..a649b4f 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/DefaultTrigger.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/DefaultTrigger.java @@ -19,7 +19,6 @@ package org.apache.beam.sdk.transforms.windowing; import java.util.List; import org.apache.beam.sdk.annotations.Experimental; -import org.apache.beam.sdk.util.TimeDomain; import org.joda.time.Instant; /** @@ -41,27 +40,6 @@ public class DefaultTrigger extends Trigger{ } @Override - public void onElement(OnElementContext c) throws Exception { - // If the end of the window has already been reached, then we are already ready to fire - // and do not need to set a wake-up timer. - if (!endOfWindowReached(c)) { - c.setTimer(c.window().maxTimestamp(), TimeDomain.EVENT_TIME); - } - } - - @Override - public void onMerge(OnMergeContext c) throws Exception { - // If the end of the window has already been reached, then we are already ready to fire - // and do not need to set a wake-up timer. - if (!endOfWindowReached(c)) { - c.setTimer(c.window().maxTimestamp(), TimeDomain.EVENT_TIME); - } - } - - @Override - public void clear(TriggerContext c) throws Exception { } - - @Override public Instant getWatermarkThatGuaranteesFiring(BoundedWindow window) { return window.maxTimestamp(); } @@ -76,17 +54,4 @@ public class DefaultTrigger extends Trigger{ public Trigger getContinuationTrigger(List continuationTriggers) { return this; } - - @Override - public boolean shouldFire(Trigger.TriggerContext context) throws Exception { - return endOfWindowReached(context); - } - - private boolean endOfWindowReached(Trigger.TriggerContext context) { - return context.currentEventTime() != null - && context.currentEventTime().isAfter(context.window().maxTimestamp()); - } - - @Override - public void onFire(Trigger.TriggerContext context) throws Exception { } } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/1eff320d/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Never.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Never.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Never.java index 07b70f4..664ae83 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Never.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Never.java @@ -48,12 +48,6 @@ public final class Never { } @Override - public void onElement(OnElementContext c) {} - - @Override - public void onMerge(OnMergeContext c) {} - - @Override protected Trigger getContinuationTrigger(List continuationTriggers) { return this; } @@ -62,16 +56,5 @@ public final class Never { public Instant getWatermarkThatGuaranteesFiring(BoundedWindow window) { return BoundedWindow.TIMESTAMP_MAX_VALUE; } - - @Override - public boolean shouldFire(Trigger.TriggerContext context) { - return false; - } - - @Override - protected void onOnlyFiring(Trigger.TriggerContext context) { - throw new UnsupportedOperationException( - String.format("%s should never fire", getClass().getSimpleName())); - } } } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/1eff320d/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/OrFinallyTrigger.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/OrFinallyTrigger.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/OrFinallyTrigger.java index 9bef45a..1ed9b55 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/OrFinallyTrigger.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/OrFinallyTrigger.java @@ -20,7 +20,6 @@ package org.apache.beam.sdk.transforms.windowing; import com.google.common.annotations.VisibleForTesting; import java.util.Arrays; import java.util.List; -import org.apache.beam.sdk.util.ExecutableTrigger; import org.joda.time.Instant; /** @@ -51,20 +50,6 @@ public class OrFinallyTrigger extends Trigger { } @Override - public void onElement(OnElementContext c) throws Exception { - c.trigger().subTrigger(ACTUAL).invokeOnElement(c); - c.trigger().subTrigger(UNTIL).invokeOnElement(c); - } - - @Override - public void onMerge(OnMergeContext c) throws Exception { - for (ExecutableTrigger subTrigger : c.trigger().subTriggers()) { - subTrigger.invokeOnMerge(c); - } - updateFinishedState(c); - } - - @Override public Instant getWatermarkThatGuaranteesFiring(BoundedWindow window) { // This trigger fires once either the trigger or the until trigger fires. Instant actualDeadline = subTriggers.get(ACTUAL).getWatermarkThatGuaranteesFiring(window); @@ -83,38 +68,7 @@ public class OrFinallyTrigger extends Trigger { } @Override - public boolean shouldFire(Trigger.TriggerContext context) throws Exception { - return context.trigger().subTrigger(ACTUAL).invokeShouldFire(context) - || context.trigger().subTrigger(UNTIL).invokeShouldFire(context); - } - - @Override - public void onFire(Trigger.TriggerContext context) throws Exception { - ExecutableTrigger actualSubtrigger = context.trigger().subTrigger(ACTUAL); - ExecutableTrigger untilSubtrigger = context.trigger().subTrigger(UNTIL); - - if (untilSubtrigger.invokeShouldFire(context)) { - untilSubtrigger.invokeOnFire(context); - actualSubtrigger.invokeClear(context); - } else { - // If until didn't fire, then the actual must have (or it is forbidden to call - // onFire) so we are done only if actual is done. - actualSubtrigger.invokeOnFire(context); - // Do not clear the until trigger, because it tracks data cross firings. - } - updateFinishedState(context); - } - - @Override public String toString() { return String.format("%s.orFinally(%s)", subTriggers.get(ACTUAL), subTriggers.get(UNTIL)); } - - private void updateFinishedState(TriggerContext c) throws Exception { - boolean anyStillFinished = false; - for (ExecutableTrigger subTrigger : c.trigger().subTriggers()) { - anyStillFinished |= c.forTrigger(subTrigger).trigger().isFinished(); - } - c.trigger().setFinished(anyStillFinished); - } } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/1eff320d/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Repeatedly.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Repeatedly.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Repeatedly.java index 45bc6c1..4d79a2c 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Repeatedly.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Repeatedly.java @@ -19,7 +19,6 @@ package org.apache.beam.sdk.transforms.windowing; import java.util.Arrays; import java.util.List; -import org.apache.beam.sdk.util.ExecutableTrigger; import org.joda.time.Instant; /** @@ -61,16 +60,6 @@ public class Repeatedly extends Trigger { } @Override - public void onElement(OnElementContext c) throws Exception { - getRepeated(c).invokeOnElement(c); - } - - @Override - public void onMerge(OnMergeContext c) throws Exception { - getRepeated(c).invokeOnMerge(c); - } - - @Override public Instant getWatermarkThatGuaranteesFiring(BoundedWindow window) { // This trigger fires once the repeated trigger fires. return subTriggers.get(REPEATED).getWatermarkThatGuaranteesFiring(window); @@ -82,26 +71,7 @@ public class Repeatedly extends Trigger { } @Override - public boolean shouldFire(Trigger.TriggerContext context) throws Exception { - return getRepeated(context).invokeShouldFire(context); - } - - @Override - public void onFire(TriggerContext context) throws Exception { - getRepeated(context).invokeOnFire(context); - - if (context.trigger().isFinished(REPEATED)) { - // Reset tree will recursively clear the finished bits, and invoke clear. - context.forTrigger(getRepeated(context)).trigger().resetTree(); - } - } - - @Override public String toString() { return String.format("Repeatedly.forever(%s)", subTriggers.get(REPEATED)); } - - private ExecutableTrigger getRepeated(TriggerContext context) { - return context.trigger().subTrigger(REPEATED); - } } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/1eff320d/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Trigger.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Trigger.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Trigger.java index 18b7a62..1cc807e 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Trigger.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Trigger.java @@ -23,22 +23,18 @@ import java.util.ArrayList; import java.util.Collections; import java.util.List; import java.util.Objects; -import javax.annotation.Nullable; import org.apache.beam.sdk.annotations.Experimental; -import org.apache.beam.sdk.util.ExecutableTrigger; -import org.apache.beam.sdk.util.TimeDomain; -import org.apache.beam.sdk.util.state.MergingStateAccessor; -import org.apache.beam.sdk.util.state.StateAccessor; +import org.apache.beam.sdk.transforms.GroupByKey; import org.joda.time.Instant; /** - * {@code Trigger}s control when the elements for a specific key and window are output. As elements - * arrive, they are put into one or more windows by a {@link Window} transform and its associated - * {@link WindowFn}, and then passed to the associated {@code Trigger} to determine if the - * {@code Window}s contents should be output. + * {@link Trigger Triggers} control when the elements for a specific key and window are output. As + * elements arrive, they are put into one or more windows by a {@link Window} transform and its + * associated {@link WindowFn}, and then passed to the associated {@link Trigger} to determine if + * the {@link BoundedWindow Window's} contents should be output. * - *

See {@link org.apache.beam.sdk.transforms.GroupByKey} and {@link Window} - * for more information about how grouping with windows works. + *

See {@link GroupByKey} and {@link Window} for more information about how grouping with windows + * works. * *

The elements that are assigned to a window since the last time it was fired (or since the * window was created) are placed into the current window pane. Triggers are evaluated against the @@ -46,224 +42,34 @@ import org.joda.time.Instant; * output. When the root trigger finishes (indicating it will never fire again), the window is * closed and any new elements assigned to that window are discarded. * - *

Several predefined {@code Trigger}s are provided: + *

Several predefined {@link Trigger Triggers} are provided: + * *

    - *
  • {@link AfterWatermark} for firing when the watermark passes a timestamp determined from - * either the end of the window or the arrival of the first element in a pane. - *
  • {@link AfterProcessingTime} for firing after some amount of processing time has elapsed - * (typically since the first element in a pane). - *
  • {@link AfterPane} for firing off a property of the elements in the current pane, such as - * the number of elements that have been assigned to the current pane. + *
  • {@link AfterWatermark} for firing when the watermark passes a timestamp determined from + * either the end of the window or the arrival of the first element in a pane. + *
  • {@link AfterProcessingTime} for firing after some amount of processing time has elapsed + * (typically since the first element in a pane). + *
  • {@link AfterPane} for firing off a property of the elements in the current pane, such as the + * number of elements that have been assigned to the current pane. *
* *

In addition, {@code Trigger}s can be combined in a variety of ways: - *

    - *
  • {@link Repeatedly#forever} to create a trigger that executes forever. Any time its - * argument finishes it gets reset and starts over. Can be combined with - * {@link Trigger#orFinally} to specify a condition that causes the repetition to stop. - *
  • {@link AfterEach#inOrder} to execute each trigger in sequence, firing each (and every) - * time that a trigger fires, and advancing to the next trigger in the sequence when it finishes. - *
  • {@link AfterFirst#of} to create a trigger that fires after at least one of its arguments - * fires. An {@link AfterFirst} trigger finishes after it fires once. - *
  • {@link AfterAll#of} to create a trigger that fires after all least one of its arguments - * have fired at least once. An {@link AfterAll} trigger finishes after it fires once. - *
* - *

Each trigger tree is instantiated per-key and per-window. Every trigger in the tree is in one - * of the following states: *

    - *
  • Never Existed - before the trigger has started executing, there is no state associated - * with it anywhere in the system. A trigger moves to the executing state as soon as it - * processes in the current pane. - *
  • Executing - while the trigger is receiving items and may fire. While it is in this state, - * it may persist book-keeping information to persisted state, set timers, etc. - *
  • Finished - after a trigger finishes, all of its book-keeping data is cleaned up, and the - * system remembers only that it is finished. Entering this state causes us to discard any - * elements in the buffer for that window, as well. + *
  • {@link Repeatedly#forever} to create a trigger that executes forever. Any time its argument + * finishes it gets reset and starts over. Can be combined with {@link Trigger#orFinally} to + * specify a condition that causes the repetition to stop. + *
  • {@link AfterEach#inOrder} to execute each trigger in sequence, firing each (and every) time + * that a trigger fires, and advancing to the next trigger in the sequence when it finishes. + *
  • {@link AfterFirst#of} to create a trigger that fires after at least one of its arguments + * fires. An {@link AfterFirst} trigger finishes after it fires once. + *
  • {@link AfterAll#of} to create a trigger that fires after all least one of its arguments have + * fired at least once. An {@link AfterAll} trigger finishes after it fires once. *
- * - *

Once finished, a trigger cannot return itself back to an earlier state, however a composite - * trigger could reset its sub-triggers. - * - *

Triggers should not build up any state internally since they may be recreated - * between invocations of the callbacks. All important values should be persisted using - * state before the callback returns. */ @Experimental(Experimental.Kind.TRIGGER) public abstract class Trigger implements Serializable { - /** - * Interface for accessing information about the trigger being executed and other triggers in the - * same tree. - */ - public interface TriggerInfo { - - /** - * Returns true if the windowing strategy of the current {@code PCollection} is a merging - * WindowFn. If true, the trigger execution needs to keep enough information to support the - * possibility of {@link Trigger#onMerge} being called. If false, {@link Trigger#onMerge} will - * never be called. - */ - boolean isMerging(); - - /** - * Access the executable versions of the sub-triggers of the current trigger. - */ - Iterable subTriggers(); - - /** - * Access the executable version of the specified sub-trigger. - */ - ExecutableTrigger subTrigger(int subtriggerIndex); - - /** - * Returns true if the current trigger is marked finished. - */ - boolean isFinished(); - - /** - * Return true if the given subtrigger is marked finished. - */ - boolean isFinished(int subtriggerIndex); - - /** - * Returns true if all the sub-triggers of the current trigger are marked finished. - */ - boolean areAllSubtriggersFinished(); - - /** - * Returns an iterable over the unfinished sub-triggers of the current trigger. - */ - Iterable unfinishedSubTriggers(); - - /** - * Returns the first unfinished sub-trigger. - */ - ExecutableTrigger firstUnfinishedSubTrigger(); - - /** - * Clears all keyed state for triggers in the current sub-tree and unsets all the associated - * finished bits. - */ - void resetTree() throws Exception; - - /** - * Sets the finished bit for the current trigger. - */ - void setFinished(boolean finished); - - /** - * Sets the finished bit for the given sub-trigger. - */ - void setFinished(boolean finished, int subTriggerIndex); - } - - /** - * Interact with properties of the trigger being executed, with extensions to deal with the - * merging windows. - */ - public interface MergingTriggerInfo extends TriggerInfo { - - /** Return true if the trigger is finished in any window being merged. */ - boolean finishedInAnyMergingWindow(); - - /** Return true if the trigger is finished in all windows being merged. */ - boolean finishedInAllMergingWindows(); - } - - /** - * Information accessible to all operational hooks in this {@code Trigger}. - * - *

Used directly in {@link Trigger#shouldFire} and {@link Trigger#clear}, and - * extended with additional information in other methods. - */ - public abstract class TriggerContext { - - /** Returns the interface for accessing trigger info. */ - public abstract TriggerInfo trigger(); - - /** Returns the interface for accessing persistent state. */ - public abstract StateAccessor state(); - - /** The window that the current context is executing in. */ - public abstract BoundedWindow window(); - - /** Create a sub-context for the given sub-trigger. */ - public abstract TriggerContext forTrigger(ExecutableTrigger trigger); - - /** - * Removes the timer set in this trigger context for the given {@link Instant} - * and {@link TimeDomain}. - */ - public abstract void deleteTimer(Instant timestamp, TimeDomain domain); - - /** The current processing time. */ - public abstract Instant currentProcessingTime(); - - /** The current synchronized upstream processing time or {@code null} if unknown. */ - @Nullable - public abstract Instant currentSynchronizedProcessingTime(); - - /** The current event time for the input or {@code null} if unknown. */ - @Nullable - public abstract Instant currentEventTime(); - } - - /** - * Extended {@link TriggerContext} containing information accessible to the {@link #onElement} - * operational hook. - */ - public abstract class OnElementContext extends TriggerContext { - /** The event timestamp of the element currently being processed. */ - public abstract Instant eventTimestamp(); - - /** - * Sets a timer to fire when the watermark or processing time is beyond the given timestamp. - * Timers are not guaranteed to fire immediately, but will be delivered at some time afterwards. - * - *

As with {@link #state}, timers are implicitly scoped to the current window. All - * timer firings for a window will be received, but the implementation should choose to ignore - * those that are not applicable. - * - * @param timestamp the time at which the trigger should be re-evaluated - * @param domain the domain that the {@code timestamp} applies to - */ - public abstract void setTimer(Instant timestamp, TimeDomain domain); - - /** Create an {@code OnElementContext} for executing the given trigger. */ - @Override - public abstract OnElementContext forTrigger(ExecutableTrigger trigger); - } - - /** - * Extended {@link TriggerContext} containing information accessible to the {@link #onMerge} - * operational hook. - */ - public abstract class OnMergeContext extends TriggerContext { - /** - * Sets a timer to fire when the watermark or processing time is beyond the given timestamp. - * Timers are not guaranteed to fire immediately, but will be delivered at some time afterwards. - * - *

As with {@link #state}, timers are implicitly scoped to the current window. All - * timer firings for a window will be received, but the implementation should choose to ignore - * those that are not applicable. - * - * @param timestamp the time at which the trigger should be re-evaluated - * @param domain the domain that the {@code timestamp} applies to - */ - public abstract void setTimer(Instant timestamp, TimeDomain domain); - - /** Create an {@code OnMergeContext} for executing the given trigger. */ - @Override - public abstract OnMergeContext forTrigger(ExecutableTrigger trigger); - - @Override - public abstract MergingStateAccessor state(); - - @Override - public abstract MergingTriggerInfo trigger(); - } - protected final List subTriggers; protected Trigger(List subTriggers) { @@ -274,114 +80,14 @@ public abstract class Trigger implements Serializable { this(Collections.EMPTY_LIST); } - /** - * Called every time an element is incorporated into a window. - */ - public abstract void onElement(OnElementContext c) throws Exception; - - /** - * Called immediately after windows have been merged. - * - *

Leaf triggers should update their state by inspecting their status and any state - * in the merging windows. Composite triggers should update their state by calling - * {@link ExecutableTrigger#invokeOnMerge} on their sub-triggers, and applying appropriate logic. - * - *

A trigger such as {@link AfterWatermark#pastEndOfWindow} may no longer be finished; - * it is the responsibility of the trigger itself to record this fact. It is forbidden for - * a trigger to become finished due to {@link #onMerge}, as it has not yet fired the pending - * elements that led to it being ready to fire. - * - *

The implementation does not need to clear out any state associated with the old windows. - */ - public abstract void onMerge(OnMergeContext c) throws Exception; - - /** - * Returns {@code true} if the current state of the trigger indicates that its condition - * is satisfied and it is ready to fire. - */ - public abstract boolean shouldFire(TriggerContext context) throws Exception; - - /** - * Adjusts the state of the trigger to be ready for the next pane. For example, a - * {@link Repeatedly} trigger will reset its inner trigger, since it has fired. - * - *

If the trigger is finished, it is the responsibility of the trigger itself to - * record that fact via the {@code context}. - */ - public abstract void onFire(TriggerContext context) throws Exception; - - /** - * Called to allow the trigger to prefetch any state it will likely need to read from during - * an {@link #onElement} call. - */ - public void prefetchOnElement(StateAccessor state) { - if (subTriggers != null) { - for (Trigger trigger : subTriggers) { - trigger.prefetchOnElement(state); - } - } - } - - /** - * Called to allow the trigger to prefetch any state it will likely need to read from during - * an {@link #onMerge} call. - */ - public void prefetchOnMerge(MergingStateAccessor state) { - if (subTriggers != null) { - for (Trigger trigger : subTriggers) { - trigger.prefetchOnMerge(state); - } - } - } - - /** - * Called to allow the trigger to prefetch any state it will likely need to read from during - * an {@link #shouldFire} call. - */ - public void prefetchShouldFire(StateAccessor state) { - if (subTriggers != null) { - for (Trigger trigger : subTriggers) { - trigger.prefetchShouldFire(state); - } - } - } - - /** - * Called to allow the trigger to prefetch any state it will likely need to read from during - * an {@link #onFire} call. - */ - public void prefetchOnFire(StateAccessor state) { - if (subTriggers != null) { - for (Trigger trigger : subTriggers) { - trigger.prefetchOnFire(state); - } - } - } - - /** - * Clear any state associated with this trigger in the given window. - * - *

This is called after a trigger has indicated it will never fire again. The trigger system - * keeps enough information to know that the trigger is finished, so this trigger should clear all - * of its state. - */ - public void clear(TriggerContext c) throws Exception { - if (subTriggers != null) { - for (ExecutableTrigger trigger : c.trigger().subTriggers()) { - trigger.invokeClear(c); - } - } - } - public List subTriggers() { return subTriggers; } /** - * Return a trigger to use after a {@code GroupByKey} to preserve the - * intention of this trigger. Specifically, triggers that are time based - * and intended to provide speculative results should continue providing - * speculative results. Triggers that fire once (or multiple times) should + * Return a trigger to use after a {@link GroupByKey} to preserve the intention of this trigger. + * Specifically, triggers that are time based and intended to provide speculative results should + * continue providing speculative results. Triggers that fire once (or multiple times) should * continue firing once (or multiple times). */ public Trigger getContinuationTrigger() { @@ -397,27 +103,24 @@ public abstract class Trigger implements Serializable { } /** - * Return the {@link #getContinuationTrigger} of this {@code Trigger}. For convenience, this - * is provided the continuation trigger of each of the sub-triggers. + * Return the {@link #getContinuationTrigger} of this {@link Trigger}. For convenience, this is + * provided the continuation trigger of each of the sub-triggers. */ protected abstract Trigger getContinuationTrigger(List continuationTriggers); /** - * Returns a bound in watermark time by which this trigger would have fired at least once - * for a given window had there been input data. This is a static property of a trigger - * that does not depend on its state. + * Returns a bound in event time by which this trigger would have fired at least once for a given + * window had there been input data. * - *

For triggers that do not fire based on the watermark advancing, returns - * {@link BoundedWindow#TIMESTAMP_MAX_VALUE}. + *

For triggers that do not fire based on the watermark advancing, returns {@link + * BoundedWindow#TIMESTAMP_MAX_VALUE}. * - *

This estimate is used to determine that there are no elements in a side-input window, which - * causes the default value to be used instead. + *

This estimate may be used, for example, to determine that there are no elements in a + * side-input window, which causes the default value to be used instead. */ public abstract Instant getWatermarkThatGuaranteesFiring(BoundedWindow window); - /** - * Returns whether this performs the same triggering as the given {@code Trigger}. - */ + /** Returns whether this performs the same triggering as the given {@link Trigger}. */ public boolean isCompatible(Trigger other) { if (!getClass().equals(other.getClass())) { return false; @@ -472,31 +175,33 @@ public abstract class Trigger implements Serializable { } /** - * Specify an ending condition for this trigger. If the {@code until} fires then the combination - * fires. + * Specify an ending condition for this trigger. If the {@code until} {@link Trigger} fires then + * the combination fires. * - *

The expression {@code t1.orFinally(t2)} fires every time {@code t1} fires, and finishes - * as soon as either {@code t1} finishes or {@code t2} fires, in which case it fires one last time - * for {@code t2}. Both {@code t1} and {@code t2} are executed in parallel. This means that - * {@code t1} may have fired since {@code t2} started, so not all of the elements that {@code t2} - * has seen are necessarily in the current pane. + *

The expression {@code t1.orFinally(t2)} fires every time {@code t1} fires, and finishes as + * soon as either {@code t1} finishes or {@code t2} fires, in which case it fires one last time + * for {@code t2}. Both {@code t1} and {@code t2} are executed in parallel. This means that {@code + * t1} may have fired since {@code t2} started, so not all of the elements that {@code t2} has + * seen are necessarily in the current pane. * *

For example the final firing of the following trigger may only have 1 element: - *

 {@code
+   *
+   * 
{@code
    * Repeatedly.forever(AfterPane.elementCountAtLeast(2))
    *     .orFinally(AfterPane.elementCountAtLeast(5))
-   * } 
+ * } + *
* - *

Note that if {@code t1} is {@link OnceTrigger}, then {@code t1.orFinally(t2)} is the same - * as {@code AfterFirst.of(t1, t2)}. + *

Note that if {@code t1} is {@link OnceTrigger}, then {@code t1.orFinally(t2)} is the same as + * {@code AfterFirst.of(t1, t2)}. */ public OrFinallyTrigger orFinally(OnceTrigger until) { return new OrFinallyTrigger(this, until); } /** - * {@link Trigger}s that are guaranteed to fire at most once should extend from this, rather - * than the general {@link Trigger} class to indicate that behavior. + * {@link Trigger Triggers} that are guaranteed to fire at most once should extend {@link + * OnceTrigger} rather than the general {@link Trigger} class to indicate that behavior. */ public abstract static class OnceTrigger extends Trigger { protected OnceTrigger(List subTriggers) { @@ -511,20 +216,5 @@ public abstract class Trigger implements Serializable { } return (OnceTrigger) continuation; } - - /** - * {@inheritDoc} - */ - @Override - public final void onFire(TriggerContext context) throws Exception { - onOnlyFiring(context); - context.trigger().setFinished(true); - } - - /** - * Called exactly once by {@link #onFire} when the trigger is fired. By default, - * invokes {@link #onFire} on all subtriggers for which {@link #shouldFire} is {@code true}. - */ - protected abstract void onOnlyFiring(TriggerContext context) throws Exception; } } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/1eff320d/sdks/java/core/src/main/java/org/apache/beam/sdk/util/ExecutableTrigger.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/ExecutableTrigger.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/ExecutableTrigger.java index 088c499..48a49aa 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/ExecutableTrigger.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/ExecutableTrigger.java @@ -29,9 +29,13 @@ import org.apache.beam.sdk.transforms.windowing.Trigger.OnceTrigger; /** * A wrapper around a trigger used during execution. While an actual trigger may appear multiple - * times (both in the same trigger expression and in other trigger expressions), the - * {@code ExecutableTrigger} wrapped around them forms a tree (only one occurrence). + * times (both in the same trigger expression and in other trigger expressions), the {@code + * ExecutableTrigger} wrapped around them forms a tree (only one occurrence). + * + * @deprecated uses of {@link ExecutableTrigger} should be ported to + * org.apache.beam.runners.core.triggers.ExecutableTriggerStateMachine. */ +@Deprecated public class ExecutableTrigger implements Serializable { /** Store the index assigned to this trigger. */ @@ -115,38 +119,6 @@ public class ExecutableTrigger implements Serializable { } /** - * Invoke the {@link Trigger#onElement} method for this trigger, ensuring that the bits are - * properly updated if the trigger finishes. - */ - public void invokeOnElement(Trigger.OnElementContext c) throws Exception { - trigger.onElement(c.forTrigger(this)); - } - - /** - * Invoke the {@link Trigger#onMerge} method for this trigger, ensuring that the bits are properly - * updated. - */ - public void invokeOnMerge(Trigger.OnMergeContext c) throws Exception { - Trigger.OnMergeContext subContext = c.forTrigger(this); - trigger.onMerge(subContext); - } - - public boolean invokeShouldFire(Trigger.TriggerContext c) throws Exception { - return trigger.shouldFire(c.forTrigger(this)); - } - - public void invokeOnFire(Trigger.TriggerContext c) throws Exception { - trigger.onFire(c.forTrigger(this)); - } - - /** - * Invoke clear for the current this trigger. - */ - public void invokeClear(Trigger.TriggerContext c) throws Exception { - trigger.clear(c.forTrigger(this)); - } - - /** * {@link ExecutableTrigger} that enforces the fact that the trigger should always FIRE_AND_FINISH * and never just FIRE. */ http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/1eff320d/sdks/java/core/src/main/java/org/apache/beam/sdk/util/FinishedTriggers.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/FinishedTriggers.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/FinishedTriggers.java deleted file mode 100644 index ea14c40..0000000 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/FinishedTriggers.java +++ /dev/null @@ -1,44 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.sdk.util; - -/** - * A mutable set which tracks whether any particular {@link ExecutableTrigger} is - * finished. - */ -public interface FinishedTriggers { - /** - * Returns {@code true} if the trigger is finished. - */ - boolean isFinished(ExecutableTrigger trigger); - - /** - * Sets the fact that the trigger is finished. - */ - void setFinished(ExecutableTrigger trigger, boolean value); - - /** - * Sets the trigger and all of its subtriggers to unfinished. - */ - void clearRecursively(ExecutableTrigger trigger); - - /** - * Create an independent copy of this mutable {@link FinishedTriggers}. - */ - FinishedTriggers copy(); -} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/1eff320d/sdks/java/core/src/main/java/org/apache/beam/sdk/util/FinishedTriggersBitSet.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/FinishedTriggersBitSet.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/FinishedTriggersBitSet.java deleted file mode 100644 index 4cd617f..0000000 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/FinishedTriggersBitSet.java +++ /dev/null @@ -1,67 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.sdk.util; - -import java.util.BitSet; - -/** - * A {@link FinishedTriggers} implementation based on an underlying {@link BitSet}. - */ -public class FinishedTriggersBitSet implements FinishedTriggers { - - private final BitSet bitSet; - - private FinishedTriggersBitSet(BitSet bitSet) { - this.bitSet = bitSet; - } - - public static FinishedTriggersBitSet emptyWithCapacity(int capacity) { - return new FinishedTriggersBitSet(new BitSet(capacity)); - } - - public static FinishedTriggersBitSet fromBitSet(BitSet bitSet) { - return new FinishedTriggersBitSet(bitSet); - } - - /** - * Returns the underlying {@link BitSet} for this {@link FinishedTriggersBitSet}. - */ - public BitSet getBitSet() { - return bitSet; - } - - @Override - public boolean isFinished(ExecutableTrigger trigger) { - return bitSet.get(trigger.getTriggerIndex()); - } - - @Override - public void setFinished(ExecutableTrigger trigger, boolean value) { - bitSet.set(trigger.getTriggerIndex(), value); - } - - @Override - public void clearRecursively(ExecutableTrigger trigger) { - bitSet.clear(trigger.getTriggerIndex(), trigger.getFirstIndexAfterSubtree()); - } - - @Override - public FinishedTriggersBitSet copy() { - return new FinishedTriggersBitSet((BitSet) bitSet.clone()); - } -} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/1eff320d/sdks/java/core/src/main/java/org/apache/beam/sdk/util/FinishedTriggersSet.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/FinishedTriggersSet.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/FinishedTriggersSet.java deleted file mode 100644 index a9feb73..0000000 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/FinishedTriggersSet.java +++ /dev/null @@ -1,72 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.sdk.util; - -import com.google.common.collect.Sets; -import java.util.Set; - -/** - * An implementation of {@link FinishedTriggers} atop a user-provided mutable {@link Set}. - */ -public class FinishedTriggersSet implements FinishedTriggers { - - private final Set finishedTriggers; - - private FinishedTriggersSet(Set finishedTriggers) { - this.finishedTriggers = finishedTriggers; - } - - public static FinishedTriggersSet fromSet(Set finishedTriggers) { - return new FinishedTriggersSet(finishedTriggers); - } - - /** - * Returns a mutable {@link Set} of the underlying triggers that are finished. - */ - public Set getFinishedTriggers() { - return finishedTriggers; - } - - @Override - public boolean isFinished(ExecutableTrigger trigger) { - return finishedTriggers.contains(trigger); - } - - @Override - public void setFinished(ExecutableTrigger trigger, boolean value) { - if (value) { - finishedTriggers.add(trigger); - } else { - finishedTriggers.remove(trigger); - } - } - - @Override - public void clearRecursively(ExecutableTrigger trigger) { - finishedTriggers.remove(trigger); - for (ExecutableTrigger subTrigger : trigger.subTriggers()) { - clearRecursively(subTrigger); - } - } - - @Override - public FinishedTriggersSet copy() { - return fromSet(Sets.newHashSet(finishedTriggers)); - } - -} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/1eff320d/sdks/java/core/src/main/java/org/apache/beam/sdk/util/ReshuffleTrigger.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/ReshuffleTrigger.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/ReshuffleTrigger.java index 437f14a..8dd648a 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/ReshuffleTrigger.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/ReshuffleTrigger.java @@ -35,12 +35,6 @@ public class ReshuffleTrigger extends Trigger { } @Override - public void onElement(Trigger.OnElementContext c) { } - - @Override - public void onMerge(Trigger.OnMergeContext c) { } - - @Override protected Trigger getContinuationTrigger(List continuationTriggers) { return this; } @@ -52,14 +46,6 @@ public class ReshuffleTrigger extends Trigger { } @Override - public boolean shouldFire(Trigger.TriggerContext context) throws Exception { - return true; - } - - @Override - public void onFire(Trigger.TriggerContext context) throws Exception { } - - @Override public String toString() { return "ReshuffleTrigger()"; }