beam-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From k...@apache.org
Subject [2/3] incubator-beam git commit: Add PushbackSideInputDoFnRunner
Date Wed, 04 May 2016 17:52:35 GMT
Add PushbackSideInputDoFnRunner

This DoFnRunner wraps a DoFnRunner and provides an additional method to
process an element in all the windows where all side inputs are ready,
returning any elements that it could not process.


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/f57c1dcf
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/f57c1dcf
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/f57c1dcf

Branch: refs/heads/master
Commit: f57c1dcfe24e80456d4d8c0422eeb2ee9617ca16
Parents: 3e8df24
Author: Thomas Groh <tgroh@google.com>
Authored: Mon May 2 10:04:20 2016 -0700
Committer: Kenneth Knowles <klk@google.com>
Committed: Wed May 4 10:51:52 2016 -0700

----------------------------------------------------------------------
 .../sdk/util/PushbackSideInputDoFnRunner.java   | 115 +++++++++
 .../sdk/util/IdentitySideInputWindowFn.java     |  54 +++++
 .../util/PushbackSideInputDoFnRunnerTest.java   | 234 +++++++++++++++++++
 3 files changed, 403 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/f57c1dcf/sdks/java/core/src/main/java/org/apache/beam/sdk/util/PushbackSideInputDoFnRunner.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/PushbackSideInputDoFnRunner.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/PushbackSideInputDoFnRunner.java
new file mode 100644
index 0000000..4eeedf6
--- /dev/null
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/PushbackSideInputDoFnRunner.java
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.util;
+
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.values.PCollectionView;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Iterables;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Set;
+
+/**
+ * A {@link DoFnRunner} that can refuse to process elements that are not ready, instead returning
+ * them via the {@link #processElementInReadyWindows(WindowedValue)}.
+ */
+class PushbackSideInputDoFnRunner<InputT, OutputT> implements DoFnRunner<InputT,
OutputT> {
+  private final DoFnRunner<InputT, OutputT> underlying;
+  private final Collection<PCollectionView<?>> views;
+  private final ReadyCheckingSideInputReader sideInputReader;
+
+  private Set<BoundedWindow> notReadyWindows;
+
+  public static <InputT, OutputT> PushbackSideInputDoFnRunner<InputT, OutputT>
create(
+      DoFnRunner<InputT, OutputT> underlying,
+      Collection<PCollectionView<?>> views,
+      ReadyCheckingSideInputReader sideInputReader) {
+    return new PushbackSideInputDoFnRunner<>(underlying, views, sideInputReader);
+  }
+
+  private PushbackSideInputDoFnRunner(
+      DoFnRunner<InputT, OutputT> underlying,
+      Collection<PCollectionView<?>> views,
+      ReadyCheckingSideInputReader sideInputReader) {
+    this.underlying = underlying;
+    this.views = views;
+    this.sideInputReader = sideInputReader;
+  }
+
+  @Override
+  public void startBundle() {
+    notReadyWindows = new HashSet<>();
+    underlying.startBundle();
+  }
+
+  /**
+   * Call the underlying {@link DoFnRunner#processElement(WindowedValue)} for the provided
element
+   * for each window the element is in that is ready.
+   *
+   * @param elem the element to process in all ready windows
+   * @return each element that could not be processed because it requires a side input window
+   * that is not ready.
+   */
+  public Iterable<WindowedValue<InputT>> processElementInReadyWindows(WindowedValue<InputT>
elem) {
+    if (views.isEmpty()) {
+      processElement(elem);
+      return Collections.emptyList();
+    }
+    ImmutableList.Builder<WindowedValue<InputT>> pushedBack = ImmutableList.builder();
+    for (WindowedValue<InputT> windowElem : elem.explodeWindows()) {
+      BoundedWindow mainInputWindow = Iterables.getOnlyElement(windowElem.getWindows());
+      boolean isReady = !notReadyWindows.contains(mainInputWindow);
+      for (PCollectionView<?> view : views) {
+        BoundedWindow sideInputWindow =
+            view.getWindowingStrategyInternal()
+                .getWindowFn()
+                .getSideInputWindow(mainInputWindow);
+        if (!sideInputReader.isReady(view, sideInputWindow)) {
+          isReady = false;
+          break;
+        }
+      }
+      if (isReady) {
+        processElement(windowElem);
+      } else {
+        notReadyWindows.add(mainInputWindow);
+        pushedBack.add(windowElem);
+      }
+    }
+    return pushedBack.build();
+  }
+
+  @Override
+  public void processElement(WindowedValue<InputT> elem) {
+    underlying.processElement(elem);
+  }
+
+  /**
+   * Call the underlying {@link DoFnRunner#finishBundle()}.
+   */
+  @Override
+  public void finishBundle() {
+    notReadyWindows = null;
+    underlying.finishBundle();
+  }
+}
+

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/f57c1dcf/sdks/java/core/src/test/java/org/apache/beam/sdk/util/IdentitySideInputWindowFn.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/IdentitySideInputWindowFn.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/IdentitySideInputWindowFn.java
new file mode 100644
index 0000000..ecab6f8
--- /dev/null
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/IdentitySideInputWindowFn.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.util;
+
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
+import org.apache.beam.sdk.transforms.windowing.NonMergingWindowFn;
+import org.apache.beam.sdk.transforms.windowing.WindowFn;
+
+import java.util.Collection;
+
+/**
+ * A {@link WindowFn} for use during tests that returns the input window for calls to
+ * {@link #getSideInputWindow(BoundedWindow)}.
+ *
+ * <p>Elements keep the windows they are already in, every other {@link WindowFn} is considered
+ * compatible, and {@link #windowCoder()} returns a placeholder coder.
+ */
+class IdentitySideInputWindowFn extends NonMergingWindowFn<Integer, BoundedWindow> {
+  /** Returns the windows the element is already assigned to, unchanged. */
+  @Override
+  // The cast from a wildcard-typed collection is unchecked; it is safe as long as the returned
+  // collection is only read, never added to.
+  @SuppressWarnings("unchecked")
+  public Collection<BoundedWindow> assignWindows(WindowFn<Integer, BoundedWindow>.AssignContext c)
+      throws Exception {
+    return (Collection<BoundedWindow>) c.windows();
+  }
+
+  /** Reports every {@link WindowFn} as compatible with this one. */
+  @Override
+  public boolean isCompatible(WindowFn<?, ?> other) {
+    return true;
+  }
+
+  /** Returns a placeholder coder for windows. */
+  @Override
+  // The raw cast reuses the global window coder as a stand-in.
+  @SuppressWarnings({"rawtypes", "unchecked"})
+  public Coder<BoundedWindow> windowCoder() {
+    // not used
+    return (Coder) GlobalWindow.Coder.INSTANCE;
+  }
+
+  /** Returns {@code window} itself: each main input window maps to the identical side input window. */
+  @Override
+  public BoundedWindow getSideInputWindow(BoundedWindow window) {
+    return window;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/f57c1dcf/sdks/java/core/src/test/java/org/apache/beam/sdk/util/PushbackSideInputDoFnRunnerTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/PushbackSideInputDoFnRunnerTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/PushbackSideInputDoFnRunnerTest.java
new file mode 100644
index 0000000..8885118
--- /dev/null
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/PushbackSideInputDoFnRunnerTest.java
@@ -0,0 +1,234 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.util;
+
+import static org.hamcrest.Matchers.containsInAnyOrder;
+import static org.hamcrest.Matchers.emptyIterable;
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.is;
+import static org.junit.Assert.assertThat;
+import static org.mockito.Mockito.when;
+
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.transforms.Sum;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
+import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
+import org.apache.beam.sdk.transforms.windowing.PaneInfo;
+import org.apache.beam.sdk.transforms.windowing.Window;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionView;
+
+import com.google.common.collect.ImmutableList;
+
+import org.hamcrest.Matchers;
+import org.joda.time.Instant;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+import org.mockito.Mock;
+import org.mockito.Mockito;
+import org.mockito.MockitoAnnotations;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Tests for {@link PushbackSideInputDoFnRunner}.
+ */
+@RunWith(JUnit4.class)
+public class PushbackSideInputDoFnRunnerTest {
+  @Mock private ReadyCheckingSideInputReader reader;
+  private TestDoFnRunner<Integer, Integer> underlying;
+  private PCollectionView<Integer> singletonView;
+
+  @Before
+  public void setup() {
+    MockitoAnnotations.initMocks(this);
+    TestPipeline p = TestPipeline.create();
+    PCollection<Integer> created = p.apply(Create.of(1, 2, 3));
+    // IdentitySideInputWindowFn maps each main input window to itself as the side input window,
+    // so the reader should be queried with the main input windows themselves.
+    singletonView =
+        created
+            .apply(Window.into(new IdentitySideInputWindowFn()))
+            .apply(Sum.integersGlobally().asSingletonView());
+
+    underlying = new TestDoFnRunner<>();
+  }
+
+  /** Creates a runner over the provided views, backed by the mock reader, and starts a bundle. */
+  private PushbackSideInputDoFnRunner<Integer, Integer> createRunner(
+      ImmutableList<PCollectionView<?>> views) {
+    PushbackSideInputDoFnRunner<Integer, Integer> runner =
+        PushbackSideInputDoFnRunner.create(underlying, views, reader);
+    runner.startBundle();
+    return runner;
+  }
+
+  @Test
+  public void startFinishBundleDelegates() {
+    PushbackSideInputDoFnRunner<Integer, Integer> runner =
+        createRunner(ImmutableList.<PCollectionView<?>>of(singletonView));
+
+    assertThat(underlying.started, is(true));
+    assertThat(underlying.finished, is(false));
+    runner.finishBundle();
+    assertThat(underlying.finished, is(true));
+  }
+
+  @Test
+  public void processElementSideInputNotReady() {
+    when(reader.isReady(Mockito.eq(singletonView), Mockito.any(BoundedWindow.class)))
+        .thenReturn(false);
+
+    PushbackSideInputDoFnRunner<Integer, Integer> runner =
+        createRunner(ImmutableList.<PCollectionView<?>>of(singletonView));
+
+    WindowedValue<Integer> oneWindow =
+        WindowedValue.of(
+            2,
+            new Instant(-2),
+            new IntervalWindow(new Instant(-500L), new Instant(0L)),
+            PaneInfo.ON_TIME_AND_ONLY_FIRING);
+    Iterable<WindowedValue<Integer>> oneWindowPushback =
+        runner.processElementInReadyWindows(oneWindow);
+    assertThat(oneWindowPushback, containsInAnyOrder(oneWindow));
+    assertThat(underlying.inputElems, Matchers.<WindowedValue<Integer>>emptyIterable());
+  }
+
+  @Test
+  public void processElementSideInputNotReadyMultipleWindows() {
+    when(reader.isReady(Mockito.eq(singletonView), Mockito.any(BoundedWindow.class)))
+        .thenReturn(false);
+
+    PushbackSideInputDoFnRunner<Integer, Integer> runner =
+        createRunner(ImmutableList.<PCollectionView<?>>of(singletonView));
+
+    WindowedValue<Integer> multiWindow =
+        WindowedValue.of(
+            2,
+            new Instant(-2),
+            ImmutableList.of(
+                new IntervalWindow(new Instant(-500L), new Instant(0L)),
+                new IntervalWindow(BoundedWindow.TIMESTAMP_MIN_VALUE, new Instant(250L)),
+                GlobalWindow.INSTANCE),
+            PaneInfo.ON_TIME_AND_ONLY_FIRING);
+    Iterable<WindowedValue<Integer>> multiWindowPushback =
+        runner.processElementInReadyWindows(multiWindow);
+    assertThat(multiWindowPushback, equalTo(multiWindow.explodeWindows()));
+    assertThat(underlying.inputElems, Matchers.<WindowedValue<Integer>>emptyIterable());
+  }
+
+  @Test
+  public void processElementSideInputNotReadySomeWindows() {
+    when(reader.isReady(Mockito.eq(singletonView), Mockito.eq(GlobalWindow.INSTANCE)))
+        .thenReturn(false);
+    when(
+            reader.isReady(
+                Mockito.eq(singletonView),
+                org.mockito.AdditionalMatchers.not(Mockito.eq(GlobalWindow.INSTANCE))))
+        .thenReturn(true);
+
+    PushbackSideInputDoFnRunner<Integer, Integer> runner =
+        createRunner(ImmutableList.<PCollectionView<?>>of(singletonView));
+
+    IntervalWindow littleWindow = new IntervalWindow(new Instant(-500L), new Instant(0L));
+    IntervalWindow bigWindow =
+        new IntervalWindow(BoundedWindow.TIMESTAMP_MIN_VALUE, new Instant(250L));
+    WindowedValue<Integer> multiWindow =
+        WindowedValue.of(
+            2,
+            new Instant(-2),
+            ImmutableList.of(littleWindow, bigWindow, GlobalWindow.INSTANCE),
+            PaneInfo.NO_FIRING);
+    Iterable<WindowedValue<Integer>> multiWindowPushback =
+        runner.processElementInReadyWindows(multiWindow);
+    assertThat(
+        multiWindowPushback,
+        containsInAnyOrder(WindowedValue.timestampedValueInGlobalWindow(2, new Instant(-2L))));
+    assertThat(underlying.inputElems,
+        containsInAnyOrder(WindowedValue.of(2, new Instant(-2), littleWindow, PaneInfo.NO_FIRING),
+            WindowedValue.of(2, new Instant(-2), bigWindow, PaneInfo.NO_FIRING)));
+  }
+
+  @Test
+  public void processElementSideInputReadyAllWindows() {
+    when(reader.isReady(Mockito.eq(singletonView), Mockito.any(BoundedWindow.class)))
+        .thenReturn(true);
+
+    ImmutableList<PCollectionView<?>> views = ImmutableList.<PCollectionView<?>>of(singletonView);
+    PushbackSideInputDoFnRunner<Integer, Integer> runner = createRunner(views);
+
+    WindowedValue<Integer> multiWindow =
+        WindowedValue.of(
+            2,
+            new Instant(-2),
+            ImmutableList.of(
+                new IntervalWindow(new Instant(-500L), new Instant(0L)),
+                new IntervalWindow(BoundedWindow.TIMESTAMP_MIN_VALUE, new Instant(250L)),
+                GlobalWindow.INSTANCE),
+            PaneInfo.ON_TIME_AND_ONLY_FIRING);
+    Iterable<WindowedValue<Integer>> multiWindowPushback =
+        runner.processElementInReadyWindows(multiWindow);
+    assertThat(multiWindowPushback, emptyIterable());
+    assertThat(underlying.inputElems,
+        containsInAnyOrder(ImmutableList.copyOf(multiWindow.explodeWindows()).toArray()));
+  }
+
+  @Test
+  public void processElementNoSideInputs() {
+    PushbackSideInputDoFnRunner<Integer, Integer> runner =
+        createRunner(ImmutableList.<PCollectionView<?>>of());
+
+    WindowedValue<Integer> multiWindow =
+        WindowedValue.of(
+            2,
+            new Instant(-2),
+            ImmutableList.of(
+                new IntervalWindow(new Instant(-500L), new Instant(0L)),
+                new IntervalWindow(BoundedWindow.TIMESTAMP_MIN_VALUE, new Instant(250L)),
+                GlobalWindow.INSTANCE),
+            PaneInfo.ON_TIME_AND_ONLY_FIRING);
+    Iterable<WindowedValue<Integer>> multiWindowPushback =
+        runner.processElementInReadyWindows(multiWindow);
+    assertThat(multiWindowPushback, emptyIterable());
+    assertThat(underlying.inputElems, containsInAnyOrder(multiWindow));
+  }
+
+  @Test
+  public void processElementNotReadyWindowStaysPushedBackForBundle() {
+    when(reader.isReady(Mockito.eq(singletonView), Mockito.any(BoundedWindow.class)))
+        .thenReturn(false);
+
+    PushbackSideInputDoFnRunner<Integer, Integer> runner =
+        createRunner(ImmutableList.<PCollectionView<?>>of(singletonView));
+
+    IntervalWindow window = new IntervalWindow(new Instant(-500L), new Instant(0L));
+    WindowedValue<Integer> first =
+        WindowedValue.of(2, new Instant(-2), window, PaneInfo.ON_TIME_AND_ONLY_FIRING);
+    assertThat(runner.processElementInReadyWindows(first), containsInAnyOrder(first));
+
+    // The side input becomes ready mid-bundle, but the window was already found not ready.
+    when(reader.isReady(Mockito.eq(singletonView), Mockito.any(BoundedWindow.class)))
+        .thenReturn(true);
+    WindowedValue<Integer> second =
+        WindowedValue.of(3, new Instant(-3), window, PaneInfo.ON_TIME_AND_ONLY_FIRING);
+    assertThat(runner.processElementInReadyWindows(second), containsInAnyOrder(second));
+    assertThat(underlying.inputElems, Matchers.<WindowedValue<Integer>>emptyIterable());
+
+    // A new bundle forgets the windows that were not ready in the previous one.
+    runner.finishBundle();
+    runner.startBundle();
+    assertThat(runner.processElementInReadyWindows(second), emptyIterable());
+    assertThat(underlying.inputElems, containsInAnyOrder(second));
+  }
+
+  /**
+   * A {@link DoFnRunner} that records whether its bundle was started and finished, and every
+   * element it was asked to process since the most recent {@link #startBundle()}.
+   */
+  private static class TestDoFnRunner<InputT, OutputT> implements DoFnRunner<InputT, OutputT> {
+    private List<WindowedValue<InputT>> inputElems;
+    private boolean started = false;
+    private boolean finished = false;
+
+    @Override
+    public void startBundle() {
+      started = true;
+      inputElems = new ArrayList<>();
+    }
+
+    @Override
+    public void processElement(WindowedValue<InputT> elem) {
+      inputElems.add(elem);
+    }
+
+    @Override
+    public void finishBundle() {
+      finished = true;
+    }
+  }
+}


Mime
View raw message