Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id 6072D200BAC for ; Wed, 26 Oct 2016 18:44:01 +0200 (CEST) Received: by cust-asf.ponee.io (Postfix) id 5EF21160AE1; Wed, 26 Oct 2016 16:44:01 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id 13908160AEE for ; Wed, 26 Oct 2016 18:43:58 +0200 (CEST) Received: (qmail 87513 invoked by uid 500); 26 Oct 2016 16:43:58 -0000 Mailing-List: contact commits-help@beam.incubator.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@beam.incubator.apache.org Delivered-To: mailing list commits@beam.incubator.apache.org Received: (qmail 87504 invoked by uid 99); 26 Oct 2016 16:43:58 -0000 Received: from pnap-us-west-generic-nat.apache.org (HELO spamd4-us-west.apache.org) (209.188.14.142) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 26 Oct 2016 16:43:58 +0000 Received: from localhost (localhost [127.0.0.1]) by spamd4-us-west.apache.org (ASF Mail Server at spamd4-us-west.apache.org) with ESMTP id 8B976C1266 for ; Wed, 26 Oct 2016 16:43:57 +0000 (UTC) X-Virus-Scanned: Debian amavisd-new at spamd4-us-west.apache.org X-Spam-Flag: NO X-Spam-Score: -6.219 X-Spam-Level: X-Spam-Status: No, score=-6.219 tagged_above=-999 required=6.31 tests=[KAM_ASCII_DIVIDERS=0.8, KAM_LAZY_DOMAIN_SECURITY=1, RCVD_IN_DNSWL_HI=-5, RCVD_IN_MSPIKE_H3=-0.01, RCVD_IN_MSPIKE_WL=-0.01, RP_MATCHES_RCVD=-2.999] autolearn=disabled Received: from mx1-lw-eu.apache.org ([10.40.0.8]) by localhost (spamd4-us-west.apache.org [10.40.0.11]) (amavisd-new, port 10024) with ESMTP id cubXUJQqEL_s for ; Wed, 26 Oct 2016 16:43:44 +0000 (UTC) Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by mx1-lw-eu.apache.org (ASF Mail Server at mx1-lw-eu.apache.org) with SMTP id 1A9D95FC85 for ; Wed, 26 Oct 2016 16:43:41 +0000 (UTC) Received: (qmail 82222 invoked by uid 99); 26 Oct 2016 16:43:40 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 26 Oct 2016 16:43:40 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 55BD7E0230; Wed, 26 Oct 2016 16:43:40 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: dhalperi@apache.org To: commits@beam.incubator.apache.org Date: Wed, 26 Oct 2016 16:44:06 -0000 Message-Id: <0ca9d0d7327445a990e7e16263a3db84@git.apache.org> In-Reply-To: <35cd756b7a90424cbecde858fc5896b0@git.apache.org> References: <35cd756b7a90424cbecde858fc5896b0@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [28/50] [abbrv] incubator-beam git commit: Remove pieces of Trigger now owned by TriggerStateMachine archived-at: Wed, 26 Oct 2016 16:44:01 -0000 http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/1eff320d/sdks/java/core/src/main/java/org/apache/beam/sdk/util/TriggerContextFactory.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/TriggerContextFactory.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/TriggerContextFactory.java deleted file mode 100644 index e09aac2..0000000 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/TriggerContextFactory.java +++ /dev/null @@ -1,507 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.sdk.util; - -import com.google.common.base.Predicate; -import com.google.common.collect.FluentIterable; -import com.google.common.collect.ImmutableMap; -import com.google.common.collect.Iterables; -import java.util.Collection; -import java.util.Map; -import javax.annotation.Nullable; -import org.apache.beam.sdk.coders.Coder; -import org.apache.beam.sdk.transforms.windowing.BoundedWindow; -import org.apache.beam.sdk.transforms.windowing.Trigger; -import org.apache.beam.sdk.transforms.windowing.Trigger.MergingTriggerInfo; -import org.apache.beam.sdk.transforms.windowing.Trigger.TriggerInfo; -import org.apache.beam.sdk.transforms.windowing.WindowFn; -import org.apache.beam.sdk.util.state.MergingStateAccessor; -import org.apache.beam.sdk.util.state.State; -import org.apache.beam.sdk.util.state.StateAccessor; -import org.apache.beam.sdk.util.state.StateInternals; -import org.apache.beam.sdk.util.state.StateNamespace; -import org.apache.beam.sdk.util.state.StateNamespaces; -import org.apache.beam.sdk.util.state.StateTag; -import org.joda.time.Instant; - -/** - * Factory for creating instances of the various {@link Trigger} contexts. - * - *

These contexts are highly interdependent and share many fields; it is inadvisable - * to create them via any means other than this factory class. - */ -public class TriggerContextFactory { - - private final WindowFn windowFn; - private StateInternals stateInternals; - private final Coder windowCoder; - - public TriggerContextFactory(WindowFn windowFn, - StateInternals stateInternals, ActiveWindowSet activeWindows) { - // Future triggers may be able to exploit the active window to state address window mapping. - this.windowFn = windowFn; - this.stateInternals = stateInternals; - this.windowCoder = windowFn.windowCoder(); - } - - public Trigger.TriggerContext base(W window, Timers timers, - ExecutableTrigger rootTrigger, FinishedTriggers finishedSet) { - return new TriggerContextImpl(window, timers, rootTrigger, finishedSet); - } - - public Trigger.OnElementContext createOnElementContext( - W window, Timers timers, Instant elementTimestamp, - ExecutableTrigger rootTrigger, FinishedTriggers finishedSet) { - return new OnElementContextImpl(window, timers, rootTrigger, finishedSet, elementTimestamp); - } - - public Trigger.OnMergeContext createOnMergeContext(W window, Timers timers, - ExecutableTrigger rootTrigger, FinishedTriggers finishedSet, - Map finishedSets) { - return new OnMergeContextImpl(window, timers, rootTrigger, finishedSet, finishedSets); - } - - public StateAccessor createStateAccessor(W window, ExecutableTrigger trigger) { - return new StateAccessorImpl(window, trigger); - } - - public MergingStateAccessor createMergingStateAccessor( - W mergeResult, Collection mergingWindows, ExecutableTrigger trigger) { - return new MergingStateAccessorImpl(trigger, mergingWindows, mergeResult); - } - - private class TriggerInfoImpl implements Trigger.TriggerInfo { - - protected final ExecutableTrigger trigger; - protected final FinishedTriggers finishedSet; - private final Trigger.TriggerContext context; - - public TriggerInfoImpl(ExecutableTrigger trigger, FinishedTriggers finishedSet, - Trigger.TriggerContext context) { - this.trigger = trigger; - this.finishedSet = finishedSet; - this.context = context; - } - - @Override - public boolean isMerging() { - return !windowFn.isNonMerging(); - } - - @Override - public Iterable subTriggers() { - return trigger.subTriggers(); - } - - @Override - public ExecutableTrigger subTrigger(int subtriggerIndex) { - return trigger.subTriggers().get(subtriggerIndex); - } - - @Override - public boolean isFinished() { - return finishedSet.isFinished(trigger); - } - - @Override - public boolean isFinished(int subtriggerIndex) { - return finishedSet.isFinished(subTrigger(subtriggerIndex)); - } - - @Override - public boolean areAllSubtriggersFinished() { - return Iterables.isEmpty(unfinishedSubTriggers()); - } - - @Override - public Iterable unfinishedSubTriggers() { - return FluentIterable - .from(trigger.subTriggers()) - .filter(new Predicate() { - @Override - public boolean apply(ExecutableTrigger trigger) { - return !finishedSet.isFinished(trigger); - } - }); - } - - @Override - public ExecutableTrigger firstUnfinishedSubTrigger() { - for (ExecutableTrigger subTrigger : trigger.subTriggers()) { - if (!finishedSet.isFinished(subTrigger)) { - return subTrigger; - } - } - return null; - } - - @Override - public void resetTree() throws Exception { - finishedSet.clearRecursively(trigger); - trigger.invokeClear(context); - } - - @Override - public void setFinished(boolean finished) { - finishedSet.setFinished(trigger, finished); - } - - @Override - public void setFinished(boolean finished, int subTriggerIndex) { - finishedSet.setFinished(subTrigger(subTriggerIndex), finished); - } - } - - private class TriggerTimers implements Timers { - - private final Timers timers; - private final W window; - - public TriggerTimers(W window, Timers timers) { - this.timers = timers; - this.window = window; - } - - @Override - public void setTimer(Instant timestamp, TimeDomain timeDomain) { - timers.setTimer(timestamp, timeDomain); - } - - @Override - public void deleteTimer(Instant timestamp, TimeDomain timeDomain) { - if (timeDomain == TimeDomain.EVENT_TIME - && timestamp.equals(window.maxTimestamp())) { - // Don't allow triggers to unset the at-max-timestamp timer. This is necessary for on-time - // state transitions. - return; - } - timers.deleteTimer(timestamp, timeDomain); - } - - @Override - public Instant currentProcessingTime() { - return timers.currentProcessingTime(); - } - - @Override - @Nullable - public Instant currentSynchronizedProcessingTime() { - return timers.currentSynchronizedProcessingTime(); - } - - @Override - public Instant currentEventTime() { - return timers.currentEventTime(); - } - } - - private class MergingTriggerInfoImpl - extends TriggerInfoImpl implements Trigger.MergingTriggerInfo { - - private final Map finishedSets; - - public MergingTriggerInfoImpl( - ExecutableTrigger trigger, - FinishedTriggers finishedSet, - Trigger.TriggerContext context, - Map finishedSets) { - super(trigger, finishedSet, context); - this.finishedSets = finishedSets; - } - - @Override - public boolean finishedInAnyMergingWindow() { - for (FinishedTriggers finishedSet : finishedSets.values()) { - if (finishedSet.isFinished(trigger)) { - return true; - } - } - return false; - } - - @Override - public boolean finishedInAllMergingWindows() { - for (FinishedTriggers finishedSet : finishedSets.values()) { - if (!finishedSet.isFinished(trigger)) { - return false; - } - } - return true; - } - } - - private class StateAccessorImpl implements StateAccessor { - protected final int triggerIndex; - protected final StateNamespace windowNamespace; - - public StateAccessorImpl( - W window, - ExecutableTrigger trigger) { - this.triggerIndex = trigger.getTriggerIndex(); - this.windowNamespace = namespaceFor(window); - } - - protected StateNamespace namespaceFor(W window) { - return StateNamespaces.windowAndTrigger(windowCoder, window, triggerIndex); - } - - @Override - public StateT access(StateTag address) { - return stateInternals.state(windowNamespace, address); - } - } - - private class MergingStateAccessorImpl extends StateAccessorImpl - implements MergingStateAccessor { - private final Collection activeToBeMerged; - - public MergingStateAccessorImpl(ExecutableTrigger trigger, Collection activeToBeMerged, - W mergeResult) { - super(mergeResult, trigger); - this.activeToBeMerged = activeToBeMerged; - } - - @Override - public StateT access( - StateTag address) { - return stateInternals.state(windowNamespace, address); - } - - @Override - public Map accessInEachMergingWindow( - StateTag address) { - ImmutableMap.Builder builder = ImmutableMap.builder(); - for (W mergingWindow : activeToBeMerged) { - StateT stateForWindow = stateInternals.state(namespaceFor(mergingWindow), address); - builder.put(mergingWindow, stateForWindow); - } - return builder.build(); - } - } - - private class TriggerContextImpl extends Trigger.TriggerContext { - - private final W window; - private final StateAccessorImpl state; - private final Timers timers; - private final TriggerInfoImpl triggerInfo; - - private TriggerContextImpl( - W window, - Timers timers, - ExecutableTrigger trigger, - FinishedTriggers finishedSet) { - trigger.getSpec().super(); - this.window = window; - this.state = new StateAccessorImpl(window, trigger); - this.timers = new TriggerTimers(window, timers); - this.triggerInfo = new TriggerInfoImpl(trigger, finishedSet, this); - } - - @Override - public Trigger.TriggerContext forTrigger(ExecutableTrigger trigger) { - return new TriggerContextImpl(window, timers, trigger, triggerInfo.finishedSet); - } - - @Override - public TriggerInfo trigger() { - return triggerInfo; - } - - @Override - public StateAccessor state() { - return state; - } - - @Override - public W window() { - return window; - } - - @Override - public void deleteTimer(Instant timestamp, TimeDomain domain) { - timers.deleteTimer(timestamp, domain); - } - - @Override - public Instant currentProcessingTime() { - return timers.currentProcessingTime(); - } - - @Override - @Nullable - public Instant currentSynchronizedProcessingTime() { - return timers.currentSynchronizedProcessingTime(); - } - - @Override - @Nullable - public Instant currentEventTime() { - return timers.currentEventTime(); - } - } - - private class OnElementContextImpl extends Trigger.OnElementContext { - - private final W window; - private final StateAccessorImpl state; - private final Timers timers; - private final TriggerInfoImpl triggerInfo; - private final Instant eventTimestamp; - - private OnElementContextImpl( - W window, - Timers timers, - ExecutableTrigger trigger, - FinishedTriggers finishedSet, - Instant eventTimestamp) { - trigger.getSpec().super(); - this.window = window; - this.state = new StateAccessorImpl(window, trigger); - this.timers = new TriggerTimers(window, timers); - this.triggerInfo = new TriggerInfoImpl(trigger, finishedSet, this); - this.eventTimestamp = eventTimestamp; - } - - - @Override - public Instant eventTimestamp() { - return eventTimestamp; - } - - @Override - public Trigger.OnElementContext forTrigger(ExecutableTrigger trigger) { - return new OnElementContextImpl( - window, timers, trigger, triggerInfo.finishedSet, eventTimestamp); - } - - @Override - public TriggerInfo trigger() { - return triggerInfo; - } - - @Override - public StateAccessor state() { - return state; - } - - @Override - public W window() { - return window; - } - - @Override - public void setTimer(Instant timestamp, TimeDomain domain) { - timers.setTimer(timestamp, domain); - } - - - @Override - public void deleteTimer(Instant timestamp, TimeDomain domain) { - timers.deleteTimer(timestamp, domain); - } - - @Override - public Instant currentProcessingTime() { - return timers.currentProcessingTime(); - } - - @Override - @Nullable - public Instant currentSynchronizedProcessingTime() { - return timers.currentSynchronizedProcessingTime(); - } - - @Override - @Nullable - public Instant currentEventTime() { - return timers.currentEventTime(); - } - } - - private class OnMergeContextImpl extends Trigger.OnMergeContext { - private final MergingStateAccessor state; - private final W window; - private final Collection mergingWindows; - private final Timers timers; - private final MergingTriggerInfoImpl triggerInfo; - - private OnMergeContextImpl( - W window, - Timers timers, - ExecutableTrigger trigger, - FinishedTriggers finishedSet, - Map finishedSets) { - trigger.getSpec().super(); - this.mergingWindows = finishedSets.keySet(); - this.window = window; - this.state = new MergingStateAccessorImpl(trigger, mergingWindows, window); - this.timers = new TriggerTimers(window, timers); - this.triggerInfo = new MergingTriggerInfoImpl(trigger, finishedSet, this, finishedSets); - } - - @Override - public Trigger.OnMergeContext forTrigger(ExecutableTrigger trigger) { - return new OnMergeContextImpl( - window, timers, trigger, triggerInfo.finishedSet, triggerInfo.finishedSets); - } - - @Override - public MergingStateAccessor state() { - return state; - } - - @Override - public MergingTriggerInfo trigger() { - return triggerInfo; - } - - @Override - public W window() { - return window; - } - - @Override - public void setTimer(Instant timestamp, TimeDomain domain) { - timers.setTimer(timestamp, domain); - } - - @Override - public void deleteTimer(Instant timestamp, TimeDomain domain) { - timers.setTimer(timestamp, domain); - - } - - @Override - public Instant currentProcessingTime() { - return timers.currentProcessingTime(); - } - - @Override - @Nullable - public Instant currentSynchronizedProcessingTime() { - return timers.currentSynchronizedProcessingTime(); - } - - @Override - @Nullable - public Instant currentEventTime() { - return timers.currentEventTime(); - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/1eff320d/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/AfterAllTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/AfterAllTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/AfterAllTest.java index b591229..5b213c7 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/AfterAllTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/AfterAllTest.java @@ -18,13 +18,8 @@ package org.apache.beam.sdk.transforms.windowing; import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertTrue; import org.apache.beam.sdk.transforms.windowing.Trigger.OnceTrigger; -import org.apache.beam.sdk.util.TriggerTester; -import org.apache.beam.sdk.util.TriggerTester.SimpleTriggerTester; -import org.joda.time.Duration; import org.joda.time.Instant; import org.junit.Test; import org.junit.runner.RunWith; @@ -36,99 +31,6 @@ import org.junit.runners.JUnit4; @RunWith(JUnit4.class) public class AfterAllTest { - private SimpleTriggerTester tester; - - @Test - public void testT1FiresFirst() throws Exception { - tester = TriggerTester.forTrigger( - AfterAll.of( - AfterPane.elementCountAtLeast(1), - AfterPane.elementCountAtLeast(2)), - FixedWindows.of(Duration.millis(100))); - - IntervalWindow window = new IntervalWindow(new Instant(0), new Instant(100)); - - tester.injectElements(1); - assertFalse(tester.shouldFire(window)); - - tester.injectElements(2); - assertTrue(tester.shouldFire(window)); - tester.fireIfShouldFire(window); - assertTrue(tester.isMarkedFinished(window)); - } - - @Test - public void testT2FiresFirst() throws Exception { - tester = TriggerTester.forTrigger( - AfterAll.of( - AfterPane.elementCountAtLeast(2), - AfterPane.elementCountAtLeast(1)), - FixedWindows.of(Duration.millis(100))); - - IntervalWindow window = new IntervalWindow(new Instant(0), new Instant(100)); - - tester.injectElements(1); - assertFalse(tester.shouldFire(window)); - - tester.injectElements(2); - assertTrue(tester.shouldFire(window)); - tester.fireIfShouldFire(window); - assertTrue(tester.isMarkedFinished(window)); - } - - /** - * Tests that the AfterAll properly unsets finished bits when a merge causing it to become - * unfinished. - */ - @Test - public void testOnMergeRewinds() throws Exception { - tester = TriggerTester.forTrigger( - AfterEach.inOrder( - AfterAll.of( - AfterWatermark.pastEndOfWindow(), - AfterPane.elementCountAtLeast(1)), - Repeatedly.forever(AfterPane.elementCountAtLeast(1))), - Sessions.withGapDuration(Duration.millis(10))); - - tester.injectElements(1); - IntervalWindow firstWindow = new IntervalWindow(new Instant(1), new Instant(11)); - - tester.injectElements(5); - IntervalWindow secondWindow = new IntervalWindow(new Instant(5), new Instant(15)); - - // Finish the AfterAll in the first window - tester.advanceInputWatermark(new Instant(11)); - assertTrue(tester.shouldFire(firstWindow)); - assertFalse(tester.shouldFire(secondWindow)); - tester.fireIfShouldFire(firstWindow); - - // Merge them; the AfterAll should not be finished - tester.mergeWindows(); - IntervalWindow mergedWindow = new IntervalWindow(new Instant(1), new Instant(15)); - assertFalse(tester.isMarkedFinished(mergedWindow)); - - // Confirm that we are back on the first trigger by probing that it is not ready to fire - // after an element (with merging) - tester.injectElements(3); - tester.mergeWindows(); - assertFalse(tester.shouldFire(mergedWindow)); - - // Fire the AfterAll in the merged window - tester.advanceInputWatermark(new Instant(15)); - assertTrue(tester.shouldFire(mergedWindow)); - tester.fireIfShouldFire(mergedWindow); - - // Confirm that we are on the second trigger by probing - tester.injectElements(2); - tester.mergeWindows(); - assertTrue(tester.shouldFire(mergedWindow)); - tester.fireIfShouldFire(mergedWindow); - tester.injectElements(2); - tester.mergeWindows(); - assertTrue(tester.shouldFire(mergedWindow)); - tester.fireIfShouldFire(mergedWindow); - } - @Test public void testFireDeadline() throws Exception { BoundedWindow window = new IntervalWindow(new Instant(0), new Instant(10)); http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/1eff320d/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/AfterEachTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/AfterEachTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/AfterEachTest.java index c413c6e..00d25e0 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/AfterEachTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/AfterEachTest.java @@ -18,19 +18,12 @@ package org.apache.beam.sdk.transforms.windowing; import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertTrue; import org.apache.beam.sdk.transforms.windowing.Trigger.OnceTrigger; -import org.apache.beam.sdk.util.TriggerTester; -import org.apache.beam.sdk.util.TriggerTester.SimpleTriggerTester; -import org.joda.time.Duration; import org.joda.time.Instant; -import org.junit.Before; import org.junit.Test; import org.junit.runner.RunWith; import org.junit.runners.JUnit4; -import org.mockito.MockitoAnnotations; /** * Tests for {@link AfterEach}. @@ -38,63 +31,6 @@ import org.mockito.MockitoAnnotations; @RunWith(JUnit4.class) public class AfterEachTest { - private SimpleTriggerTester tester; - - @Before - public void initMocks() { - MockitoAnnotations.initMocks(this); - } - - /** - * Tests that the {@link AfterEach} trigger fires and finishes the first trigger then the second. - */ - @Test - public void testAfterEachInSequence() throws Exception { - tester = TriggerTester.forTrigger( - AfterEach.inOrder( - Repeatedly.forever(AfterPane.elementCountAtLeast(2)) - .orFinally(AfterPane.elementCountAtLeast(3)), - Repeatedly.forever(AfterPane.elementCountAtLeast(5)) - .orFinally(AfterWatermark.pastEndOfWindow())), - FixedWindows.of(Duration.millis(10))); - - IntervalWindow window = new IntervalWindow(new Instant(0), new Instant(10)); - - // AfterCount(2) not ready - tester.injectElements(1); - assertFalse(tester.shouldFire(window)); - - // AfterCount(2) ready, not finished - tester.injectElements(2); - assertTrue(tester.shouldFire(window)); - tester.fireIfShouldFire(window); - assertFalse(tester.isMarkedFinished(window)); - - // orFinally(AfterCount(3)) ready and will finish the first - tester.injectElements(1, 2, 3); - assertTrue(tester.shouldFire(window)); - tester.fireIfShouldFire(window); - assertFalse(tester.isMarkedFinished(window)); - - // Now running as the second trigger - assertFalse(tester.shouldFire(window)); - // This quantity of elements would fire and finish if it were erroneously still the first - tester.injectElements(1, 2, 3, 4); - assertFalse(tester.shouldFire(window)); - - // Now fire once - tester.injectElements(5); - assertTrue(tester.shouldFire(window)); - tester.fireIfShouldFire(window); - assertFalse(tester.isMarkedFinished(window)); - - // This time advance the watermark to finish the whole mess. - tester.advanceInputWatermark(new Instant(10)); - assertTrue(tester.shouldFire(window)); - tester.fireIfShouldFire(window); - assertTrue(tester.isMarkedFinished(window)); - } - @Test public void testFireDeadline() throws Exception { BoundedWindow window = new IntervalWindow(new Instant(0), new Instant(10)); http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/1eff320d/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/AfterFirstTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/AfterFirstTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/AfterFirstTest.java index 415060b..2887edb 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/AfterFirstTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/AfterFirstTest.java @@ -18,22 +18,12 @@ package org.apache.beam.sdk.transforms.windowing; import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertTrue; -import static org.mockito.Mockito.when; import org.apache.beam.sdk.transforms.windowing.Trigger.OnceTrigger; -import org.apache.beam.sdk.util.TriggerTester; -import org.apache.beam.sdk.util.TriggerTester.SimpleTriggerTester; -import org.joda.time.Duration; import org.joda.time.Instant; -import org.junit.Before; import org.junit.Test; import org.junit.runner.RunWith; import org.junit.runners.JUnit4; -import org.mockito.Mock; -import org.mockito.Mockito; -import org.mockito.MockitoAnnotations; /** * Tests for {@link AfterFirst}. @@ -41,116 +31,6 @@ import org.mockito.MockitoAnnotations; @RunWith(JUnit4.class) public class AfterFirstTest { - @Mock private OnceTrigger mockTrigger1; - @Mock private OnceTrigger mockTrigger2; - private SimpleTriggerTester tester; - private static Trigger.TriggerContext anyTriggerContext() { - return Mockito.any(); - } - - @Before - public void initMocks() { - MockitoAnnotations.initMocks(this); - } - - @Test - public void testNeitherShouldFireFixedWindows() throws Exception { - tester = TriggerTester.forTrigger( - AfterFirst.of(mockTrigger1, mockTrigger2), FixedWindows.of(Duration.millis(10))); - - tester.injectElements(1); - IntervalWindow window = new IntervalWindow(new Instant(0), new Instant(10)); - - when(mockTrigger1.shouldFire(anyTriggerContext())).thenReturn(false); - when(mockTrigger2.shouldFire(anyTriggerContext())).thenReturn(false); - - assertFalse(tester.shouldFire(window)); // should not fire - assertFalse(tester.isMarkedFinished(window)); // not finished - } - - @Test - public void testOnlyT1ShouldFireFixedWindows() throws Exception { - tester = TriggerTester.forTrigger( - AfterFirst.of(mockTrigger1, mockTrigger2), FixedWindows.of(Duration.millis(10))); - tester.injectElements(1); - IntervalWindow window = new IntervalWindow(new Instant(1), new Instant(11)); - - when(mockTrigger1.shouldFire(anyTriggerContext())).thenReturn(true); - when(mockTrigger2.shouldFire(anyTriggerContext())).thenReturn(false); - - assertTrue(tester.shouldFire(window)); // should fire - - tester.fireIfShouldFire(window); - assertTrue(tester.isMarkedFinished(window)); - } - - @Test - public void testOnlyT2ShouldFireFixedWindows() throws Exception { - tester = TriggerTester.forTrigger( - AfterFirst.of(mockTrigger1, mockTrigger2), FixedWindows.of(Duration.millis(10))); - tester.injectElements(1); - IntervalWindow window = new IntervalWindow(new Instant(1), new Instant(11)); - - when(mockTrigger1.shouldFire(anyTriggerContext())).thenReturn(false); - when(mockTrigger2.shouldFire(anyTriggerContext())).thenReturn(true); - assertTrue(tester.shouldFire(window)); // should fire - - tester.fireIfShouldFire(window); // now finished - assertTrue(tester.isMarkedFinished(window)); - } - - @Test - public void testBothShouldFireFixedWindows() throws Exception { - tester = TriggerTester.forTrigger( - AfterFirst.of(mockTrigger1, mockTrigger2), FixedWindows.of(Duration.millis(10))); - tester.injectElements(1); - IntervalWindow window = new IntervalWindow(new Instant(1), new Instant(11)); - - when(mockTrigger1.shouldFire(anyTriggerContext())).thenReturn(true); - when(mockTrigger2.shouldFire(anyTriggerContext())).thenReturn(true); - assertTrue(tester.shouldFire(window)); // should fire - - tester.fireIfShouldFire(window); - assertTrue(tester.isMarkedFinished(window)); - } - - /** - * Tests that if the first trigger rewinds to be non-finished in the merged window, - * then it becomes the currently active trigger again, with real triggers. - */ - @Test - public void testShouldFireAfterMerge() throws Exception { - tester = TriggerTester.forTrigger( - AfterEach.inOrder( - AfterFirst.of(AfterPane.elementCountAtLeast(5), - AfterWatermark.pastEndOfWindow()), - Repeatedly.forever(AfterPane.elementCountAtLeast(1))), - Sessions.withGapDuration(Duration.millis(10))); - - // Finished the AfterFirst in the first window - tester.injectElements(1); - IntervalWindow firstWindow = new IntervalWindow(new Instant(1), new Instant(11)); - assertFalse(tester.shouldFire(firstWindow)); - tester.advanceInputWatermark(new Instant(11)); - assertTrue(tester.shouldFire(firstWindow)); - tester.fireIfShouldFire(firstWindow); - - // Set up second window where it is not done - tester.injectElements(5); - IntervalWindow secondWindow = new IntervalWindow(new Instant(5), new Instant(15)); - assertFalse(tester.shouldFire(secondWindow)); - - // Merge them, if the merged window were on the second trigger, it would be ready - tester.mergeWindows(); - IntervalWindow mergedWindow = new IntervalWindow(new Instant(1), new Instant(15)); - assertFalse(tester.shouldFire(mergedWindow)); - - // Now adding 3 more makes the AfterFirst ready to fire - tester.injectElements(1, 2, 3, 4, 5); - tester.mergeWindows(); - assertTrue(tester.shouldFire(mergedWindow)); - } - @Test public void testFireDeadline() throws Exception { BoundedWindow window = new IntervalWindow(new Instant(0), new Instant(10)); http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/1eff320d/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/AfterPaneTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/AfterPaneTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/AfterPaneTest.java index 38d030e..1bff80a 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/AfterPaneTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/AfterPaneTest.java @@ -18,12 +18,7 @@ package org.apache.beam.sdk.transforms.windowing; import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertTrue; -import org.apache.beam.sdk.util.TriggerTester; -import org.apache.beam.sdk.util.TriggerTester.SimpleTriggerTester; -import org.joda.time.Duration; import org.joda.time.Instant; import org.junit.Test; import org.junit.runner.RunWith; @@ -35,78 +30,6 @@ import org.junit.runners.JUnit4; @RunWith(JUnit4.class) public class AfterPaneTest { - SimpleTriggerTester tester; - /** - * Tests that the trigger does fire when enough elements are in a window, and that it only - * fires that window (no leakage). - */ - @Test - public void testAfterPaneElementCountFixedWindows() throws Exception { - tester = TriggerTester.forTrigger( - AfterPane.elementCountAtLeast(2), - FixedWindows.of(Duration.millis(10))); - - tester.injectElements(1); // [0, 10) - IntervalWindow window = new IntervalWindow(new Instant(0), new Instant(10)); - assertFalse(tester.shouldFire(window)); - - tester.injectElements(2); // [0, 10) - tester.injectElements(11); // [10, 20) - - assertTrue(tester.shouldFire(window)); // ready to fire - tester.fireIfShouldFire(window); // and finished - assertTrue(tester.isMarkedFinished(window)); - - // But don't finish the other window - assertFalse(tester.isMarkedFinished(new IntervalWindow(new Instant(10), new Instant(20)))); - } - - @Test - public void testClear() throws Exception { - SimpleTriggerTester tester = TriggerTester.forTrigger( - AfterPane.elementCountAtLeast(2), - FixedWindows.of(Duration.millis(10))); - - tester.injectElements(1, 2, 3); - IntervalWindow window = new IntervalWindow(new Instant(0), new Instant(10)); - tester.clearState(window); - tester.assertCleared(window); - } - - @Test - public void testAfterPaneElementCountSessions() throws Exception { - tester = TriggerTester.forTrigger( - AfterPane.elementCountAtLeast(2), - Sessions.withGapDuration(Duration.millis(10))); - - tester.injectElements( - 1, // in [1, 11) - 2); // in [2, 12) - - assertFalse(tester.shouldFire(new IntervalWindow(new Instant(1), new Instant(11)))); - assertFalse(tester.shouldFire(new IntervalWindow(new Instant(2), new Instant(12)))); - - tester.mergeWindows(); - - IntervalWindow mergedWindow = new IntervalWindow(new Instant(1), new Instant(12)); - assertTrue(tester.shouldFire(mergedWindow)); - tester.fireIfShouldFire(mergedWindow); - assertTrue(tester.isMarkedFinished(mergedWindow)); - - // Because we closed the previous window, we don't have it around to merge with. So there - // will be a new FIRE_AND_FINISH result. - tester.injectElements( - 7, // in [7, 17) - 9); // in [9, 19) - - tester.mergeWindows(); - - IntervalWindow newMergedWindow = new IntervalWindow(new Instant(7), new Instant(19)); - assertTrue(tester.shouldFire(newMergedWindow)); - tester.fireIfShouldFire(newMergedWindow); - assertTrue(tester.isMarkedFinished(newMergedWindow)); - } - @Test public void testFireDeadline() throws Exception { assertEquals(BoundedWindow.TIMESTAMP_MAX_VALUE, http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/1eff320d/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/AfterProcessingTimeTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/AfterProcessingTimeTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/AfterProcessingTimeTest.java index 13a7acf..4984d7c 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/AfterProcessingTimeTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/AfterProcessingTimeTest.java @@ -18,12 +18,9 @@ package org.apache.beam.sdk.transforms.windowing; import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; import org.apache.beam.sdk.transforms.windowing.Trigger.OnceTrigger; -import org.apache.beam.sdk.util.TriggerTester; -import org.apache.beam.sdk.util.TriggerTester.SimpleTriggerTester; import org.joda.time.Duration; import org.joda.time.Instant; import org.junit.Test; @@ -36,97 +33,6 @@ import org.junit.runners.JUnit4; @RunWith(JUnit4.class) public class AfterProcessingTimeTest { - /** - * Tests the basic property that the trigger does wait for processing time to be - * far enough advanced. - */ - @Test - public void testAfterProcessingTimeFixedWindows() throws Exception { - Duration windowDuration = Duration.millis(10); - SimpleTriggerTester tester = TriggerTester.forTrigger( - AfterProcessingTime - .pastFirstElementInPane() - .plusDelayOf(Duration.millis(5)), - FixedWindows.of(windowDuration)); - - tester.advanceProcessingTime(new Instant(10)); - - // Timer at 15 - tester.injectElements(1); - IntervalWindow firstWindow = new IntervalWindow(new Instant(0), new Instant(10)); - tester.advanceProcessingTime(new Instant(12)); - assertFalse(tester.shouldFire(firstWindow)); - - // Load up elements in the next window, timer at 17 for them - tester.injectElements(11, 12, 13); - IntervalWindow secondWindow = new IntervalWindow(new Instant(10), new Instant(20)); - assertFalse(tester.shouldFire(secondWindow)); - - // Not quite time to fire - tester.advanceProcessingTime(new Instant(14)); - assertFalse(tester.shouldFire(firstWindow)); - assertFalse(tester.shouldFire(secondWindow)); - - // Timer at 19 for these in the first window; it should be ignored since the 15 will fire first - tester.injectElements(2, 3); - - // Advance past the first timer and fire, finishing the first window - tester.advanceProcessingTime(new Instant(16)); - assertTrue(tester.shouldFire(firstWindow)); - assertFalse(tester.shouldFire(secondWindow)); - tester.fireIfShouldFire(firstWindow); - assertTrue(tester.isMarkedFinished(firstWindow)); - - // The next window fires and finishes now - tester.advanceProcessingTime(new Instant(18)); - assertTrue(tester.shouldFire(secondWindow)); - tester.fireIfShouldFire(secondWindow); - assertTrue(tester.isMarkedFinished(secondWindow)); - } - - /** - * Tests that when windows merge, if the trigger is waiting for "N millis after the first - * element" that it is relative to the earlier of the two merged windows. - */ - @Test - public void testClear() throws Exception { - SimpleTriggerTester tester = TriggerTester.forTrigger( - AfterProcessingTime - .pastFirstElementInPane() - .plusDelayOf(Duration.millis(5)), - FixedWindows.of(Duration.millis(10))); - - tester.injectElements(1, 2, 3); - IntervalWindow window = new IntervalWindow(new Instant(0), new Instant(10)); - tester.clearState(window); - tester.assertCleared(window); - } - - @Test - public void testAfterProcessingTimeWithMergingWindow() throws Exception { - SimpleTriggerTester tester = TriggerTester.forTrigger( - AfterProcessingTime - .pastFirstElementInPane() - .plusDelayOf(Duration.millis(5)), - Sessions.withGapDuration(Duration.millis(10))); - - tester.advanceProcessingTime(new Instant(10)); - tester.injectElements(1); // in [1, 11), timer for 15 - IntervalWindow firstWindow = new IntervalWindow(new Instant(1), new Instant(11)); - assertFalse(tester.shouldFire(firstWindow)); - - tester.advanceProcessingTime(new Instant(12)); - tester.injectElements(3); // in [3, 13), timer for 17 - IntervalWindow secondWindow = new IntervalWindow(new Instant(3), new Instant(13)); - assertFalse(tester.shouldFire(secondWindow)); - - tester.mergeWindows(); - IntervalWindow mergedWindow = new IntervalWindow(new Instant(1), new Instant(13)); - - tester.advanceProcessingTime(new Instant(16)); - assertTrue(tester.shouldFire(mergedWindow)); - } - @Test public void testFireDeadline() throws Exception { assertEquals(BoundedWindow.TIMESTAMP_MAX_VALUE, http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/1eff320d/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/AfterSynchronizedProcessingTimeTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/AfterSynchronizedProcessingTimeTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/AfterSynchronizedProcessingTimeTest.java index 7e6e938..49d44c5 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/AfterSynchronizedProcessingTimeTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/AfterSynchronizedProcessingTimeTest.java @@ -18,12 +18,7 @@ package org.apache.beam.sdk.transforms.windowing; import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertTrue; -import org.apache.beam.sdk.util.TriggerTester; -import org.apache.beam.sdk.util.TriggerTester.SimpleTriggerTester; -import org.joda.time.Duration; import org.joda.time.Instant; import org.junit.Test; import org.junit.runner.RunWith; @@ -38,76 +33,6 @@ public class AfterSynchronizedProcessingTimeTest { private Trigger underTest = new AfterSynchronizedProcessingTime(); @Test - public void testAfterProcessingTimeWithFixedWindows() throws Exception { - Duration windowDuration = Duration.millis(10); - SimpleTriggerTester tester = TriggerTester.forTrigger( - AfterProcessingTime - .pastFirstElementInPane() - .plusDelayOf(Duration.millis(5)), - FixedWindows.of(windowDuration)); - - tester.advanceProcessingTime(new Instant(10)); - - // Timer at 15 - tester.injectElements(1); - IntervalWindow firstWindow = new IntervalWindow(new Instant(0), new Instant(10)); - tester.advanceProcessingTime(new Instant(12)); - assertFalse(tester.shouldFire(firstWindow)); - - // Load up elements in the next window, timer at 17 for them - tester.injectElements(11, 12, 13); - IntervalWindow secondWindow = new IntervalWindow(new Instant(10), new Instant(20)); - assertFalse(tester.shouldFire(secondWindow)); - - // Not quite time to fire - tester.advanceProcessingTime(new Instant(14)); - assertFalse(tester.shouldFire(firstWindow)); - assertFalse(tester.shouldFire(secondWindow)); - - // Timer at 19 for these in the first window; it should be ignored since the 15 will fire first - tester.injectElements(2, 3); - - // Advance past the first timer and fire, finishing the first window - tester.advanceProcessingTime(new Instant(16)); - assertTrue(tester.shouldFire(firstWindow)); - assertFalse(tester.shouldFire(secondWindow)); - tester.fireIfShouldFire(firstWindow); - assertTrue(tester.isMarkedFinished(firstWindow)); - - // The next window fires and finishes now - tester.advanceProcessingTime(new Instant(18)); - assertTrue(tester.shouldFire(secondWindow)); - tester.fireIfShouldFire(secondWindow); - assertTrue(tester.isMarkedFinished(secondWindow)); - } - - @Test - public void testAfterProcessingTimeWithMergingWindow() throws Exception { - Duration windowDuration = Duration.millis(10); - SimpleTriggerTester tester = TriggerTester.forTrigger( - AfterProcessingTime - .pastFirstElementInPane() - .plusDelayOf(Duration.millis(5)), - Sessions.withGapDuration(windowDuration)); - - tester.advanceProcessingTime(new Instant(10)); - tester.injectElements(1); // in [1, 11), timer for 15 - IntervalWindow firstWindow = new IntervalWindow(new Instant(1), new Instant(11)); - assertFalse(tester.shouldFire(firstWindow)); - - tester.advanceProcessingTime(new Instant(12)); - tester.injectElements(3); // in [3, 13), timer for 17 - IntervalWindow secondWindow = new IntervalWindow(new Instant(3), new Instant(13)); - assertFalse(tester.shouldFire(secondWindow)); - - tester.mergeWindows(); - IntervalWindow mergedWindow = new IntervalWindow(new Instant(1), new Instant(13)); - - tester.advanceProcessingTime(new Instant(16)); - assertTrue(tester.shouldFire(mergedWindow)); - } - - @Test public void testFireDeadline() throws Exception { assertEquals(BoundedWindow.TIMESTAMP_MAX_VALUE, underTest.getWatermarkThatGuaranteesFiring( http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/1eff320d/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/AfterWatermarkTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/AfterWatermarkTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/AfterWatermarkTest.java index 084027b..a418d63 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/AfterWatermarkTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/AfterWatermarkTest.java @@ -18,23 +18,10 @@ package org.apache.beam.sdk.transforms.windowing; import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertTrue; -import static org.mockito.Mockito.doNothing; -import static org.mockito.Mockito.when; -import org.apache.beam.sdk.transforms.windowing.Trigger.OnceTrigger; -import org.apache.beam.sdk.util.TriggerTester; -import org.apache.beam.sdk.util.TriggerTester.SimpleTriggerTester; -import org.joda.time.Duration; -import org.joda.time.Instant; -import org.junit.Before; import org.junit.Test; import org.junit.runner.RunWith; import org.junit.runners.JUnit4; -import org.mockito.Mock; -import org.mockito.Mockito; -import org.mockito.MockitoAnnotations; /** * Tests the {@link AfterWatermark} triggers. @@ -42,301 +29,6 @@ import org.mockito.MockitoAnnotations; @RunWith(JUnit4.class) public class AfterWatermarkTest { - @Mock private OnceTrigger mockEarly; - @Mock private OnceTrigger mockLate; - - private SimpleTriggerTester tester; - private static Trigger.TriggerContext anyTriggerContext() { - return Mockito.any(); - } - private static Trigger.OnElementContext anyElementContext() { - return Mockito.any(); - } - - private void injectElements(int... elements) throws Exception { - for (int element : elements) { - doNothing().when(mockEarly).onElement(anyElementContext()); - doNothing().when(mockLate).onElement(anyElementContext()); - tester.injectElements(element); - } - } - - @Before - public void setUp() { - MockitoAnnotations.initMocks(this); - } - - public void testRunningAsTrigger(OnceTrigger mockTrigger, IntervalWindow window) - throws Exception { - - // Don't fire due to mock saying no - when(mockTrigger.shouldFire(anyTriggerContext())).thenReturn(false); - assertFalse(tester.shouldFire(window)); // not ready - - // Fire due to mock trigger; early trigger is required to be a OnceTrigger - when(mockTrigger.shouldFire(anyTriggerContext())).thenReturn(true); - assertTrue(tester.shouldFire(window)); // ready - tester.fireIfShouldFire(window); - assertFalse(tester.isMarkedFinished(window)); - } - - @Test - public void testEarlyAndAtWatermark() throws Exception { - tester = TriggerTester.forTrigger( - AfterWatermark.pastEndOfWindow() - .withEarlyFirings(mockEarly), - FixedWindows.of(Duration.millis(100))); - - injectElements(1); - IntervalWindow window = new IntervalWindow(new Instant(0), new Instant(100)); - - testRunningAsTrigger(mockEarly, window); - - // Fire due to watermark - when(mockEarly.shouldFire(anyTriggerContext())).thenReturn(false); - tester.advanceInputWatermark(new Instant(100)); - assertTrue(tester.shouldFire(window)); - tester.fireIfShouldFire(window); - assertTrue(tester.isMarkedFinished(window)); - } - - @Test - public void testAtWatermarkAndLate() throws Exception { - tester = TriggerTester.forTrigger( - AfterWatermark.pastEndOfWindow() - .withLateFirings(mockLate), - FixedWindows.of(Duration.millis(100))); - - injectElements(1); - IntervalWindow window = new IntervalWindow(new Instant(0), new Instant(100)); - - // No early firing, just double checking - when(mockEarly.shouldFire(anyTriggerContext())).thenReturn(true); - assertFalse(tester.shouldFire(window)); - tester.fireIfShouldFire(window); - assertFalse(tester.isMarkedFinished(window)); - - // Fire due to watermark - when(mockEarly.shouldFire(anyTriggerContext())).thenReturn(false); - tester.advanceInputWatermark(new Instant(100)); - assertTrue(tester.shouldFire(window)); - tester.fireIfShouldFire(window); - assertFalse(tester.isMarkedFinished(window)); - - testRunningAsTrigger(mockLate, window); - } - - @Test - public void testEarlyAndAtWatermarkAndLate() throws Exception { - tester = TriggerTester.forTrigger( - AfterWatermark.pastEndOfWindow() - .withEarlyFirings(mockEarly) - .withLateFirings(mockLate), - FixedWindows.of(Duration.millis(100))); - - injectElements(1); - IntervalWindow window = new IntervalWindow(new Instant(0), new Instant(100)); - - testRunningAsTrigger(mockEarly, window); - - // Fire due to watermark - when(mockEarly.shouldFire(anyTriggerContext())).thenReturn(false); - tester.advanceInputWatermark(new Instant(100)); - assertTrue(tester.shouldFire(window)); - tester.fireIfShouldFire(window); - assertFalse(tester.isMarkedFinished(window)); - - testRunningAsTrigger(mockLate, window); - } - - /** - * Tests that if the EOW is finished in both as well as the merged window, then - * it is finished in the merged result. - * - *

Because windows are discarded when a trigger finishes, we need to embed this - * in a sequence in order to check that it is re-activated. So this test is potentially - * sensitive to other triggers' correctness. - */ - @Test - public void testOnMergeAlreadyFinished() throws Exception { - tester = TriggerTester.forTrigger( - AfterEach.inOrder( - AfterWatermark.pastEndOfWindow(), - Repeatedly.forever(AfterPane.elementCountAtLeast(1))), - Sessions.withGapDuration(Duration.millis(10))); - - tester.injectElements(1); - tester.injectElements(5); - IntervalWindow firstWindow = new IntervalWindow(new Instant(1), new Instant(11)); - IntervalWindow secondWindow = new IntervalWindow(new Instant(5), new Instant(15)); - IntervalWindow mergedWindow = new IntervalWindow(new Instant(1), new Instant(15)); - - // Finish the AfterWatermark.pastEndOfWindow() trigger in both windows - tester.advanceInputWatermark(new Instant(15)); - assertTrue(tester.shouldFire(firstWindow)); - assertTrue(tester.shouldFire(secondWindow)); - tester.fireIfShouldFire(firstWindow); - tester.fireIfShouldFire(secondWindow); - - // Confirm that we are on the second trigger by probing - assertFalse(tester.shouldFire(firstWindow)); - assertFalse(tester.shouldFire(secondWindow)); - tester.injectElements(1); - tester.injectElements(5); - assertTrue(tester.shouldFire(firstWindow)); - assertTrue(tester.shouldFire(secondWindow)); - tester.fireIfShouldFire(firstWindow); - tester.fireIfShouldFire(secondWindow); - - // Merging should leave it finished - tester.mergeWindows(); - - // Confirm that we are on the second trigger by probing - assertFalse(tester.shouldFire(mergedWindow)); - tester.injectElements(1); - assertTrue(tester.shouldFire(mergedWindow)); - } - - /** - * Tests that the trigger rewinds to be non-finished in the merged window. - * - *

Because windows are discarded when a trigger finishes, we need to embed this - * in a sequence in order to check that it is re-activated. So this test is potentially - * sensitive to other triggers' correctness. - */ - @Test - public void testOnMergeRewinds() throws Exception { - tester = TriggerTester.forTrigger( - AfterEach.inOrder( - AfterWatermark.pastEndOfWindow(), - Repeatedly.forever(AfterPane.elementCountAtLeast(1))), - Sessions.withGapDuration(Duration.millis(10))); - - tester.injectElements(1); - tester.injectElements(5); - IntervalWindow firstWindow = new IntervalWindow(new Instant(1), new Instant(11)); - IntervalWindow secondWindow = new IntervalWindow(new Instant(5), new Instant(15)); - IntervalWindow mergedWindow = new IntervalWindow(new Instant(1), new Instant(15)); - - // Finish the AfterWatermark.pastEndOfWindow() trigger in only the first window - tester.advanceInputWatermark(new Instant(11)); - assertTrue(tester.shouldFire(firstWindow)); - assertFalse(tester.shouldFire(secondWindow)); - tester.fireIfShouldFire(firstWindow); - - // Confirm that we are on the second trigger by probing - assertFalse(tester.shouldFire(firstWindow)); - tester.injectElements(1); - assertTrue(tester.shouldFire(firstWindow)); - tester.fireIfShouldFire(firstWindow); - - // Merging should re-activate the watermark trigger in the merged window - tester.mergeWindows(); - - // Confirm that we are not on the second trigger by probing - assertFalse(tester.shouldFire(mergedWindow)); - tester.injectElements(1); - assertFalse(tester.shouldFire(mergedWindow)); - - // And confirm that advancing the watermark fires again - tester.advanceInputWatermark(new Instant(15)); - assertTrue(tester.shouldFire(mergedWindow)); - } - - /** - * Tests that if the EOW is finished in both as well as the merged window, then - * it is finished in the merged result. - * - *

Because windows are discarded when a trigger finishes, we need to embed this - * in a sequence in order to check that it is re-activated. So this test is potentially - * sensitive to other triggers' correctness. - */ - @Test - public void testEarlyAndLateOnMergeAlreadyFinished() throws Exception { - tester = TriggerTester.forTrigger( - AfterWatermark.pastEndOfWindow() - .withEarlyFirings(AfterPane.elementCountAtLeast(100)) - .withLateFirings(AfterPane.elementCountAtLeast(1)), - Sessions.withGapDuration(Duration.millis(10))); - - tester.injectElements(1); - tester.injectElements(5); - IntervalWindow firstWindow = new IntervalWindow(new Instant(1), new Instant(11)); - IntervalWindow secondWindow = new IntervalWindow(new Instant(5), new Instant(15)); - IntervalWindow mergedWindow = new IntervalWindow(new Instant(1), new Instant(15)); - - // Finish the AfterWatermark.pastEndOfWindow() bit of the trigger in both windows - tester.advanceInputWatermark(new Instant(15)); - assertTrue(tester.shouldFire(firstWindow)); - assertTrue(tester.shouldFire(secondWindow)); - tester.fireIfShouldFire(firstWindow); - tester.fireIfShouldFire(secondWindow); - - // Confirm that we are on the late trigger by probing - assertFalse(tester.shouldFire(firstWindow)); - assertFalse(tester.shouldFire(secondWindow)); - tester.injectElements(1); - tester.injectElements(5); - assertTrue(tester.shouldFire(firstWindow)); - assertTrue(tester.shouldFire(secondWindow)); - tester.fireIfShouldFire(firstWindow); - tester.fireIfShouldFire(secondWindow); - - // Merging should leave it on the late trigger - tester.mergeWindows(); - - // Confirm that we are on the late trigger by probing - assertFalse(tester.shouldFire(mergedWindow)); - tester.injectElements(1); - assertTrue(tester.shouldFire(mergedWindow)); - } - - /** - * Tests that the trigger rewinds to be non-finished in the merged window. - * - *

Because windows are discarded when a trigger finishes, we need to embed this - * in a sequence in order to check that it is re-activated. So this test is potentially - * sensitive to other triggers' correctness. - */ - @Test - public void testEarlyAndLateOnMergeRewinds() throws Exception { - tester = TriggerTester.forTrigger( - AfterWatermark.pastEndOfWindow() - .withEarlyFirings(AfterPane.elementCountAtLeast(100)) - .withLateFirings(AfterPane.elementCountAtLeast(1)), - Sessions.withGapDuration(Duration.millis(10))); - - tester.injectElements(1); - tester.injectElements(5); - IntervalWindow firstWindow = new IntervalWindow(new Instant(1), new Instant(11)); - IntervalWindow secondWindow = new IntervalWindow(new Instant(5), new Instant(15)); - IntervalWindow mergedWindow = new IntervalWindow(new Instant(1), new Instant(15)); - - // Finish the AfterWatermark.pastEndOfWindow() bit of the trigger in only the first window - tester.advanceInputWatermark(new Instant(11)); - assertTrue(tester.shouldFire(firstWindow)); - assertFalse(tester.shouldFire(secondWindow)); - tester.fireIfShouldFire(firstWindow); - - // Confirm that we are on the late trigger by probing - assertFalse(tester.shouldFire(firstWindow)); - tester.injectElements(1); - assertTrue(tester.shouldFire(firstWindow)); - tester.fireIfShouldFire(firstWindow); - - // Merging should re-activate the early trigger in the merged window - tester.mergeWindows(); - - // Confirm that we are not on the second trigger by probing - assertFalse(tester.shouldFire(mergedWindow)); - tester.injectElements(1); - assertFalse(tester.shouldFire(mergedWindow)); - - // And confirm that advancing the watermark fires again - tester.advanceInputWatermark(new Instant(15)); - assertTrue(tester.shouldFire(mergedWindow)); - } - @Test public void testFromEndOfWindowToString() { Trigger trigger = AfterWatermark.pastEndOfWindow(); http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/1eff320d/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/DefaultTriggerTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/DefaultTriggerTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/DefaultTriggerTest.java index 673e555..ee1c44a 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/DefaultTriggerTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/DefaultTriggerTest.java @@ -18,12 +18,7 @@ package org.apache.beam.sdk.transforms.windowing; import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertTrue; -import org.apache.beam.sdk.util.TriggerTester; -import org.apache.beam.sdk.util.TriggerTester.SimpleTriggerTester; -import org.joda.time.Duration; import org.joda.time.Instant; import org.junit.Test; import org.junit.runner.RunWith; @@ -36,131 +31,6 @@ import org.junit.runners.JUnit4; @RunWith(JUnit4.class) public class DefaultTriggerTest { - SimpleTriggerTester tester; - - @Test - public void testDefaultTriggerFixedWindows() throws Exception { - tester = TriggerTester.forTrigger( - DefaultTrigger.of(), - FixedWindows.of(Duration.millis(100))); - - tester.injectElements( - 1, // [0, 100) - 101); // [100, 200) - - IntervalWindow firstWindow = new IntervalWindow(new Instant(0), new Instant(100)); - IntervalWindow secondWindow = new IntervalWindow(new Instant(100), new Instant(200)); - - // Advance the watermark almost to the end of the first window. - tester.advanceInputWatermark(new Instant(99)); - assertFalse(tester.shouldFire(firstWindow)); - assertFalse(tester.shouldFire(secondWindow)); - - // Advance watermark past end of the first window, which is then ready - tester.advanceInputWatermark(new Instant(100)); - assertTrue(tester.shouldFire(firstWindow)); - assertFalse(tester.shouldFire(secondWindow)); - - // Fire, but the first window is still allowed to fire - tester.fireIfShouldFire(firstWindow); - assertTrue(tester.shouldFire(firstWindow)); - assertFalse(tester.shouldFire(secondWindow)); - - // Advance watermark to 200, then both are ready - tester.advanceInputWatermark(new Instant(200)); - assertTrue(tester.shouldFire(firstWindow)); - assertTrue(tester.shouldFire(secondWindow)); - - assertFalse(tester.isMarkedFinished(firstWindow)); - assertFalse(tester.isMarkedFinished(secondWindow)); - } - - @Test - public void testDefaultTriggerSlidingWindows() throws Exception { - tester = TriggerTester.forTrigger( - DefaultTrigger.of(), - SlidingWindows.of(Duration.millis(100)).every(Duration.millis(50))); - - tester.injectElements( - 1, // [-50, 50), [0, 100) - 50); // [0, 100), [50, 150) - - IntervalWindow firstWindow = new IntervalWindow(new Instant(-50), new Instant(50)); - IntervalWindow secondWindow = new IntervalWindow(new Instant(0), new Instant(100)); - IntervalWindow thirdWindow = new IntervalWindow(new Instant(50), new Instant(150)); - - assertFalse(tester.shouldFire(firstWindow)); - assertFalse(tester.shouldFire(secondWindow)); - assertFalse(tester.shouldFire(thirdWindow)); - - // At 50, the first becomes ready; it stays ready after firing - tester.advanceInputWatermark(new Instant(50)); - assertTrue(tester.shouldFire(firstWindow)); - assertFalse(tester.shouldFire(secondWindow)); - assertFalse(tester.shouldFire(thirdWindow)); - tester.fireIfShouldFire(firstWindow); - assertTrue(tester.shouldFire(firstWindow)); - assertFalse(tester.shouldFire(secondWindow)); - assertFalse(tester.shouldFire(thirdWindow)); - - // At 99, the first is still the only one ready - tester.advanceInputWatermark(new Instant(99)); - assertTrue(tester.shouldFire(firstWindow)); - assertFalse(tester.shouldFire(secondWindow)); - assertFalse(tester.shouldFire(thirdWindow)); - - // At 100, the first and second are ready - tester.advanceInputWatermark(new Instant(100)); - assertTrue(tester.shouldFire(firstWindow)); - assertTrue(tester.shouldFire(secondWindow)); - assertFalse(tester.shouldFire(thirdWindow)); - tester.fireIfShouldFire(firstWindow); - - assertFalse(tester.isMarkedFinished(firstWindow)); - assertFalse(tester.isMarkedFinished(secondWindow)); - assertFalse(tester.isMarkedFinished(thirdWindow)); - } - - @Test - public void testDefaultTriggerSessions() throws Exception { - tester = TriggerTester.forTrigger( - DefaultTrigger.of(), - Sessions.withGapDuration(Duration.millis(100))); - - tester.injectElements( - 1, // [1, 101) - 50); // [50, 150) - tester.mergeWindows(); - - IntervalWindow firstWindow = new IntervalWindow(new Instant(1), new Instant(101)); - IntervalWindow secondWindow = new IntervalWindow(new Instant(50), new Instant(150)); - IntervalWindow mergedWindow = new IntervalWindow(new Instant(1), new Instant(150)); - - // Not ready in any window yet - tester.advanceInputWatermark(new Instant(100)); - assertFalse(tester.shouldFire(firstWindow)); - assertFalse(tester.shouldFire(secondWindow)); - assertFalse(tester.shouldFire(mergedWindow)); - - // The first window is "ready": the caller owns knowledge of which windows are merged away - tester.advanceInputWatermark(new Instant(149)); - assertTrue(tester.shouldFire(firstWindow)); - assertFalse(tester.shouldFire(secondWindow)); - assertFalse(tester.shouldFire(mergedWindow)); - - // Now ready on all windows - tester.advanceInputWatermark(new Instant(150)); - assertTrue(tester.shouldFire(firstWindow)); - assertTrue(tester.shouldFire(secondWindow)); - assertTrue(tester.shouldFire(mergedWindow)); - - // Ensure it repeats - tester.fireIfShouldFire(mergedWindow); - assertTrue(tester.shouldFire(mergedWindow)); - - assertFalse(tester.isMarkedFinished(mergedWindow)); - } - @Test public void testFireDeadline() throws Exception { assertEquals(new Instant(9), DefaultTrigger.of().getWatermarkThatGuaranteesFiring( http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/1eff320d/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/NeverTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/NeverTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/NeverTest.java index fb2b4d5..1052873 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/NeverTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/NeverTest.java @@ -17,40 +17,26 @@ */ package org.apache.beam.sdk.transforms.windowing; -import static org.hamcrest.Matchers.is; -import static org.junit.Assert.assertThat; +import static org.junit.Assert.assertEquals; -import org.apache.beam.sdk.util.TriggerTester; -import org.apache.beam.sdk.util.TriggerTester.SimpleTriggerTester; -import org.apache.beam.sdk.values.TimestampedValue; -import org.joda.time.Duration; import org.joda.time.Instant; -import org.junit.Before; import org.junit.Test; import org.junit.runner.RunWith; import org.junit.runners.JUnit4; -/** - * Tests for {@link Never}. - */ +/** Tests for {@link Never}. */ @RunWith(JUnit4.class) public class NeverTest { - private SimpleTriggerTester triggerTester; - - @Before - public void setup() throws Exception { - triggerTester = - TriggerTester.forTrigger( - Never.ever(), FixedWindows.of(Duration.standardMinutes(5))); + @Test + public void testFireDeadline() throws Exception { + assertEquals( + BoundedWindow.TIMESTAMP_MAX_VALUE, + Never.ever() + .getWatermarkThatGuaranteesFiring(new IntervalWindow(new Instant(0), new Instant(10)))); } @Test - public void falseAfterEndOfWindow() throws Exception { - triggerTester.injectElements(TimestampedValue.of(1, new Instant(1))); - IntervalWindow window = - new IntervalWindow(new Instant(0), new Instant(0).plus(Duration.standardMinutes(5))); - assertThat(triggerTester.shouldFire(window), is(false)); - triggerTester.advanceInputWatermark(BoundedWindow.TIMESTAMP_MAX_VALUE); - assertThat(triggerTester.shouldFire(window), is(false)); + public void testContinuation() throws Exception { + assertEquals(Never.ever(), Never.ever().getContinuationTrigger()); } } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/1eff320d/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/OrFinallyTriggerTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/OrFinallyTriggerTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/OrFinallyTriggerTest.java index 7289d97..6e61e10 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/OrFinallyTriggerTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/OrFinallyTriggerTest.java @@ -18,13 +18,8 @@ package org.apache.beam.sdk.transforms.windowing; import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertTrue; import org.apache.beam.sdk.transforms.windowing.Trigger.OnceTrigger; -import org.apache.beam.sdk.util.TriggerTester; -import org.apache.beam.sdk.util.TriggerTester.SimpleTriggerTester; -import org.joda.time.Duration; import org.joda.time.Instant; import org.junit.Test; import org.junit.runner.RunWith; @@ -36,137 +31,6 @@ import org.junit.runners.JUnit4; @RunWith(JUnit4.class) public class OrFinallyTriggerTest { - private SimpleTriggerTester tester; - - /** - * Tests that for {@code OrFinally(actual, ...)} when {@code actual} - * fires and finishes, the {@code OrFinally} also fires and finishes. - */ - @Test - public void testActualFiresAndFinishes() throws Exception { - tester = TriggerTester.forTrigger( - new OrFinallyTrigger( - AfterPane.elementCountAtLeast(2), - AfterPane.elementCountAtLeast(100)), - FixedWindows.of(Duration.millis(100))); - - IntervalWindow window = new IntervalWindow(new Instant(0), new Instant(100)); - - // Not yet firing - tester.injectElements(1); - assertFalse(tester.shouldFire(window)); - assertFalse(tester.isMarkedFinished(window)); - - // The actual fires and finishes - tester.injectElements(2); - assertTrue(tester.shouldFire(window)); - tester.fireIfShouldFire(window); - assertTrue(tester.isMarkedFinished(window)); - } - - /** - * Tests that for {@code OrFinally(actual, ...)} when {@code actual} - * fires but does not finish, the {@code OrFinally} also fires and also does not - * finish. - */ - @Test - public void testActualFiresOnly() throws Exception { - tester = TriggerTester.forTrigger( - new OrFinallyTrigger( - Repeatedly.forever(AfterPane.elementCountAtLeast(2)), - AfterPane.elementCountAtLeast(100)), - FixedWindows.of(Duration.millis(100))); - - IntervalWindow window = new IntervalWindow(new Instant(0), new Instant(100)); - - // Not yet firing - tester.injectElements(1); - assertFalse(tester.shouldFire(window)); - assertFalse(tester.isMarkedFinished(window)); - - // The actual fires but does not finish - tester.injectElements(2); - assertTrue(tester.shouldFire(window)); - tester.fireIfShouldFire(window); - assertFalse(tester.isMarkedFinished(window)); - - // And again - tester.injectElements(3, 4); - assertTrue(tester.shouldFire(window)); - tester.fireIfShouldFire(window); - assertFalse(tester.isMarkedFinished(window)); - } - - /** - * Tests that if the first trigger rewinds to be non-finished in the merged window, - * then it becomes the currently active trigger again, with real triggers. - */ - @Test - public void testShouldFireAfterMerge() throws Exception { - tester = TriggerTester.forTrigger( - AfterEach.inOrder( - AfterPane.elementCountAtLeast(5) - .orFinally(AfterWatermark.pastEndOfWindow()), - Repeatedly.forever(AfterPane.elementCountAtLeast(1))), - Sessions.withGapDuration(Duration.millis(10))); - - // Finished the orFinally in the first window - tester.injectElements(1); - IntervalWindow firstWindow = new IntervalWindow(new Instant(1), new Instant(11)); - assertFalse(tester.shouldFire(firstWindow)); - tester.advanceInputWatermark(new Instant(11)); - assertTrue(tester.shouldFire(firstWindow)); - tester.fireIfShouldFire(firstWindow); - - // Set up second window where it is not done - tester.injectElements(5); - IntervalWindow secondWindow = new IntervalWindow(new Instant(5), new Instant(15)); - assertFalse(tester.shouldFire(secondWindow)); - - // Merge them, if the merged window were on the second trigger, it would be ready - tester.mergeWindows(); - IntervalWindow mergedWindow = new IntervalWindow(new Instant(1), new Instant(15)); - assertFalse(tester.shouldFire(mergedWindow)); - - // Now adding 3 more makes the main trigger ready to fire - tester.injectElements(1, 2, 3, 4, 5); - tester.mergeWindows(); - assertTrue(tester.shouldFire(mergedWindow)); - } - - /** - * Tests that for {@code OrFinally(actual, until)} when {@code actual} - * fires but does not finish, then {@code until} fires and finishes, the - * whole thing fires and finished. - */ - @Test - public void testActualFiresButUntilFinishes() throws Exception { - tester = TriggerTester.forTrigger( - new OrFinallyTrigger( - Repeatedly.forever(AfterPane.elementCountAtLeast(2)), - AfterPane.elementCountAtLeast(3)), - FixedWindows.of(Duration.millis(10))); - - IntervalWindow window = new IntervalWindow(new Instant(0), new Instant(10)); - - // Before any firing - tester.injectElements(1); - assertFalse(tester.shouldFire(window)); - assertFalse(tester.isMarkedFinished(window)); - - // The actual fires but doesn't finish - tester.injectElements(2); - assertTrue(tester.shouldFire(window)); - tester.fireIfShouldFire(window); - assertFalse(tester.isMarkedFinished(window)); - - // The until fires and finishes; the trigger is finished - tester.injectElements(3); - assertTrue(tester.shouldFire(window)); - tester.fireIfShouldFire(window); - assertTrue(tester.isMarkedFinished(window)); - } - @Test public void testFireDeadline() throws Exception { BoundedWindow window = new IntervalWindow(new Instant(0), new Instant(10)); http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/1eff320d/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/RepeatedlyTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/RepeatedlyTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/RepeatedlyTest.java index 6e8930d..55cb77e 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/RepeatedlyTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/RepeatedlyTest.java @@ -19,14 +19,9 @@ package org.apache.beam.sdk.transforms.windowing; import static org.hamcrest.Matchers.equalTo; import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertThat; -import static org.junit.Assert.assertTrue; -import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; -import org.apache.beam.sdk.util.TriggerTester; -import org.apache.beam.sdk.util.TriggerTester.SimpleTriggerTester; import org.joda.time.Duration; import org.joda.time.Instant; import org.junit.Test; @@ -36,51 +31,17 @@ import org.mockito.Mock; import org.mockito.Mockito; import org.mockito.MockitoAnnotations; -/** - * Tests for {@link Repeatedly}. - */ +/** Tests for {@link Repeatedly}. */ @RunWith(JUnit4.class) public class RepeatedlyTest { @Mock private Trigger mockTrigger; - private SimpleTriggerTester tester; - private static Trigger.TriggerContext anyTriggerContext() { - return Mockito.any(); - } public void setUp(WindowFn windowFn) throws Exception { MockitoAnnotations.initMocks(this); - tester = TriggerTester.forTrigger(Repeatedly.forever(mockTrigger), windowFn); - } - - /** - * Tests that onElement correctly passes the data on to the subtrigger. - */ - @Test - public void testOnElement() throws Exception { - setUp(FixedWindows.of(Duration.millis(10))); - tester.injectElements(37); - verify(mockTrigger).onElement(Mockito.any()); - } - - /** - * Tests that the repeatedly is ready to fire whenever the subtrigger is ready. - */ - @Test - public void testShouldFire() throws Exception { - setUp(FixedWindows.of(Duration.millis(10))); - - when(mockTrigger.shouldFire(anyTriggerContext())).thenReturn(true); - assertTrue(tester.shouldFire(new IntervalWindow(new Instant(0), new Instant(10)))); - - when(mockTrigger.shouldFire(Mockito.any())) - .thenReturn(false); - assertFalse(tester.shouldFire(new IntervalWindow(new Instant(0), new Instant(10)))); } - /** - * Tests that the watermark that guarantees firing is that of the subtrigger. - */ + /** Tests that the watermark that guarantees firing is that of the subtrigger. */ @Test public void testFireDeadline() throws Exception { setUp(FixedWindows.of(Duration.millis(10))); @@ -107,118 +68,16 @@ public class RepeatedlyTest { } @Test - public void testShouldFireAfterMerge() throws Exception { - tester = TriggerTester.forTrigger( - Repeatedly.forever(AfterPane.elementCountAtLeast(2)), - Sessions.withGapDuration(Duration.millis(10))); - - tester.injectElements(1); - IntervalWindow firstWindow = new IntervalWindow(new Instant(1), new Instant(11)); - assertFalse(tester.shouldFire(firstWindow)); - - tester.injectElements(5); - IntervalWindow secondWindow = new IntervalWindow(new Instant(5), new Instant(15)); - assertFalse(tester.shouldFire(secondWindow)); - - // Merge them, if the merged window were on the second trigger, it would be ready - tester.mergeWindows(); - IntervalWindow mergedWindow = new IntervalWindow(new Instant(1), new Instant(15)); - assertTrue(tester.shouldFire(mergedWindow)); - } - - @Test - public void testRepeatedlyAfterFirstElementCount() throws Exception { - SimpleTriggerTester tester = - TriggerTester.forTrigger( - Repeatedly.forever( - AfterFirst.of( - AfterProcessingTime.pastFirstElementInPane() - .plusDelayOf(Duration.standardMinutes(15)), - AfterPane.elementCountAtLeast(5))), - new GlobalWindows()); - - GlobalWindow window = GlobalWindow.INSTANCE; - - tester.injectElements(1); - assertFalse(tester.shouldFire(window)); - - tester.injectElements(2, 3, 4, 5); - assertTrue(tester.shouldFire(window)); - tester.fireIfShouldFire(window); - assertFalse(tester.shouldFire(window)); - } - - @Test - public void testRepeatedlyAfterFirstProcessingTime() throws Exception { - SimpleTriggerTester tester = - TriggerTester.forTrigger( - Repeatedly.forever( - AfterFirst.of( - AfterProcessingTime.pastFirstElementInPane() - .plusDelayOf(Duration.standardMinutes(15)), - AfterPane.elementCountAtLeast(5))), - new GlobalWindows()); - - GlobalWindow window = GlobalWindow.INSTANCE; - - tester.injectElements(1); - assertFalse(tester.shouldFire(window)); - - tester.advanceProcessingTime(new Instant(0).plus(Duration.standardMinutes(15))); - assertTrue(tester.shouldFire(window)); - tester.fireIfShouldFire(window); - assertFalse(tester.shouldFire(window)); - } - - @Test - public void testRepeatedlyElementCount() throws Exception { - SimpleTriggerTester tester = - TriggerTester.forTrigger( - Repeatedly.forever(AfterPane.elementCountAtLeast(5)), - new GlobalWindows()); - - GlobalWindow window = GlobalWindow.INSTANCE; - - tester.injectElements(1); - assertFalse(tester.shouldFire(window)); - - tester.injectElements(2, 3, 4, 5); - assertTrue(tester.shouldFire(window)); - tester.fireIfShouldFire(window); - assertFalse(tester.shouldFire(window)); - } - - @Test - public void testRepeatedlyProcessingTime() throws Exception { - SimpleTriggerTester tester = - TriggerTester.forTrigger( - Repeatedly.forever( - AfterProcessingTime.pastFirstElementInPane() - .plusDelayOf(Duration.standardMinutes(15))), - new GlobalWindows()); - - GlobalWindow window = GlobalWindow.INSTANCE; - - tester.injectElements(1); - assertFalse(tester.shouldFire(window)); - - tester.advanceProcessingTime(new Instant(0).plus(Duration.standardMinutes(15))); - assertTrue(tester.shouldFire(window)); - tester.fireIfShouldFire(window); - assertFalse(tester.shouldFire(window)); - } - - - @Test public void testToString() { - Trigger trigger = Repeatedly.forever(new StubTrigger() { - @Override - public String toString() { - return "innerTrigger"; - } - }); + Trigger trigger = + Repeatedly.forever( + new StubTrigger() { + @Override + public String toString() { + return "innerTrigger"; + } + }); assertEquals("Repeatedly.forever(innerTrigger)", trigger.toString()); } - } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/1eff320d/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/StubTrigger.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/StubTrigger.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/StubTrigger.java index b258a79..0fc74e7 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/StubTrigger.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/StubTrigger.java @@ -42,23 +42,6 @@ abstract class StubTrigger extends Trigger.OnceTrigger { } @Override - protected void onOnlyFiring(TriggerContext context) throws Exception { - } - - @Override - public void onElement(OnElementContext c) throws Exception { - } - - @Override - public void onMerge(OnMergeContext c) throws Exception { - } - - @Override - public boolean shouldFire(TriggerContext context) throws Exception { - return false; - } - - @Override protected Trigger getContinuationTrigger(List continuationTriggers) { return null; } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/1eff320d/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/TriggerTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/TriggerTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/TriggerTest.java index cfc03b2..2602f79 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/TriggerTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/TriggerTest.java @@ -59,12 +59,6 @@ public class TriggerTest { } @Override - public void onElement(Trigger.OnElementContext c) { } - - @Override - public void onMerge(Trigger.OnMergeContext c) { } - - @Override protected Trigger getContinuationTrigger( List continuationTriggers) { return null; @@ -74,14 +68,6 @@ public class TriggerTest { public Instant getWatermarkThatGuaranteesFiring(BoundedWindow window) { return null; } - - @Override - public boolean shouldFire(Trigger.TriggerContext context) throws Exception { - return false; - } - - @Override - public void onFire(Trigger.TriggerContext context) throws Exception { } } private static class Trigger2 extends Trigger { @@ -91,12 +77,6 @@ public class TriggerTest { } @Override - public void onElement(Trigger.OnElementContext c) { } - - @Override - public void onMerge(Trigger.OnMergeContext c) { } - - @Override protected Trigger getContinuationTrigger( List continuationTriggers) { return null; @@ -106,13 +86,5 @@ public class TriggerTest { public Instant getWatermarkThatGuaranteesFiring(BoundedWindow window) { return null; } - - @Override - public boolean shouldFire(Trigger.TriggerContext context) throws Exception { - return false; - } - - @Override - public void onFire(Trigger.TriggerContext context) throws Exception { } } } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/1eff320d/sdks/java/core/src/test/java/org/apache/beam/sdk/util/ExecutableTriggerTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/ExecutableTriggerTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/ExecutableTriggerTest.java index 1e3a1ff..befc07e 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/ExecutableTriggerTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/ExecutableTriggerTest.java @@ -92,16 +92,6 @@ public class ExecutableTriggerTest { } @Override - public void onElement(OnElementContext c) throws Exception { } - - @Override - public void onMerge(OnMergeContext c) throws Exception { } - - @Override - public void clear(TriggerContext c) throws Exception { - } - - @Override public Instant getWatermarkThatGuaranteesFiring(BoundedWindow window) { return BoundedWindow.TIMESTAMP_MAX_VALUE; } @@ -115,13 +105,5 @@ public class ExecutableTriggerTest { public Trigger getContinuationTrigger(List continuationTriggers) { return this; } - - @Override - public boolean shouldFire(TriggerContext c) { - return false; - } - - @Override - public void onFire(TriggerContext c) { } } } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/1eff320d/sdks/java/core/src/test/java/org/apache/beam/sdk/util/FinishedTriggersBitSetTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/FinishedTriggersBitSetTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/FinishedTriggersBitSetTest.java deleted file mode 100644 index 7f74620..0000000 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/FinishedTriggersBitSetTest.java +++ /dev/null @@ -1,55 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.sdk.util; - -import static org.hamcrest.Matchers.not; -import static org.hamcrest.Matchers.theInstance; -import static org.junit.Assert.assertThat; - -import org.junit.Test; -import org.junit.runner.RunWith; -import org.junit.runners.JUnit4; - -/** - * Tests for {@link FinishedTriggersBitSet}. - */ -@RunWith(JUnit4.class) -public class FinishedTriggersBitSetTest { - /** - * Tests that after a trigger is set to finished, it reads back as finished. - */ - @Test - public void testSetGet() { - FinishedTriggersProperties.verifyGetAfterSet(FinishedTriggersBitSet.emptyWithCapacity(1)); - } - - /** - * Tests that clearing a trigger recursively clears all of that triggers subTriggers, but no - * others. - */ - @Test - public void testClearRecursively() { - FinishedTriggersProperties.verifyClearRecursively(FinishedTriggersBitSet.emptyWithCapacity(1)); - } - - @Test - public void testCopy() throws Exception { - FinishedTriggersBitSet finishedSet = FinishedTriggersBitSet.emptyWithCapacity(10); - assertThat(finishedSet.copy().getBitSet(), not(theInstance(finishedSet.getBitSet()))); - } -}