Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id A1CB3200C0F for ; Thu, 2 Feb 2017 20:31:31 +0100 (CET) Received: by cust-asf.ponee.io (Postfix) id A07D8160B61; Thu, 2 Feb 2017 19:31:31 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id F04DF160B57 for ; Thu, 2 Feb 2017 20:31:30 +0100 (CET) Received: (qmail 89672 invoked by uid 500); 2 Feb 2017 19:31:30 -0000 Mailing-List: contact commits-help@beam.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@beam.apache.org Delivered-To: mailing list commits@beam.apache.org Received: (qmail 89553 invoked by uid 99); 2 Feb 2017 19:31:30 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 02 Feb 2017 19:31:30 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id CACF4DFF06; Thu, 2 Feb 2017 19:31:29 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: davor@apache.org To: commits@beam.apache.org Date: Thu, 02 Feb 2017 19:31:29 -0000 Message-Id: <46060e6a17784dc89d4b57217d2d8651@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [1/2] beam git commit: Invoke onMerge in AfterWatermarkEarly archived-at: Thu, 02 Feb 2017 19:31:31 -0000 Repository: beam Updated Branches: refs/heads/release-0.5.0 0d0d49705 -> e7599c3e5 Invoke onMerge in AfterWatermarkEarly This ensures that any triggering state manipulations appropriately notify the early subtrigger before resetting the finished bit. This ensures that any timers or state is appropriately migrated to the merged window. Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/69165f85 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/69165f85 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/69165f85 Branch: refs/heads/release-0.5.0 Commit: 69165f853d0c434ecd46ae445eb98894b2029360 Parents: 0d0d497 Author: Thomas Groh Authored: Thu Feb 2 09:23:45 2017 -0800 Committer: Thomas Groh Committed: Thu Feb 2 11:07:22 2017 -0800 ---------------------------------------------------------------------- .../triggers/AfterWatermarkStateMachine.java | 1 + .../triggers/AfterWatermarkStateMachineTest.java | 19 +++++++++++++++++++ 2 files changed, 20 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/69165f85/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/AfterWatermarkStateMachine.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/AfterWatermarkStateMachine.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/AfterWatermarkStateMachine.java index 524c057..e83c2f8 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/AfterWatermarkStateMachine.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/AfterWatermarkStateMachine.java @@ -129,6 +129,7 @@ public class AfterWatermarkStateMachine { // the new merged window, because even if the merged window is "done" some pending elements // haven't had a chance to fire. if (!earlyContext.trigger().finishedInAllMergingWindows() || !endOfWindowReached(c)) { + earlySubtrigger.invokeOnMerge(earlyContext); earlyContext.trigger().setFinished(false); if (lateTrigger != null) { ExecutableTriggerStateMachine lateSubtrigger = c.trigger().subTrigger(LATE_INDEX); http://git-wip-us.apache.org/repos/asf/beam/blob/69165f85/runners/core-java/src/test/java/org/apache/beam/runners/core/triggers/AfterWatermarkStateMachineTest.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/triggers/AfterWatermarkStateMachineTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/triggers/AfterWatermarkStateMachineTest.java index 119c937..e4d10a0 100644 --- a/runners/core-java/src/test/java/org/apache/beam/runners/core/triggers/AfterWatermarkStateMachineTest.java +++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/triggers/AfterWatermarkStateMachineTest.java @@ -21,8 +21,10 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; import static org.mockito.Mockito.doNothing; +import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; +import org.apache.beam.runners.core.triggers.TriggerStateMachine.OnMergeContext; import org.apache.beam.runners.core.triggers.TriggerStateMachine.OnceTriggerStateMachine; import org.apache.beam.runners.core.triggers.TriggerStateMachineTester.SimpleTriggerStateMachineTester; import org.apache.beam.sdk.transforms.windowing.FixedWindows; @@ -293,6 +295,23 @@ public class AfterWatermarkStateMachineTest { assertTrue(tester.shouldFire(mergedWindow)); } + @Test + public void testEarlyAndLateOnMergeSubtriggerMerges() throws Exception { + tester = + TriggerStateMachineTester.forTrigger( + AfterWatermarkStateMachine.pastEndOfWindow() + .withEarlyFirings(mockEarly) + .withLateFirings(mockLate), + Sessions.withGapDuration(Duration.millis(10))); + + tester.injectElements(1); + tester.injectElements(5); + + // Merging should re-activate the early trigger in the merged window + tester.mergeWindows(); + verify(mockEarly).onMerge(Mockito.any(OnMergeContext.class)); + } + /** * Tests that the trigger rewinds to be non-finished in the merged window. *