Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id C6DD8200BE2 for ; Thu, 15 Dec 2016 20:45:33 +0100 (CET) Received: by cust-asf.ponee.io (Postfix) id C567B160B15; Thu, 15 Dec 2016 19:45:33 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id BE42E160B13 for ; Thu, 15 Dec 2016 20:45:32 +0100 (CET) Received: (qmail 2028 invoked by uid 500); 15 Dec 2016 19:45:31 -0000 Mailing-List: contact commits-help@beam.incubator.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@beam.incubator.apache.org Delivered-To: mailing list commits@beam.incubator.apache.org Received: (qmail 2019 invoked by uid 99); 15 Dec 2016 19:45:31 -0000 Received: from pnap-us-west-generic-nat.apache.org (HELO spamd1-us-west.apache.org) (209.188.14.142) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 15 Dec 2016 19:45:31 +0000 Received: from localhost (localhost [127.0.0.1]) by spamd1-us-west.apache.org (ASF Mail Server at spamd1-us-west.apache.org) with ESMTP id 6396CC66D0 for ; Thu, 15 Dec 2016 19:45:31 +0000 (UTC) X-Virus-Scanned: Debian amavisd-new at spamd1-us-west.apache.org X-Spam-Flag: NO X-Spam-Score: -6.219 X-Spam-Level: X-Spam-Status: No, score=-6.219 tagged_above=-999 required=6.31 tests=[KAM_ASCII_DIVIDERS=0.8, KAM_LAZY_DOMAIN_SECURITY=1, RCVD_IN_DNSWL_HI=-5, RCVD_IN_MSPIKE_H3=-0.01, RCVD_IN_MSPIKE_WL=-0.01, RP_MATCHES_RCVD=-2.999] autolearn=disabled Received: from mx1-lw-eu.apache.org ([10.40.0.8]) by localhost (spamd1-us-west.apache.org [10.40.0.7]) (amavisd-new, port 10024) with ESMTP id oje5hALSrJHV for ; Thu, 15 Dec 2016 19:45:29 +0000 (UTC) Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by mx1-lw-eu.apache.org (ASF Mail Server at mx1-lw-eu.apache.org) with SMTP id A41E05F19B for ; Thu, 15 Dec 2016 19:45:28 +0000 (UTC) Received: (qmail 92404 invoked by uid 99); 15 Dec 2016 19:43:47 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 15 Dec 2016 19:43:47 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id BB04FE0772; Thu, 15 Dec 2016 19:43:47 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: kenn@apache.org To: commits@beam.incubator.apache.org Date: Thu, 15 Dec 2016 19:43:47 -0000 Message-Id: <9bfe6f7ab4924a4baac843242dcdaa09@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [1/2] incubator-beam git commit: [BEAM-1154] Get side input from proper window in ReduceFn archived-at: Thu, 15 Dec 2016 19:45:34 -0000 Repository: incubator-beam Updated Branches: refs/heads/release-0.4.0-incubating c09fbd43b -> 3ca8d2bfc [BEAM-1154] Get side input from proper window in ReduceFn Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/d57146f4 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/d57146f4 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/d57146f4 Branch: refs/heads/release-0.4.0-incubating Commit: d57146f47297af2ad0759216a16898b43e0fa0af Parents: c09fbd4 Author: Eugene Kirpichov Authored: Wed Dec 14 14:29:30 2016 -0800 Committer: Kenneth Knowles Committed: Thu Dec 15 11:41:55 2016 -0800 ---------------------------------------------------------------------- .../runners/core/ReduceFnContextFactory.java | 16 +-- .../beam/runners/core/ReduceFnRunnerTest.java | 133 ++++++++++--------- .../beam/sdk/transforms/CombineWithContext.java | 2 +- 3 files changed, 78 insertions(+), 73 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/d57146f4/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnContextFactory.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnContextFactory.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnContextFactory.java index c5bda9b..c71897d 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnContextFactory.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnContextFactory.java @@ -28,7 +28,6 @@ import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.transforms.windowing.PaneInfo; -import org.apache.beam.sdk.transforms.windowing.WindowFn; import org.apache.beam.sdk.util.ActiveWindowSet; import org.apache.beam.sdk.util.SideInputReader; import org.apache.beam.sdk.util.TimeDomain; @@ -98,11 +97,7 @@ class ReduceFnContextFactory { activeWindows, windowingStrategy.getWindowFn().windowCoder(), stateInternals, - stateContextFromComponents( - options, - sideInputReader, - window, - windowingStrategy.getWindowFn()), + stateContextFromComponents(options, sideInputReader, window), style); } @@ -512,8 +507,7 @@ class ReduceFnContextFactory { private static StateContext stateContextFromComponents( @Nullable final PipelineOptions options, final SideInputReader sideInputReader, - final W mainInputWindow, - final WindowFn windowFn) { + final W mainInputWindow) { if (options == null) { return StateContexts.nullContext(); } else { @@ -526,7 +520,11 @@ class ReduceFnContextFactory { @Override public T sideInput(PCollectionView view) { - return sideInputReader.get(view, windowFn.getSideInputWindow(mainInputWindow)); + return sideInputReader.get( + view, + view.getWindowingStrategyInternal() + .getWindowFn() + .getSideInputWindow(mainInputWindow)); } @Override http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/d57146f4/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnRunnerTest.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnRunnerTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnRunnerTest.java index ba57567..4abfc9a 100644 --- a/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnRunnerTest.java +++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnRunnerTest.java @@ -17,13 +17,14 @@ */ package org.apache.beam.runners.core; -import static com.google.common.base.Preconditions.checkArgument; import static org.apache.beam.runners.core.WindowMatchers.isSingleWindowedValue; import static org.apache.beam.runners.core.WindowMatchers.isWindowedValue; +import static org.hamcrest.Matchers.anyOf; import static org.hamcrest.Matchers.contains; import static org.hamcrest.Matchers.containsInAnyOrder; import static org.hamcrest.Matchers.emptyIterable; import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.greaterThanOrEqualTo; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertThat; @@ -36,7 +37,6 @@ import static org.mockito.Mockito.when; import static org.mockito.Mockito.withSettings; import com.google.common.collect.Iterables; -import java.util.Iterator; import java.util.List; import org.apache.beam.runners.core.triggers.TriggerStateMachine; import org.apache.beam.sdk.coders.VarIntCoder; @@ -348,49 +348,67 @@ public class ReduceFnRunnerTest { @Test public void testOnElementCombiningWithContext() throws Exception { - Integer expectedValue = 5; - WindowingStrategy windowingStrategy = WindowingStrategy - .of(FixedWindows.of(Duration.millis(10))) - .withMode(AccumulationMode.DISCARDING_FIRED_PANES) - .withOutputTimeFn(OutputTimeFns.outputAtEarliestInputTimestamp()) - .withAllowedLateness(Duration.millis(100)); + // Create values at timestamps 0 .. 8, windowed into fixed windows of 2. + // Side input windowed into fixed windows of 4: + // main: [ 0 1 ] [ 2 3 ] [ 4 5 ] [ 6 7 ] + // side: [ 100 ] [ 104 ] + // Combine using a CombineFn "side input + sum(main inputs)". + final int firstWindowSideInput = 100; + final int secondWindowSideInput = 104; + final Integer expectedValue = firstWindowSideInput; + WindowingStrategy mainInputWindowingStrategy = + WindowingStrategy.of(FixedWindows.of(Duration.millis(2))) + .withMode(AccumulationMode.ACCUMULATING_FIRED_PANES); + + WindowingStrategy sideInputWindowingStrategy = + WindowingStrategy.of(FixedWindows.of(Duration.millis(4))); TestOptions options = PipelineOptionsFactory.as(TestOptions.class); - options.setValue(5); + options.setValue(expectedValue); when(mockSideInputReader.contains(Matchers.>any())).thenReturn(true); when(mockSideInputReader.get( - Matchers.>any(), any(BoundedWindow.class))).thenReturn(5); + Matchers.>any(), any(BoundedWindow.class))) + .then( + new Answer() { + @Override + public Integer answer(InvocationOnMock invocation) throws Throwable { + IntervalWindow sideInputWindow = (IntervalWindow) invocation.getArguments()[1]; + long startMs = sideInputWindow.start().getMillis(); + long endMs = sideInputWindow.end().getMillis(); + // Window should have been produced by sideInputWindowingStrategy. + assertThat(startMs, anyOf(equalTo(0L), equalTo(4L))); + assertThat(endMs - startMs, equalTo(4L)); + // If startMs == 4 (second window), equal to secondWindowSideInput. + return firstWindowSideInput + (int) startMs; + } + }); @SuppressWarnings({"rawtypes", "unchecked", "unused"}) Object suppressWarningsVar = when(mockView.getWindowingStrategyInternal()) - .thenReturn((WindowingStrategy) windowingStrategy); + .thenReturn((WindowingStrategy) sideInputWindowingStrategy); SumAndVerifyContextFn combineFn = new SumAndVerifyContextFn(mockView, expectedValue); - // Test basic execution of a trigger using a non-combining window set and discarding mode. ReduceFnTester tester = ReduceFnTester.combining( - windowingStrategy, mockTriggerStateMachine, combineFn.asKeyedFn(), + mainInputWindowingStrategy, mockTriggerStateMachine, combineFn.asKeyedFn(), VarIntCoder.of(), options, mockSideInputReader); - injectElement(tester, 2); - - when(mockTriggerStateMachine.shouldFire(anyTriggerContext())).thenReturn(true); - injectElement(tester, 3); - when(mockTriggerStateMachine.shouldFire(anyTriggerContext())).thenReturn(true); - triggerShouldFinish(mockTriggerStateMachine); - injectElement(tester, 4); - - // This element shouldn't be seen, because the trigger has finished - injectElement(tester, 6); + for (int i = 0; i < 8; ++i) { + injectElement(tester, i); + } assertThat( tester.extractOutput(), contains( - isSingleWindowedValue(equalTo(5), 2, 0, 10), - isSingleWindowedValue(equalTo(4), 4, 0, 10))); - assertTrue(tester.isMarkedFinished(firstWindow)); - tester.assertHasOnlyGlobalAndFinishedSetsFor(firstWindow); + isSingleWindowedValue(equalTo(0 + firstWindowSideInput), 1, 0, 2), + isSingleWindowedValue(equalTo(0 + 1 + firstWindowSideInput), 1, 0, 2), + isSingleWindowedValue(equalTo(2 + firstWindowSideInput), 3, 2, 4), + isSingleWindowedValue(equalTo(2 + 3 + firstWindowSideInput), 3, 2, 4), + isSingleWindowedValue(equalTo(4 + secondWindowSideInput), 5, 4, 6), + isSingleWindowedValue(equalTo(4 + 5 + secondWindowSideInput), 5, 4, 6), + isSingleWindowedValue(equalTo(6 + secondWindowSideInput), 7, 6, 8), + isSingleWindowedValue(equalTo(6 + 7 + secondWindowSideInput), 7, 6, 8))); } @Test @@ -1424,7 +1442,8 @@ public class ReduceFnRunnerTest { assertEquals(2, output.size()); } - private static class SumAndVerifyContextFn extends CombineFnWithContext { + private static class SumAndVerifyContextFn + extends CombineFnWithContext { private final PCollectionView view; private final int expectedValue; @@ -1433,50 +1452,38 @@ public class ReduceFnRunnerTest { this.view = view; this.expectedValue = expectedValue; } - @Override - public int[] createAccumulator(Context c) { - checkArgument( - c.getPipelineOptions().as(TestOptions.class).getValue() == expectedValue); - checkArgument(c.sideInput(view) == expectedValue); - return wrap(0); + + private void verifyContext(Context c) { + assertThat(expectedValue, equalTo(c.getPipelineOptions().as(TestOptions.class).getValue())); + assertThat(c.sideInput(view), greaterThanOrEqualTo(100)); } @Override - public int[] addInput(int[] accumulator, Integer input, Context c) { - checkArgument( - c.getPipelineOptions().as(TestOptions.class).getValue() == expectedValue); - checkArgument(c.sideInput(view) == expectedValue); - accumulator[0] += input.intValue(); - return accumulator; + public Integer createAccumulator(Context c) { + verifyContext(c); + return 0; } @Override - public int[] mergeAccumulators(Iterable accumulators, Context c) { - checkArgument( - c.getPipelineOptions().as(TestOptions.class).getValue() == expectedValue); - checkArgument(c.sideInput(view) == expectedValue); - Iterator iter = accumulators.iterator(); - if (!iter.hasNext()) { - return createAccumulator(c); - } else { - int[] running = iter.next(); - while (iter.hasNext()) { - running[0] += iter.next()[0]; - } - return running; - } + public Integer addInput(Integer accumulator, Integer input, Context c) { + verifyContext(c); + return accumulator + input; } @Override - public Integer extractOutput(int[] accumulator, Context c) { - checkArgument( - c.getPipelineOptions().as(TestOptions.class).getValue() == expectedValue); - checkArgument(c.sideInput(view) == expectedValue); - return accumulator[0]; + public Integer mergeAccumulators(Iterable accumulators, Context c) { + verifyContext(c); + int res = 0; + for (Integer accum : accumulators) { + res += accum; + } + return res; } - private int[] wrap(int value) { - return new int[] { value }; + @Override + public Integer extractOutput(Integer accumulator, Context c) { + verifyContext(c); + return accumulator + c.sideInput(view); } } @@ -1484,7 +1491,7 @@ public class ReduceFnRunnerTest { * A {@link PipelineOptions} to test combining with context. */ public interface TestOptions extends PipelineOptions { - Integer getValue(); - void setValue(Integer value); + int getValue(); + void setValue(int value); } } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/d57146f4/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/CombineWithContext.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/CombineWithContext.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/CombineWithContext.java index 7ac952c..cd0600a 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/CombineWithContext.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/CombineWithContext.java @@ -48,7 +48,7 @@ public class CombineWithContext { /** * Returns the value of the side input for the window corresponding to the - * window of the main input element. + * main input's window in which values are being combined. */ public abstract T sideInput(PCollectionView view); }