beam-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From k...@apache.org
Subject [4/5] incubator-beam git commit: Remove references to multi-window representation from model
Date Thu, 23 Jun 2016 17:01:47 GMT
Remove references to multi-window representation from model

Some areas of the Beam model in the SDK allude to the use of a
compressed representation of an element along with the set
of windows it is assigned to. However, the model itself views
elements in different windows as fully independent, so the SDK
should not place any obligation on the part of the runner or
user to use a particular representation.

This change removes those places in the SDK where an element
is treated in multiple windows at once.


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/08104410
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/08104410
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/08104410

Branch: refs/heads/master
Commit: 08104410177063b1095bd91b24b40f9961c92cf2
Parents: a3aa4c7
Author: Kenneth Knowles <klk@google.com>
Authored: Mon May 9 12:17:09 2016 -0700
Committer: Kenneth Knowles <klk@google.com>
Committed: Thu Jun 23 09:35:44 2016 -0700

----------------------------------------------------------------------
 .../apache/beam/sdk/util/AssignWindowsDoFn.java | 15 ++++---
 .../apache/beam/sdk/util/DoFnRunnerBase.java    |  2 +-
 .../beam/sdk/util/ReduceFnRunnerTest.java       |  3 +-
 .../apache/beam/sdk/util/ReduceFnTester.java    | 46 +++++++++++---------
 .../runners/direct/WindowEvaluatorFactory.java  |  6 ++-
 .../direct/WindowEvaluatorFactoryTest.java      |  4 +-
 .../FlinkStreamingTransformTranslators.java     |  5 ++-
 .../functions/FlinkAssignContext.java           | 15 ++++++-
 .../functions/FlinkNoElementAssignContext.java  |  4 +-
 .../streaming/FlinkAbstractParDoWrapper.java    |  4 +-
 .../flink/streaming/GroupAlsoByWindowTest.java  |  2 +-
 .../beam/sdk/testing/WindowFnTestUtils.java     |  5 ++-
 .../sdk/transforms/windowing/GlobalWindows.java |  5 ---
 .../windowing/PartitioningWindowFn.java         |  5 ---
 .../beam/sdk/transforms/windowing/WindowFn.java | 11 +----
 .../apache/beam/sdk/util/GatherAllPanes.java    |  3 +-
 .../apache/beam/sdk/util/IdentityWindowFn.java  | 20 +++------
 .../org/apache/beam/sdk/util/Reshuffle.java     |  3 +-
 .../sdk/util/IdentitySideInputWindowFn.java     |  3 +-
 .../sdk/util/MergingActiveWindowSetTest.java    |  6 +--
 .../org/apache/beam/sdk/util/TriggerTester.java | 14 +++---
 21 files changed, 89 insertions(+), 92 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/08104410/runners/core-java/src/main/java/org/apache/beam/sdk/util/AssignWindowsDoFn.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/sdk/util/AssignWindowsDoFn.java b/runners/core-java/src/main/java/org/apache/beam/sdk/util/AssignWindowsDoFn.java
index caec40e..d40b007 100644
--- a/runners/core-java/src/main/java/org/apache/beam/sdk/util/AssignWindowsDoFn.java
+++ b/runners/core-java/src/main/java/org/apache/beam/sdk/util/AssignWindowsDoFn.java
@@ -20,22 +20,27 @@ package org.apache.beam.sdk.util;
 import static com.google.common.base.Preconditions.checkNotNull;
 
 import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.DoFn.RequiresWindowAccess;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.transforms.windowing.PaneInfo;
 import org.apache.beam.sdk.transforms.windowing.WindowFn;
 
+import com.google.common.collect.Iterables;
+
 import org.joda.time.Instant;
 
 import java.util.Collection;
 
 /**
- * {@link DoFn} that tags elements of a PCollection with windows, according
- * to the provided {@link WindowFn}.
+ * {@link DoFn} that tags elements of a {@link PCollection} with windows, according to the provided
+ * {@link WindowFn}.
+ *
  * @param <T> Type of elements being windowed
  * @param <W> Window type
  */
 @SystemDoFnInternal
-public class AssignWindowsDoFn<T, W extends BoundedWindow> extends DoFn<T, T> {
+public class AssignWindowsDoFn<T, W extends BoundedWindow> extends DoFn<T, T>
+    implements RequiresWindowAccess {
   private WindowFn<? super T, W> fn;
 
   public AssignWindowsDoFn(WindowFn<? super T, W> fn) {
@@ -64,8 +69,8 @@ public class AssignWindowsDoFn<T, W extends BoundedWindow> extends DoFn<T, T> {
                 }
 
                 @Override
-                public Collection<? extends BoundedWindow> windows() {
-                  return c.windowingInternals().windows();
+                public BoundedWindow window() {
+                  return Iterables.getOnlyElement(c.windowingInternals().windows());
                 }
               });
 

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/08104410/runners/core-java/src/main/java/org/apache/beam/sdk/util/DoFnRunnerBase.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/sdk/util/DoFnRunnerBase.java
b/runners/core-java/src/main/java/org/apache/beam/sdk/util/DoFnRunnerBase.java
index 1ebe72b..a849eb2 100644
--- a/runners/core-java/src/main/java/org/apache/beam/sdk/util/DoFnRunnerBase.java
+++ b/runners/core-java/src/main/java/org/apache/beam/sdk/util/DoFnRunnerBase.java
@@ -254,7 +254,7 @@ public abstract class DoFnRunnerBase<InputT, OutputT> implements
DoFnRunner<Inpu
             }
 
             @Override
-            public Collection<? extends BoundedWindow> windows() {
+            public W window() {
               throw new UnsupportedOperationException(
                   "WindowFn attempted to access input windows when none were available");
             }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/08104410/runners/core-java/src/test/java/org/apache/beam/sdk/util/ReduceFnRunnerTest.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/test/java/org/apache/beam/sdk/util/ReduceFnRunnerTest.java
b/runners/core-java/src/test/java/org/apache/beam/sdk/util/ReduceFnRunnerTest.java
index b7ec540..64fcae3 100644
--- a/runners/core-java/src/test/java/org/apache/beam/sdk/util/ReduceFnRunnerTest.java
+++ b/runners/core-java/src/test/java/org/apache/beam/sdk/util/ReduceFnRunnerTest.java
@@ -82,7 +82,6 @@ import org.mockito.MockitoAnnotations;
 import org.mockito.invocation.InvocationOnMock;
 import org.mockito.stubbing.Answer;
 
-import java.util.Collection;
 import java.util.Iterator;
 import java.util.List;
 
@@ -258,7 +257,7 @@ public class ReduceFnRunnerTest {
               }
 
               @Override
-              public Collection<? extends BoundedWindow> windows() {
+              public BoundedWindow window() {
                 throw new UnsupportedOperationException();
               }
             }));

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/08104410/runners/core-java/src/test/java/org/apache/beam/sdk/util/ReduceFnTester.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/test/java/org/apache/beam/sdk/util/ReduceFnTester.java
b/runners/core-java/src/test/java/org/apache/beam/sdk/util/ReduceFnTester.java
index 9916c5c..e897f54 100644
--- a/runners/core-java/src/test/java/org/apache/beam/sdk/util/ReduceFnTester.java
+++ b/runners/core-java/src/test/java/org/apache/beam/sdk/util/ReduceFnTester.java
@@ -401,21 +401,25 @@ public class ReduceFnTester<InputT, OutputT, W extends BoundedWindow>
{
       WindowTracing.trace("TriggerTester.injectElements: {}", value);
     }
     ReduceFnRunner<String, InputT, OutputT, W> runner = createRunner();
-    runner.processElements(Iterables.transform(
-        Arrays.asList(values), new Function<TimestampedValue<InputT>, WindowedValue<InputT>>() {
-          @Override
-          public WindowedValue<InputT> apply(TimestampedValue<InputT> input) {
-            try {
-              InputT value = input.getValue();
-              Instant timestamp = input.getTimestamp();
-              Collection<W> windows = windowFn.assignWindows(new TestAssignContext<W>(
-                  windowFn, value, timestamp, Arrays.asList(GlobalWindow.INSTANCE)));
-              return WindowedValue.of(value, timestamp, windows, PaneInfo.NO_FIRING);
-            } catch (Exception e) {
-              throw new RuntimeException(e);
-            }
-          }
-        }));
+    runner.processElements(
+        Iterables.transform(
+            Arrays.asList(values),
+            new Function<TimestampedValue<InputT>, WindowedValue<InputT>>() {
+              @Override
+              public WindowedValue<InputT> apply(TimestampedValue<InputT> input) {
+                try {
+                  InputT value = input.getValue();
+                  Instant timestamp = input.getTimestamp();
+                  Collection<W> windows =
+                      windowFn.assignWindows(
+                          new TestAssignContext<W>(
+                              windowFn, value, timestamp, GlobalWindow.INSTANCE));
+                  return WindowedValue.of(value, timestamp, windows, PaneInfo.NO_FIRING);
+                } catch (Exception e) {
+                  throw new RuntimeException(e);
+                }
+              }
+            }));
 
     // Persist after each bundle.
     runner.persist();
@@ -538,14 +542,14 @@ public class ReduceFnTester<InputT, OutputT, W extends BoundedWindow>
{
       extends WindowFn<Object, W>.AssignContext {
     private Object element;
     private Instant timestamp;
-    private Collection<? extends BoundedWindow> windows;
+    private BoundedWindow window;
 
-    public TestAssignContext(WindowFn<Object, W> windowFn, Object element, Instant timestamp,
-        Collection<? extends BoundedWindow> windows) {
+    public TestAssignContext(
+        WindowFn<Object, W> windowFn, Object element, Instant timestamp, BoundedWindow window) {
       windowFn.super();
       this.element = element;
       this.timestamp = timestamp;
-      this.windows = windows;
+      this.window = window;
     }
 
     @Override
@@ -559,8 +563,8 @@ public class ReduceFnTester<InputT, OutputT, W extends BoundedWindow>
{
     }
 
     @Override
-    public Collection<? extends BoundedWindow> windows() {
-      return windows;
+    public BoundedWindow window() {
+      return window;
     }
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/08104410/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WindowEvaluatorFactory.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WindowEvaluatorFactory.java
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WindowEvaluatorFactory.java
index 6045912..67c2f17 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WindowEvaluatorFactory.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WindowEvaluatorFactory.java
@@ -29,6 +29,8 @@ import org.apache.beam.sdk.transforms.windowing.WindowFn;
 import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.values.PCollection;
 
+import com.google.common.collect.Iterables;
+
 import org.joda.time.Instant;
 
 import java.util.Collection;
@@ -125,8 +127,8 @@ class WindowEvaluatorFactory implements TransformEvaluatorFactory {
     }
 
     @Override
-    public Collection<? extends BoundedWindow> windows() {
-      return value.getWindows();
+    public BoundedWindow window() {
+      return Iterables.getOnlyElement(value.getWindows());
     }
 
   }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/08104410/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WindowEvaluatorFactoryTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WindowEvaluatorFactoryTest.java
b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WindowEvaluatorFactoryTest.java
index c5faa5a..65dcfeb 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WindowEvaluatorFactoryTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WindowEvaluatorFactoryTest.java
@@ -326,11 +326,11 @@ public class WindowEvaluatorFactoryTest {
   private static class EvaluatorTestWindowFn extends NonMergingWindowFn<Long, BoundedWindow>
{
     @Override
     public Collection<BoundedWindow> assignWindows(AssignContext c) throws Exception
{
-      if (c.windows().contains(GlobalWindow.INSTANCE)) {
+      if (c.window().equals(GlobalWindow.INSTANCE)) {
         return Collections.<BoundedWindow>singleton(new IntervalWindow(c.timestamp(),
             c.timestamp().plus(1L)));
       }
-      return (Collection<BoundedWindow>) c.windows();
+      return Collections.singleton(c.window());
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/08104410/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkStreamingTransformTranslators.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkStreamingTransformTranslators.java
b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkStreamingTransformTranslators.java
index b3fed99..5d04068 100644
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkStreamingTransformTranslators.java
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkStreamingTransformTranslators.java
@@ -59,6 +59,7 @@ import org.apache.beam.sdk.values.PValue;
 import org.apache.beam.sdk.values.TupleTag;
 
 import com.google.api.client.util.Maps;
+import com.google.common.collect.Iterables;
 import com.google.common.collect.Lists;
 
 import org.apache.flink.api.common.functions.FilterFunction;
@@ -359,8 +360,8 @@ public class FlinkStreamingTransformTranslators {
                 }
 
                 @Override
-                public Collection<? extends BoundedWindow> windows() {
-                  return c.windowingInternals().windows();
+                public BoundedWindow window() {
+                  return Iterables.getOnlyElement(c.windowingInternals().windows());
                 }
               });
 

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/08104410/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkAssignContext.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkAssignContext.java
b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkAssignContext.java
index 7ea8c20..6abb8ff 100644
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkAssignContext.java
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkAssignContext.java
@@ -17,10 +17,14 @@
  */
 package org.apache.beam.runners.flink.translation.functions;
 
+import static com.google.common.base.Preconditions.checkArgument;
+
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.transforms.windowing.WindowFn;
 import org.apache.beam.sdk.util.WindowedValue;
 
+import com.google.common.collect.Iterables;
+
 import org.joda.time.Instant;
 
 import java.util.Collection;
@@ -35,6 +39,13 @@ class FlinkAssignContext<InputT, W extends BoundedWindow>
 
   FlinkAssignContext(WindowFn<InputT, W> fn, WindowedValue<InputT> value) {
     fn.super();
+    checkArgument(
+        Iterables.size(value.getWindows()) == 1,
+        String.format(
+            "%s passed to window assignment must be in a single window, but it was in %s: %s",
+            WindowedValue.class.getSimpleName(),
+            Iterables.size(value.getWindows()),
+            value.getWindows()));
     this.value = value;
   }
 
@@ -49,8 +60,8 @@ class FlinkAssignContext<InputT, W extends BoundedWindow>
   }
 
   @Override
-  public Collection<? extends BoundedWindow> windows() {
-    return value.getWindows();
+  public BoundedWindow window() {
+    return Iterables.getOnlyElement(value.getWindows());
   }
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/08104410/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkNoElementAssignContext.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkNoElementAssignContext.java
b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkNoElementAssignContext.java
index 892f7a1..d49821b 100644
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkNoElementAssignContext.java
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkNoElementAssignContext.java
@@ -65,7 +65,7 @@ class FlinkNoElementAssignContext<InputT, W extends BoundedWindow>
   }
 
   @Override
-  public Collection<? extends BoundedWindow> windows() {
-    throw new UnsupportedOperationException("No windows available.");
+  public BoundedWindow window() {
+    throw new UnsupportedOperationException("No window available.");
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/08104410/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/FlinkAbstractParDoWrapper.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/FlinkAbstractParDoWrapper.java
b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/FlinkAbstractParDoWrapper.java
index 3c37aa9..f68a519 100644
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/FlinkAbstractParDoWrapper.java
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/FlinkAbstractParDoWrapper.java
@@ -245,9 +245,9 @@ public abstract class FlinkAbstractParDoWrapper<IN, OUTDF, OUTFL>
extends RichFl
           }
 
           @Override
-          public Collection<? extends BoundedWindow> windows() {
+          public BoundedWindow window() {
             throw new UnsupportedOperationException(
-                "WindowFn attempted to access input windows when none were available");
+                "WindowFn attempted to access input window when none was available");
           }
         });
       } catch (Exception e) {

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/08104410/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/GroupAlsoByWindowTest.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/GroupAlsoByWindowTest.java
b/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/GroupAlsoByWindowTest.java
index 3e5a17d..207fb5a 100644
--- a/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/GroupAlsoByWindowTest.java
+++ b/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/GroupAlsoByWindowTest.java
@@ -508,7 +508,7 @@ public class GroupAlsoByWindowTest extends StreamingMultipleProgramsTestBase
{
           }
 
           @Override
-          public Collection<? extends BoundedWindow> windows() {
+          public BoundedWindow window() {
             throw new UnsupportedOperationException(
                 "WindowFn attempted to access input windows when none were available");
           }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/08104410/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/WindowFnTestUtils.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/WindowFnTestUtils.java
b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/WindowFnTestUtils.java
index a4130df..127721a 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/WindowFnTestUtils.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/WindowFnTestUtils.java
@@ -23,6 +23,7 @@ import static org.junit.Assert.assertThat;
 import static org.junit.Assert.assertTrue;
 
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
 import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
 import org.apache.beam.sdk.transforms.windowing.OutputTimeFn;
 import org.apache.beam.sdk.transforms.windowing.OutputTimeFns;
@@ -115,8 +116,8 @@ public class WindowFnTestUtils {
     }
 
     @Override
-    public Collection<? extends BoundedWindow> windows() {
-      return null;
+    public BoundedWindow window() {
+      return GlobalWindow.INSTANCE;
     }
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/08104410/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/GlobalWindows.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/GlobalWindows.java
b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/GlobalWindows.java
index 499ffeb..002bf2e 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/GlobalWindows.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/GlobalWindows.java
@@ -53,11 +53,6 @@ public class GlobalWindows extends NonMergingWindowFn<Object, GlobalWindow>
{
   }
 
   @Override
-  public boolean assignsToSingleWindow() {
-    return true;
-  }
-
-  @Override
   public Instant getOutputTime(Instant inputTimestamp, GlobalWindow window) {
     return inputTimestamp;
   }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/08104410/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/PartitioningWindowFn.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/PartitioningWindowFn.java
b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/PartitioningWindowFn.java
index b0dd8b9..da2f38c 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/PartitioningWindowFn.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/PartitioningWindowFn.java
@@ -51,11 +51,6 @@ public abstract class PartitioningWindowFn<T, W extends BoundedWindow>
   }
 
   @Override
-  public boolean assignsToSingleWindow() {
-    return true;
-  }
-
-  @Override
   public Instant getOutputTime(Instant inputTimestamp, W window) {
     return inputTimestamp;
   }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/08104410/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/WindowFn.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/WindowFn.java
b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/WindowFn.java
index 41833f8..d84866b 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/WindowFn.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/WindowFn.java
@@ -65,10 +65,10 @@ public abstract class WindowFn<T, W extends BoundedWindow>
     public abstract Instant timestamp();
 
     /**
-     * Returns the windows the current element was in, prior to this
+     * Returns the window of the current element prior to this
      * {@code WindowFn} being called.
      */
-    public abstract Collection<? extends BoundedWindow> windows();
+    public abstract BoundedWindow window();
   }
 
   /**
@@ -161,13 +161,6 @@ public abstract class WindowFn<T, W extends BoundedWindow>
   }
 
   /**
-   * Returns true if this {@code WindowFn} assigns each element to a single window.
-   */
-  public boolean assignsToSingleWindow() {
-    return false;
-  }
-
-  /**
    * {@inheritDoc}
    *
    * <p>By default, does not register any display data. Implementors may override this
method

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/08104410/sdks/java/core/src/main/java/org/apache/beam/sdk/util/GatherAllPanes.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/GatherAllPanes.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/GatherAllPanes.java
index 5a01c28..ab40678 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/GatherAllPanes.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/GatherAllPanes.java
@@ -62,8 +62,7 @@ public class GatherAllPanes<T>
         .apply(
             Window.into(
                     new IdentityWindowFn<KV<Void, WindowedValue<T>>>(
-                        originalWindowFn.windowCoder(),
-                        input.getWindowingStrategy().getWindowFn().assignsToSingleWindow()))
+                        originalWindowFn.windowCoder()))
                 .triggering(Never.ever()))
         // all values have the same key so they all appear as a single output element
         .apply(GroupByKey.<Void, WindowedValue<T>>create())

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/08104410/sdks/java/core/src/main/java/org/apache/beam/sdk/util/IdentityWindowFn.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/IdentityWindowFn.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/IdentityWindowFn.java
index 91e5609..a3477e9 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/IdentityWindowFn.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/IdentityWindowFn.java
@@ -30,6 +30,7 @@ import org.apache.beam.sdk.values.PCollection;
 import org.joda.time.Instant;
 
 import java.util.Collection;
+import java.util.Collections;
 
 /**
  * A {@link WindowFn} that leaves all associations between elements and windows unchanged.
@@ -55,25 +56,21 @@ class IdentityWindowFn<T> extends NonMergingWindowFn<T, BoundedWindow>
{
    * these windows.
    */
   private final Coder<BoundedWindow> coder;
-  private final boolean assignsToSingleWindow;
 
-  public IdentityWindowFn(Coder<? extends BoundedWindow> coder, boolean assignsToSingleWindow) {
+  public IdentityWindowFn(Coder<? extends BoundedWindow> coder) {
     // Safe because it is only used privately here.
     // At every point where a window is returned or accepted, it has been provided
-    // by priorWindowFn, so it is of the expected type.
+    // by the prior WindowFn, so it is of the expected type.
     @SuppressWarnings("unchecked")
     Coder<BoundedWindow> windowCoder = (Coder<BoundedWindow>) coder;
     this.coder = windowCoder;
-    this.assignsToSingleWindow = assignsToSingleWindow;
   }
 
   @Override
   public Collection<BoundedWindow> assignWindows(WindowFn<T, BoundedWindow>.AssignContext
c)
       throws Exception {
-    // The windows are provided by priorWindowFn, which also provides the coder for them
-    @SuppressWarnings("unchecked")
-    Collection<BoundedWindow> priorWindows = (Collection<BoundedWindow>) c.windows();
-    return priorWindows;
+    // The window is provided by the prior WindowFn, which also provides the coder for them
+    return Collections.singleton(c.window());
   }
 
   @Override
@@ -88,17 +85,12 @@ class IdentityWindowFn<T> extends NonMergingWindowFn<T, BoundedWindow>
{
 
   @Override
   public Coder<BoundedWindow> windowCoder() {
-    // Safe because the previous WindowFn provides both the windows and the coder.
+    // Safe because the prior WindowFn provides both the windows and the coder.
     // The Coder is _not_ actually a coder for an arbitrary BoundedWindow.
     return coder;
   }
 
   @Override
-  public boolean assignsToSingleWindow() {
-    return assignsToSingleWindow;
-  }
-
-  @Override
   public BoundedWindow getSideInputWindow(BoundedWindow window) {
     throw new UnsupportedOperationException(
         String.format(

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/08104410/sdks/java/core/src/main/java/org/apache/beam/sdk/util/Reshuffle.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/Reshuffle.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/Reshuffle.java
index 5c91326..c0d159b 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/Reshuffle.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/Reshuffle.java
@@ -58,8 +58,7 @@ public class Reshuffle<K, V> extends PTransform<PCollection<KV<K,
V>>, PCollecti
     Window.Bound<KV<K, V>> rewindow =
         Window.<KV<K, V>>into(
                 new IdentityWindowFn<>(
-                    originalStrategy.getWindowFn().windowCoder(),
-                    originalStrategy.getWindowFn().assignsToSingleWindow()))
+                    originalStrategy.getWindowFn().windowCoder()))
             .triggering(new ReshuffleTrigger<>())
             .discardingFiredPanes()
             .withAllowedLateness(Duration.millis(BoundedWindow.TIMESTAMP_MAX_VALUE.getMillis()));

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/08104410/sdks/java/core/src/test/java/org/apache/beam/sdk/util/IdentitySideInputWindowFn.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/IdentitySideInputWindowFn.java
b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/IdentitySideInputWindowFn.java
index db6f425..705003e 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/IdentitySideInputWindowFn.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/IdentitySideInputWindowFn.java
@@ -24,6 +24,7 @@ import org.apache.beam.sdk.transforms.windowing.NonMergingWindowFn;
 import org.apache.beam.sdk.transforms.windowing.WindowFn;
 
 import java.util.Collection;
+import java.util.Collections;
 
 /**
  * A {@link WindowFn} for use during tests that returns the input window for calls to
@@ -33,7 +34,7 @@ public class IdentitySideInputWindowFn extends NonMergingWindowFn<Integer,
Bound
   @Override
   public Collection<BoundedWindow> assignWindows(WindowFn<Integer, BoundedWindow>.AssignContext
c)
       throws Exception {
-    return (Collection<BoundedWindow>) c.windows();
+    return Collections.singleton(c.window());
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/08104410/sdks/java/core/src/test/java/org/apache/beam/sdk/util/MergingActiveWindowSetTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/MergingActiveWindowSetTest.java
b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/MergingActiveWindowSetTest.java
index 84699d6..4750af1 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/MergingActiveWindowSetTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/MergingActiveWindowSetTest.java
@@ -23,6 +23,7 @@ import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.verify;
 
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
 import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
 import org.apache.beam.sdk.transforms.windowing.Sessions;
 import org.apache.beam.sdk.util.state.InMemoryStateInternals;
@@ -39,7 +40,6 @@ import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.junit.runners.JUnit4;
 
-import java.util.Collection;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -87,8 +87,8 @@ public class MergingActiveWindowSetTest {
         }
 
         @Override
-        public Collection<? extends BoundedWindow> windows() {
-          return ImmutableList.of();
+        public BoundedWindow window() {
+          return GlobalWindow.INSTANCE;
         }
       };
 

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/08104410/sdks/java/core/src/test/java/org/apache/beam/sdk/util/TriggerTester.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/TriggerTester.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/TriggerTester.java
index a1e376e..c495712 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/TriggerTester.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/TriggerTester.java
@@ -245,7 +245,7 @@ public class TriggerTester<InputT, W extends BoundedWindow> {
         InputT value = input.getValue();
         Instant timestamp = input.getTimestamp();
         Collection<W> assignedWindows = windowFn.assignWindows(new TestAssignContext<W>(
-            windowFn, value, timestamp, Arrays.asList(GlobalWindow.INSTANCE)));
+            windowFn, value, timestamp, GlobalWindow.INSTANCE));
 
         for (W window : assignedWindows) {
           activeWindows.addActiveForTesting(window);
@@ -401,14 +401,14 @@ public class TriggerTester<InputT, W extends BoundedWindow> {
       extends WindowFn<Object, W>.AssignContext {
     private Object element;
     private Instant timestamp;
-    private Collection<? extends BoundedWindow> windows;
+    private BoundedWindow window;
 
-    public TestAssignContext(WindowFn<Object, W> windowFn, Object element, Instant timestamp,
-        Collection<? extends BoundedWindow> windows) {
+    public TestAssignContext(
+        WindowFn<Object, W> windowFn, Object element, Instant timestamp, BoundedWindow window) {
       windowFn.super();
       this.element = element;
       this.timestamp = timestamp;
-      this.windows = windows;
+      this.window = window;
     }
 
     @Override
@@ -422,8 +422,8 @@ public class TriggerTester<InputT, W extends BoundedWindow> {
     }
 
     @Override
-    public Collection<? extends BoundedWindow> windows() {
-      return windows;
+    public BoundedWindow window() {
+      return window;
     }
   }
 


Mime
View raw message