beam-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From k...@apache.org
Subject [2/2] incubator-beam git commit: Remove WindowedValue.valueInEmptyWindows
Date Tue, 29 Nov 2016 19:10:12 GMT
Remove WindowedValue.valueInEmptyWindows

A value in empty windows expands to no values, so it can be dropped at
any time, perhaps unintentionally. This has bitten runner authors, including
Spark & Dataflow.

While creating such a thing in memory is not automatically problematic, it
is also not really useful. So this change removes it.


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/0e49b150
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/0e49b150
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/0e49b150

Branch: refs/heads/master
Commit: 0e49b150e83d85ae432c640da937a9497068e71b
Parents: 2f86a6a
Author: Kenneth Knowles <klk@google.com>
Authored: Tue May 10 11:39:35 2016 -0700
Committer: Kenneth Knowles <klk@google.com>
Committed: Tue Nov 29 11:09:26 2016 -0800

----------------------------------------------------------------------
 .../direct/FlattenEvaluatorFactoryTest.java     |  8 ++---
 .../beam/runners/dataflow/DataflowRunner.java   | 10 +++---
 .../org/apache/beam/sdk/util/WindowedValue.java | 33 +++-----------------
 .../beam/sdk/testing/PaneExtractorsTest.java    |  2 +-
 .../apache/beam/sdk/util/WindowedValueTest.java | 10 ------
 5 files changed, 14 insertions(+), 49 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0e49b150/runners/direct-java/src/test/java/org/apache/beam/runners/direct/FlattenEvaluatorFactoryTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/FlattenEvaluatorFactoryTest.java
b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/FlattenEvaluatorFactoryTest.java
index cb27fbc..39c7cab 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/FlattenEvaluatorFactoryTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/FlattenEvaluatorFactoryTest.java
@@ -78,9 +78,9 @@ public class FlattenEvaluatorFactoryTest {
     rightSideEvaluator.processElement(WindowedValue.valueInGlobalWindow(-1));
     leftSideEvaluator.processElement(
         WindowedValue.timestampedValueInGlobalWindow(2, new Instant(1024)));
-    leftSideEvaluator.processElement(WindowedValue.valueInEmptyWindows(4, PaneInfo.NO_FIRING));
+    leftSideEvaluator.processElement(WindowedValue.valueInGlobalWindow(4, PaneInfo.NO_FIRING));
     rightSideEvaluator.processElement(
-        WindowedValue.valueInEmptyWindows(2, PaneInfo.ON_TIME_AND_ONLY_FIRING));
+        WindowedValue.valueInGlobalWindow(2, PaneInfo.ON_TIME_AND_ONLY_FIRING));
     rightSideEvaluator.processElement(
         WindowedValue.timestampedValueInGlobalWindow(-4, new Instant(-4096)));
 
@@ -104,12 +104,12 @@ public class FlattenEvaluatorFactoryTest {
         flattenedLeftBundle.commit(Instant.now()).getElements(),
         containsInAnyOrder(
             WindowedValue.timestampedValueInGlobalWindow(2, new Instant(1024)),
-            WindowedValue.valueInEmptyWindows(4, PaneInfo.NO_FIRING),
+            WindowedValue.valueInGlobalWindow(4, PaneInfo.NO_FIRING),
             WindowedValue.valueInGlobalWindow(1)));
     assertThat(
         flattenedRightBundle.commit(Instant.now()).getElements(),
         containsInAnyOrder(
-            WindowedValue.valueInEmptyWindows(2, PaneInfo.ON_TIME_AND_ONLY_FIRING),
+            WindowedValue.valueInGlobalWindow(2, PaneInfo.ON_TIME_AND_ONLY_FIRING),
             WindowedValue.timestampedValueInGlobalWindow(-4, new Instant(-4096)),
             WindowedValue.valueInGlobalWindow(-1)));
   }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0e49b150/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
index ca3f0ed..03c503d 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
@@ -23,7 +23,7 @@ import static com.google.common.base.Preconditions.checkState;
 import static com.google.common.base.Strings.isNullOrEmpty;
 import static org.apache.beam.sdk.util.StringUtils.approximatePTransformName;
 import static org.apache.beam.sdk.util.StringUtils.approximateSimpleName;
-import static org.apache.beam.sdk.util.WindowedValue.valueInEmptyWindows;
+import static org.apache.beam.sdk.util.WindowedValue.valueInGlobalWindow;
 
 import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonProperty;
@@ -1233,7 +1233,7 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob>
{
             // are at a window boundary.
             c.output(IsmRecord.of(
                 ImmutableList.of(previousWindow.get()),
-                valueInEmptyWindows(new TransformedMap<>(WindowedValueToValue.<V>of(),
map))));
+                valueInGlobalWindow(new TransformedMap<>(WindowedValueToValue.<V>of(),
map))));
             map = new HashMap<>();
           }
 
@@ -1254,7 +1254,7 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob>
{
         // window boundary.
         c.output(IsmRecord.of(
             ImmutableList.of(previousWindow.get()),
-            valueInEmptyWindows(new TransformedMap<>(WindowedValueToValue.<V>of(),
map))));
+            valueInGlobalWindow(new TransformedMap<>(WindowedValueToValue.<V>of(),
map))));
       }
     }
 
@@ -1721,7 +1721,7 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob>
{
                                                              Iterable<WindowedValue<V>>,
                                                              Iterable<V>>>>of(
                 ImmutableList.of(previousWindow.get()),
-                valueInEmptyWindows(
+                valueInGlobalWindow(
                     new TransformedMap<>(
                         IterableWithWindowedValuesToIterable.<V>of(), resultMap))));
             multimap = HashMultimap.create();
@@ -1742,7 +1742,7 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob>
{
                                                          Iterable<WindowedValue<V>>,
                                                          Iterable<V>>>>of(
             ImmutableList.of(previousWindow.get()),
-            valueInEmptyWindows(
+            valueInGlobalWindow(
                 new TransformedMap<>(IterableWithWindowedValuesToIterable.<V>of(),
resultMap))));
       }
     }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0e49b150/sdks/java/core/src/main/java/org/apache/beam/sdk/util/WindowedValue.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/WindowedValue.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/WindowedValue.java
index a0b4cf5..3251f09 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/WindowedValue.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/WindowedValue.java
@@ -55,8 +55,7 @@ import org.joda.time.Instant;
 public abstract class WindowedValue<T> {
 
   /**
-   * Returns a {@code WindowedValue} with the given value, timestamp,
-   * and windows.
+   * Returns a {@code WindowedValue} with the given value, timestamp, and windows.
    */
   public static <T> WindowedValue<T> of(
       T value,
@@ -64,10 +63,10 @@ public abstract class WindowedValue<T> {
       Collection<? extends BoundedWindow> windows,
       PaneInfo pane) {
     checkNotNull(pane);
+    checkArgument(
+        windows.size() > 0, "Cannot create %s in no windows", WindowedValue.class.getName());
 
-    if (windows.size() == 0 && BoundedWindow.TIMESTAMP_MIN_VALUE.equals(timestamp))
{
-      return valueInEmptyWindows(value, pane);
-    } else if (windows.size() == 1) {
+    if (windows.size() == 1) {
       return of(value, timestamp, windows.iterator().next(), pane);
     } else {
       return new TimestampedValueInMultipleWindows<>(value, timestamp, windows, pane);
@@ -123,30 +122,6 @@ public abstract class WindowedValue<T> {
   }
 
   /**
-   * Returns a {@code WindowedValue} with the given value in no windows, and the default
timestamp
-   * and pane.
-   *
-   * @deprecated a value in no windows technically is not "in" a PCollection. It is allowed
to drop
-   *     it at any point, and benign runner implementation details could cause silent data
loss.
-   */
-  @Deprecated
-  public static <T> WindowedValue<T> valueInEmptyWindows(T value) {
-    return new ValueInEmptyWindows<T>(value, PaneInfo.NO_FIRING);
-  }
-
-  /**
-   * Returns a {@code WindowedValue} with the given value in no windows, and the default
timestamp
-   * and the specified pane.
-   *
-   * @deprecated a value in no windows technically is not "in" a PCollection. It is allowed
to drop
-   *     it at any point, and benign runner implementation details could cause silent data
loss.
-   */
-  @Deprecated
-  public static <T> WindowedValue<T> valueInEmptyWindows(T value, PaneInfo pane)
{
-    return new ValueInEmptyWindows<T>(value, pane);
-  }
-
-  /**
    * Returns a new {@code WindowedValue} that is a copy of this one, but with a different
value,
    * which may have a new type {@code NewT}.
    */

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0e49b150/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/PaneExtractorsTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/PaneExtractorsTest.java
b/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/PaneExtractorsTest.java
index ef501d4..79106ea 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/PaneExtractorsTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/PaneExtractorsTest.java
@@ -47,7 +47,7 @@ public class PaneExtractorsTest {
         PaneExtractors.onlyPane();
     Iterable<WindowedValue<Integer>> noFiring =
         ImmutableList.of(
-            WindowedValue.valueInGlobalWindow(9), WindowedValue.valueInEmptyWindows(19));
+            WindowedValue.valueInGlobalWindow(9), WindowedValue.valueInGlobalWindow(19));
     assertThat(extractor.apply(noFiring), containsInAnyOrder(9, 19));
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0e49b150/sdks/java/core/src/test/java/org/apache/beam/sdk/util/WindowedValueTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/WindowedValueTest.java
b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/WindowedValueTest.java
index 0c69a59..f7656cc 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/WindowedValueTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/WindowedValueTest.java
@@ -18,7 +18,6 @@
 package org.apache.beam.sdk.util;
 
 import static org.hamcrest.Matchers.containsInAnyOrder;
-import static org.hamcrest.Matchers.emptyIterable;
 import static org.hamcrest.Matchers.equalTo;
 import static org.junit.Assert.assertThat;
 
@@ -64,15 +63,6 @@ public class WindowedValueTest {
   }
 
   @Test
-  public void testExplodeWindowsInNoWindowsEmptyIterable() {
-    WindowedValue<String> value =
-        WindowedValue.of(
-            "foo", Instant.now(), ImmutableList.<BoundedWindow>of(), PaneInfo.NO_FIRING);
-
-    assertThat(value.explodeWindows(), emptyIterable());
-  }
-
-  @Test
   public void testExplodeWindowsInOneWindowEquals() {
     Instant now = Instant.now();
     BoundedWindow window = new IntervalWindow(now.minus(1000L), now.plus(1000L));


Mime
View raw message