beam-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From dhalp...@apache.org
Subject [1/2] incubator-beam git commit: Revert "Remove WindowedValue.valueInEmptyWindows"
Date Wed, 30 Nov 2016 01:41:01 GMT
Repository: incubator-beam
Updated Branches:
  refs/heads/master b1f7013d8 -> 8042d52fc


Revert "Remove WindowedValue.valueInEmptyWindows"

This reverts commit 0e49b150e83d85ae432c640da937a9497068e71b, which breaks
some DataflowRunner integration tests.


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/98ab5594
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/98ab5594
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/98ab5594

Branch: refs/heads/master
Commit: 98ab559410bde425c9c1944bcd2f09293c3764dc
Parents: b1f7013
Author: Kenneth Knowles <klk@google.com>
Authored: Tue Nov 29 16:57:09 2016 -0800
Committer: Dan Halperin <dhalperi@google.com>
Committed: Tue Nov 29 17:40:50 2016 -0800

----------------------------------------------------------------------
 .../direct/FlattenEvaluatorFactoryTest.java     |  8 ++---
 .../beam/runners/dataflow/DataflowRunner.java   | 10 +++---
 .../org/apache/beam/sdk/util/WindowedValue.java | 33 +++++++++++++++++---
 .../beam/sdk/testing/PaneExtractorsTest.java    |  2 +-
 .../apache/beam/sdk/util/WindowedValueTest.java | 10 ++++++
 5 files changed, 49 insertions(+), 14 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/98ab5594/runners/direct-java/src/test/java/org/apache/beam/runners/direct/FlattenEvaluatorFactoryTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/FlattenEvaluatorFactoryTest.java
b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/FlattenEvaluatorFactoryTest.java
index 39c7cab..cb27fbc 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/FlattenEvaluatorFactoryTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/FlattenEvaluatorFactoryTest.java
@@ -78,9 +78,9 @@ public class FlattenEvaluatorFactoryTest {
     rightSideEvaluator.processElement(WindowedValue.valueInGlobalWindow(-1));
     leftSideEvaluator.processElement(
         WindowedValue.timestampedValueInGlobalWindow(2, new Instant(1024)));
-    leftSideEvaluator.processElement(WindowedValue.valueInGlobalWindow(4, PaneInfo.NO_FIRING));
+    leftSideEvaluator.processElement(WindowedValue.valueInEmptyWindows(4, PaneInfo.NO_FIRING));
     rightSideEvaluator.processElement(
-        WindowedValue.valueInGlobalWindow(2, PaneInfo.ON_TIME_AND_ONLY_FIRING));
+        WindowedValue.valueInEmptyWindows(2, PaneInfo.ON_TIME_AND_ONLY_FIRING));
     rightSideEvaluator.processElement(
         WindowedValue.timestampedValueInGlobalWindow(-4, new Instant(-4096)));
 
@@ -104,12 +104,12 @@ public class FlattenEvaluatorFactoryTest {
         flattenedLeftBundle.commit(Instant.now()).getElements(),
         containsInAnyOrder(
             WindowedValue.timestampedValueInGlobalWindow(2, new Instant(1024)),
-            WindowedValue.valueInGlobalWindow(4, PaneInfo.NO_FIRING),
+            WindowedValue.valueInEmptyWindows(4, PaneInfo.NO_FIRING),
             WindowedValue.valueInGlobalWindow(1)));
     assertThat(
         flattenedRightBundle.commit(Instant.now()).getElements(),
         containsInAnyOrder(
-            WindowedValue.valueInGlobalWindow(2, PaneInfo.ON_TIME_AND_ONLY_FIRING),
+            WindowedValue.valueInEmptyWindows(2, PaneInfo.ON_TIME_AND_ONLY_FIRING),
             WindowedValue.timestampedValueInGlobalWindow(-4, new Instant(-4096)),
             WindowedValue.valueInGlobalWindow(-1)));
   }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/98ab5594/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
index 641daf4..0099856 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
@@ -23,7 +23,7 @@ import static com.google.common.base.Preconditions.checkState;
 import static com.google.common.base.Strings.isNullOrEmpty;
 import static org.apache.beam.sdk.util.StringUtils.approximatePTransformName;
 import static org.apache.beam.sdk.util.StringUtils.approximateSimpleName;
-import static org.apache.beam.sdk.util.WindowedValue.valueInGlobalWindow;
+import static org.apache.beam.sdk.util.WindowedValue.valueInEmptyWindows;
 
 import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonProperty;
@@ -1230,7 +1230,7 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob>
{
             // are at a window boundary.
             c.output(IsmRecord.of(
                 ImmutableList.of(previousWindow.get()),
-                valueInGlobalWindow(new TransformedMap<>(WindowedValueToValue.<V>of(),
map))));
+                valueInEmptyWindows(new TransformedMap<>(WindowedValueToValue.<V>of(),
map))));
             map = new HashMap<>();
           }
 
@@ -1251,7 +1251,7 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob>
{
         // window boundary.
         c.output(IsmRecord.of(
             ImmutableList.of(previousWindow.get()),
-            valueInGlobalWindow(new TransformedMap<>(WindowedValueToValue.<V>of(),
map))));
+            valueInEmptyWindows(new TransformedMap<>(WindowedValueToValue.<V>of(),
map))));
       }
     }
 
@@ -1718,7 +1718,7 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob>
{
                                                              Iterable<WindowedValue<V>>,
                                                              Iterable<V>>>>of(
                 ImmutableList.of(previousWindow.get()),
-                valueInGlobalWindow(
+                valueInEmptyWindows(
                     new TransformedMap<>(
                         IterableWithWindowedValuesToIterable.<V>of(), resultMap))));
             multimap = HashMultimap.create();
@@ -1739,7 +1739,7 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob>
{
                                                          Iterable<WindowedValue<V>>,
                                                          Iterable<V>>>>of(
             ImmutableList.of(previousWindow.get()),
-            valueInGlobalWindow(
+            valueInEmptyWindows(
                 new TransformedMap<>(IterableWithWindowedValuesToIterable.<V>of(),
resultMap))));
       }
     }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/98ab5594/sdks/java/core/src/main/java/org/apache/beam/sdk/util/WindowedValue.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/WindowedValue.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/WindowedValue.java
index 3251f09..a0b4cf5 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/WindowedValue.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/WindowedValue.java
@@ -55,7 +55,8 @@ import org.joda.time.Instant;
 public abstract class WindowedValue<T> {
 
   /**
-   * Returns a {@code WindowedValue} with the given value, timestamp, and windows.
+   * Returns a {@code WindowedValue} with the given value, timestamp,
+   * and windows.
    */
   public static <T> WindowedValue<T> of(
       T value,
@@ -63,10 +64,10 @@ public abstract class WindowedValue<T> {
       Collection<? extends BoundedWindow> windows,
       PaneInfo pane) {
     checkNotNull(pane);
-    checkArgument(
-        windows.size() > 0, "Cannot create %s in no windows", WindowedValue.class.getName());
 
-    if (windows.size() == 1) {
+    if (windows.size() == 0 && BoundedWindow.TIMESTAMP_MIN_VALUE.equals(timestamp))
{
+      return valueInEmptyWindows(value, pane);
+    } else if (windows.size() == 1) {
       return of(value, timestamp, windows.iterator().next(), pane);
     } else {
       return new TimestampedValueInMultipleWindows<>(value, timestamp, windows, pane);
@@ -122,6 +123,30 @@ public abstract class WindowedValue<T> {
   }
 
   /**
+   * Returns a {@code WindowedValue} with the given value in no windows, and the default
timestamp
+   * and pane.
+   *
+   * @deprecated a value in no windows technically is not "in" a PCollection. It is allowed
to drop
+   *     it at any point, and benign runner implementation details could cause silent data
loss.
+   */
+  @Deprecated
+  public static <T> WindowedValue<T> valueInEmptyWindows(T value) {
+    return new ValueInEmptyWindows<T>(value, PaneInfo.NO_FIRING);
+  }
+
+  /**
+   * Returns a {@code WindowedValue} with the given value in no windows, and the default
timestamp
+   * and the specified pane.
+   *
+   * @deprecated a value in no windows technically is not "in" a PCollection. It is allowed
to drop
+   *     it at any point, and benign runner implementation details could cause silent data
loss.
+   */
+  @Deprecated
+  public static <T> WindowedValue<T> valueInEmptyWindows(T value, PaneInfo pane)
{
+    return new ValueInEmptyWindows<T>(value, pane);
+  }
+
+  /**
    * Returns a new {@code WindowedValue} that is a copy of this one, but with a different
value,
    * which may have a new type {@code NewT}.
    */

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/98ab5594/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/PaneExtractorsTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/PaneExtractorsTest.java
b/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/PaneExtractorsTest.java
index 79106ea..ef501d4 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/PaneExtractorsTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/PaneExtractorsTest.java
@@ -47,7 +47,7 @@ public class PaneExtractorsTest {
         PaneExtractors.onlyPane();
     Iterable<WindowedValue<Integer>> noFiring =
         ImmutableList.of(
-            WindowedValue.valueInGlobalWindow(9), WindowedValue.valueInGlobalWindow(19));
+            WindowedValue.valueInGlobalWindow(9), WindowedValue.valueInEmptyWindows(19));
     assertThat(extractor.apply(noFiring), containsInAnyOrder(9, 19));
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/98ab5594/sdks/java/core/src/test/java/org/apache/beam/sdk/util/WindowedValueTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/WindowedValueTest.java
b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/WindowedValueTest.java
index f7656cc..0c69a59 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/WindowedValueTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/WindowedValueTest.java
@@ -18,6 +18,7 @@
 package org.apache.beam.sdk.util;
 
 import static org.hamcrest.Matchers.containsInAnyOrder;
+import static org.hamcrest.Matchers.emptyIterable;
 import static org.hamcrest.Matchers.equalTo;
 import static org.junit.Assert.assertThat;
 
@@ -63,6 +64,15 @@ public class WindowedValueTest {
   }
 
   @Test
+  public void testExplodeWindowsInNoWindowsEmptyIterable() {
+    WindowedValue<String> value =
+        WindowedValue.of(
+            "foo", Instant.now(), ImmutableList.<BoundedWindow>of(), PaneInfo.NO_FIRING);
+
+    assertThat(value.explodeWindows(), emptyIterable());
+  }
+
+  @Test
   public void testExplodeWindowsInOneWindowEquals() {
     Instant now = Instant.now();
     BoundedWindow window = new IntervalWindow(now.minus(1000L), now.plus(1000L));


Mime
View raw message