beam-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From bchamb...@apache.org
Subject [1/2] incubator-beam git commit: Add WindowedValue#explodeWindows
Date Tue, 19 Apr 2016 20:04:37 GMT
Repository: incubator-beam
Updated Branches:
  refs/heads/master 135cb733f -> 0952f4433


Add WindowedValue#explodeWindows

This takes an existing WindowedValue and returns a Collection of
WindowedValues, each of which is in exactly one window.

Use the explode implementation on DoFnRunnerBase


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/98c9d99d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/98c9d99d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/98c9d99d

Branch: refs/heads/master
Commit: 98c9d99d27224012637e96839aee0721200dc351
Parents: 135cb73
Author: Thomas Groh <tgroh@google.com>
Authored: Mon Apr 18 16:55:57 2016 -0700
Committer: bchambers <bchambers@google.com>
Committed: Tue Apr 19 12:45:17 2016 -0700

----------------------------------------------------------------------
 .../apache/beam/sdk/util/DoFnRunnerBase.java    |  5 +-
 .../org/apache/beam/sdk/util/WindowedValue.java | 13 +++++
 .../apache/beam/sdk/util/WindowedValueTest.java | 53 ++++++++++++++++++++
 3 files changed, 68 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/98c9d99d/sdks/java/core/src/main/java/org/apache/beam/sdk/util/DoFnRunnerBase.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/DoFnRunnerBase.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/DoFnRunnerBase.java
index e9202a2..75861fe 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/DoFnRunnerBase.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/DoFnRunnerBase.java
@@ -141,9 +141,8 @@ public abstract class DoFnRunnerBase<InputT, OutputT> implements
DoFnRunner<Inpu
     } else {
       // We could modify the windowed value (and the processContext) to
       // avoid repeated allocations, but this is more straightforward.
-      for (BoundedWindow window : elem.getWindows()) {
-        invokeProcessElement(WindowedValue.of(
-            elem.getValue(), elem.getTimestamp(), window, elem.getPane()));
+      for (WindowedValue<InputT> windowedValue : elem.explodeWindows()) {
+        invokeProcessElement(windowedValue);
       }
     }
   }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/98c9d99d/sdks/java/core/src/main/java/org/apache/beam/sdk/util/WindowedValue.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/WindowedValue.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/WindowedValue.java
index f6e82cf..1bbdbd9 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/WindowedValue.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/WindowedValue.java
@@ -35,6 +35,7 @@ import org.apache.beam.sdk.util.common.ElementByteSizeObserver;
 
 import com.google.common.base.MoreObjects;
 import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableList;
 
 import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonProperty;
@@ -175,6 +176,18 @@ public abstract class WindowedValue<T> {
   public abstract Collection<? extends BoundedWindow> getWindows();
 
   /**
+   * Returns a collection of {@link WindowedValue WindowedValues} identical to this one,
except each
+   * is in exactly one of the windows that this {@link WindowedValue} is in.
+   */
+  public Iterable<WindowedValue<T>> explodeWindows() {
+    ImmutableList.Builder<WindowedValue<T>> windowedValues = ImmutableList.builder();
+    for (BoundedWindow w : getWindows()) {
+      windowedValues.add(of(getValue(), getTimestamp(), w, getPane()));
+    }
+    return windowedValues.build();
+  }
+
+  /**
    * Returns the pane of this {@code WindowedValue} in its window.
    */
   public PaneInfo getPane() {

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/98c9d99d/sdks/java/core/src/test/java/org/apache/beam/sdk/util/WindowedValueTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/WindowedValueTest.java
b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/WindowedValueTest.java
index c2c22c0..90969b7 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/WindowedValueTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/WindowedValueTest.java
@@ -17,11 +17,21 @@
  */
 package org.apache.beam.sdk.util;
 
+import static org.hamcrest.Matchers.containsInAnyOrder;
+import static org.hamcrest.Matchers.emptyIterable;
+import static org.hamcrest.Matchers.equalTo;
+import static org.junit.Assert.assertThat;
+
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.CoderException;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
 import org.apache.beam.sdk.transforms.windowing.PaneInfo;
+import org.apache.beam.sdk.transforms.windowing.PaneInfo.Timing;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Iterables;
 
 import org.joda.time.Instant;
 import org.junit.Assert;
@@ -55,4 +65,47 @@ public class WindowedValueTest {
     Assert.assertEquals(value.getTimestamp(), decodedValue.getTimestamp());
     Assert.assertArrayEquals(value.getWindows().toArray(), decodedValue.getWindows().toArray());
   }
+
+  @Test
+  public void testExplodeWindowsInNoWindowsEmptyIterable() {
+    WindowedValue<String> value =
+        WindowedValue.of(
+            "foo", Instant.now(), ImmutableList.<BoundedWindow>of(), PaneInfo.NO_FIRING);
+
+    assertThat(value.explodeWindows(), emptyIterable());
+  }
+
+  @Test
+  public void testExplodeWindowsInOneWindowEquals() {
+    Instant now = Instant.now();
+    BoundedWindow window = new IntervalWindow(now.minus(1000L), now.plus(1000L));
+    WindowedValue<String> value =
+        WindowedValue.of("foo", now, window, PaneInfo.ON_TIME_AND_ONLY_FIRING);
+
+    assertThat(Iterables.getOnlyElement(value.explodeWindows()), equalTo(value));
+  }
+
+  @Test
+  public void testExplodeWindowsManyWindowsMultipleWindowedValues() {
+    Instant now = Instant.now();
+    BoundedWindow centerWindow = new IntervalWindow(now.minus(1000L), now.plus(1000L));
+    BoundedWindow pastWindow = new IntervalWindow(now.minus(1500L), now.plus(500L));
+    BoundedWindow futureWindow = new IntervalWindow(now.minus(500L), now.plus(1500L));
+    BoundedWindow futureFutureWindow = new IntervalWindow(now, now.plus(2000L));
+    PaneInfo pane = PaneInfo.createPane(false, false, Timing.ON_TIME, 3L, 0L);
+    WindowedValue<String> value =
+        WindowedValue.of(
+            "foo",
+            now,
+            ImmutableList.of(pastWindow, centerWindow, futureWindow, futureFutureWindow),
+            pane);
+
+    assertThat(
+        value.explodeWindows(),
+        containsInAnyOrder(
+            WindowedValue.of("foo", now, futureFutureWindow, pane),
+            WindowedValue.of("foo", now, futureWindow, pane),
+            WindowedValue.of("foo", now, centerWindow, pane),
+            WindowedValue.of("foo", now, pastWindow, pane)));
+  }
 }


Mime
View raw message