beam-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From tg...@apache.org
Subject [2/2] beam git commit: Do not Reassign Windows when WindowFn is null
Date Thu, 02 Mar 2017 01:51:31 GMT
Do not Reassign Windows when WindowFn is null

Adjusting the Windowing Strategy should not change any elements of the
data. This is also potentially type-unsafe, as the upstream WindowFn may
only take elements of a type which is not the input element of the
downstream PTransform.

Introduce Window.Assign, which replaces Window.Bound as the primitive to
"assign elements to windows based on the WindowFn". This converts
Window.Bound into a composite in all cases.

Use a Flatten to improve performance on many runners, without needing an
opaque DoFn.


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/eaf9b9b3
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/eaf9b9b3
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/eaf9b9b3

Branch: refs/heads/master
Commit: eaf9b9b36dec1cc421335b27f225663ce42d0cca
Parents: ca678d8
Author: Thomas Groh <tgroh@google.com>
Authored: Fri Feb 24 11:29:42 2017 -0800
Committer: Thomas Groh <tgroh@google.com>
Committed: Wed Mar 1 17:51:19 2017 -0800

----------------------------------------------------------------------
 .../translation/ApexPipelineTranslator.java     |   2 +-
 .../translation/WindowAssignTranslator.java     |  78 +++++++++++
 .../apex/translation/WindowBoundTranslator.java |  78 -----------
 .../direct/TransformEvaluatorRegistry.java      |   2 +-
 .../runners/direct/WindowEvaluatorFactory.java  |  11 +-
 .../direct/WindowEvaluatorFactoryTest.java      |  46 +------
 .../flink/FlinkBatchTransformTranslators.java   |   8 +-
 .../FlinkStreamingTransformTranslators.java     |   8 +-
 .../functions/FlinkAssignWindows.java           |   2 +-
 .../dataflow/DataflowPipelineTranslator.java    |   9 +-
 .../spark/translation/TransformTranslator.java  |   8 +-
 .../spark/translation/TranslationUtils.java     |   4 +-
 .../streaming/StreamingTransformTranslator.java |   8 +-
 .../beam/sdk/transforms/windowing/Window.java   |  43 +++++-
 .../sdk/transforms/windowing/WindowTest.java    | 136 +++++++++++++++++++
 15 files changed, 290 insertions(+), 153 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/eaf9b9b3/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/ApexPipelineTranslator.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/ApexPipelineTranslator.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/ApexPipelineTranslator.java
index e9d6571..951a286 100644
--- a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/ApexPipelineTranslator.java
+++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/ApexPipelineTranslator.java
@@ -71,7 +71,7 @@ public class ApexPipelineTranslator implements Pipeline.PipelineVisitor {
         new CreateApexPCollectionViewTranslator());
     registerTransformTranslator(CreatePCollectionView.class,
         new CreatePCollectionViewTranslator());
-    registerTransformTranslator(Window.Bound.class, new WindowBoundTranslator());
+    registerTransformTranslator(Window.Assign.class, new WindowAssignTranslator());
   }
 
   public ApexPipelineTranslator(ApexPipelineOptions options) {

http://git-wip-us.apache.org/repos/asf/beam/blob/eaf9b9b3/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/WindowAssignTranslator.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/WindowAssignTranslator.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/WindowAssignTranslator.java
new file mode 100644
index 0000000..b3aef8d
--- /dev/null
+++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/WindowAssignTranslator.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.runners.apex.translation;
+
+import java.util.Collections;
+import org.apache.beam.runners.apex.ApexPipelineOptions;
+import org.apache.beam.runners.apex.translation.operators.ApexParDoOperator;
+import org.apache.beam.runners.core.AssignWindowsDoFn;
+import org.apache.beam.runners.core.DoFnAdapters;
+import org.apache.beam.runners.core.OldDoFn;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.Window;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.util.WindowingStrategy;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionView;
+import org.apache.beam.sdk.values.TupleTag;
+import org.apache.beam.sdk.values.TupleTagList;
+
+/**
+ * {@link Window.Assign} is translated to {@link ApexParDoOperator} that wraps an {@link
+ * AssignWindowsDoFn}.
+ */
+class WindowAssignTranslator<T> implements TransformTranslator<Window.Assign<T>> {
+  private static final long serialVersionUID = 1L;
+
+  @Override
+  public void translate(Window.Assign<T> transform, TranslationContext context) {
+    PCollection<T> output = (PCollection<T>) context.getOutput();
+    PCollection<T> input = (PCollection<T>) context.getInput();
+    @SuppressWarnings("unchecked")
+    WindowingStrategy<T, BoundedWindow> windowingStrategy =
+        (WindowingStrategy<T, BoundedWindow>) output.getWindowingStrategy();
+
+    OldDoFn<T, T> fn =
+        (transform.getWindowFn() == null)
+            ? DoFnAdapters.toOldDoFn(new IdentityFn<T>())
+            : new AssignWindowsDoFn<>(transform.getWindowFn());
+
+    ApexParDoOperator<T, T> operator =
+        new ApexParDoOperator<T, T>(
+            context.getPipelineOptions().as(ApexPipelineOptions.class),
+            fn,
+            new TupleTag<T>(),
+            TupleTagList.empty().getAll(),
+            windowingStrategy,
+            Collections.<PCollectionView<?>>emptyList(),
+            WindowedValue.getFullCoder(
+                input.getCoder(), windowingStrategy.getWindowFn().windowCoder()),
+            context.<Void>stateInternalsFactory());
+    context.addOperator(operator, operator.output);
+    context.addStream(context.getInput(), operator.input);
+  }
+
+  private static class IdentityFn<T> extends DoFn<T, T> {
+    @ProcessElement
+    public void processElement(ProcessContext c) {
+      c.output(c.element());
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/eaf9b9b3/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/WindowBoundTranslator.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/WindowBoundTranslator.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/WindowBoundTranslator.java
deleted file mode 100644
index a241cad..0000000
--- a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/WindowBoundTranslator.java
+++ /dev/null
@@ -1,78 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.beam.runners.apex.translation;
-
-import java.util.Collections;
-import org.apache.beam.runners.apex.ApexPipelineOptions;
-import org.apache.beam.runners.apex.translation.operators.ApexParDoOperator;
-import org.apache.beam.runners.core.AssignWindowsDoFn;
-import org.apache.beam.runners.core.DoFnAdapters;
-import org.apache.beam.runners.core.OldDoFn;
-import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.apache.beam.sdk.transforms.windowing.Window;
-import org.apache.beam.sdk.util.WindowedValue;
-import org.apache.beam.sdk.util.WindowingStrategy;
-import org.apache.beam.sdk.values.PCollection;
-import org.apache.beam.sdk.values.PCollectionView;
-import org.apache.beam.sdk.values.TupleTag;
-import org.apache.beam.sdk.values.TupleTagList;
-
-/**
- * {@link Window.Bound} is translated to {link ApexParDoOperator} that wraps an {@link
- * AssignWindowsDoFn}.
- */
-class WindowBoundTranslator<T> implements TransformTranslator<Window.Bound<T>> {
-  private static final long serialVersionUID = 1L;
-
-  @Override
-  public void translate(Window.Bound<T> transform, TranslationContext context) {
-    PCollection<T> output = (PCollection<T>) context.getOutput();
-    PCollection<T> input = (PCollection<T>) context.getInput();
-    @SuppressWarnings("unchecked")
-    WindowingStrategy<T, BoundedWindow> windowingStrategy =
-        (WindowingStrategy<T, BoundedWindow>) output.getWindowingStrategy();
-
-    OldDoFn<T, T> fn =
-        (transform.getWindowFn() == null)
-            ? DoFnAdapters.toOldDoFn(new IdentityFn<T>())
-            : new AssignWindowsDoFn<>(transform.getWindowFn());
-
-    ApexParDoOperator<T, T> operator =
-        new ApexParDoOperator<T, T>(
-            context.getPipelineOptions().as(ApexPipelineOptions.class),
-            fn,
-            new TupleTag<T>(),
-            TupleTagList.empty().getAll(),
-            windowingStrategy,
-            Collections.<PCollectionView<?>>emptyList(),
-            WindowedValue.getFullCoder(
-                input.getCoder(), windowingStrategy.getWindowFn().windowCoder()),
-            context.<Void>stateInternalsFactory());
-    context.addOperator(operator, operator.output);
-    context.addStream(context.getInput(), operator.input);
-  }
-
-  private static class IdentityFn<T> extends DoFn<T, T> {
-    @ProcessElement
-    public void processElement(ProcessContext c) {
-      c.output(c.element());
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/eaf9b9b3/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformEvaluatorRegistry.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformEvaluatorRegistry.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformEvaluatorRegistry.java
index 9fdefc3..62fee53 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformEvaluatorRegistry.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformEvaluatorRegistry.java
@@ -55,7 +55,7 @@ class TransformEvaluatorRegistry implements TransformEvaluatorFactory {
             .put(StatefulParDo.class, new StatefulParDoEvaluatorFactory<>(ctxt))
             .put(PCollections.class, new FlattenEvaluatorFactory(ctxt))
             .put(ViewEvaluatorFactory.WriteView.class, new ViewEvaluatorFactory(ctxt))
-            .put(Window.Bound.class, new WindowEvaluatorFactory(ctxt))
+            .put(Window.Assign.class, new WindowEvaluatorFactory(ctxt))
             // Runner-specific primitives used in expansion of GroupByKey
             .put(DirectGroupByKeyOnly.class, new GroupByKeyOnlyEvaluatorFactory(ctxt))
             .put(DirectGroupAlsoByWindow.class, new GroupAlsoByWindowEvaluatorFactory(ctxt))

http://git-wip-us.apache.org/repos/asf/beam/blob/eaf9b9b3/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WindowEvaluatorFactory.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WindowEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WindowEvaluatorFactory.java
index 3cf178c..8974c67 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WindowEvaluatorFactory.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WindowEvaluatorFactory.java
@@ -26,7 +26,6 @@ import org.apache.beam.sdk.transforms.AppliedPTransform;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.transforms.windowing.Window;
-import org.apache.beam.sdk.transforms.windowing.Window.Bound;
 import org.apache.beam.sdk.transforms.windowing.WindowFn;
 import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.values.PCollection;
@@ -34,7 +33,7 @@ import org.joda.time.Instant;
 
 /**
  * The {@link DirectRunner} {@link TransformEvaluatorFactory} for the
- * {@link Bound Window.Bound} primitive {@link PTransform}.
+ * {@link Window.Assign} primitive {@link PTransform}.
  */
 class WindowEvaluatorFactory implements TransformEvaluatorFactory {
   private final EvaluationContext evaluationContext;
@@ -53,7 +52,8 @@ class WindowEvaluatorFactory implements TransformEvaluatorFactory {
   }
 
   private <InputT> TransformEvaluator<InputT> createTransformEvaluator(
-      AppliedPTransform<PCollection<InputT>, PCollection<InputT>, Window.Bound<InputT>> transform) {
+      AppliedPTransform<PCollection<InputT>, PCollection<InputT>, Window.Assign<InputT>>
+          transform) {
     WindowFn<? super InputT, ?> fn = transform.getTransform().getWindowFn();
     UncommittedBundle<InputT> outputBundle =
         evaluationContext.createBundle(
@@ -68,14 +68,15 @@ class WindowEvaluatorFactory implements TransformEvaluatorFactory {
   public void cleanup() {}
 
   private static class WindowIntoEvaluator<InputT> implements TransformEvaluator<InputT> {
-    private final AppliedPTransform<PCollection<InputT>, PCollection<InputT>, Window.Bound<InputT>>
+    private final AppliedPTransform<PCollection<InputT>, PCollection<InputT>, Window.Assign<InputT>>
         transform;
     private final WindowFn<InputT, ?> windowFn;
     private final UncommittedBundle<InputT> outputBundle;
 
     @SuppressWarnings("unchecked")
     public WindowIntoEvaluator(
-        AppliedPTransform<PCollection<InputT>, PCollection<InputT>, Window.Bound<InputT>> transform,
+        AppliedPTransform<PCollection<InputT>, PCollection<InputT>, Window.Assign<InputT>>
+            transform,
         WindowFn<? super InputT, ?> windowFn,
         UncommittedBundle<InputT> outputBundle) {
       this.outputBundle = outputBundle;

http://git-wip-us.apache.org/repos/asf/beam/blob/eaf9b9b3/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WindowEvaluatorFactoryTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WindowEvaluatorFactoryTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WindowEvaluatorFactoryTest.java
index 7e6eb2f..ca52852 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WindowEvaluatorFactoryTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WindowEvaluatorFactoryTest.java
@@ -33,11 +33,7 @@ import org.apache.beam.runners.direct.DirectRunner.CommittedBundle;
 import org.apache.beam.runners.direct.DirectRunner.UncommittedBundle;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.testing.TestPipeline;
-import org.apache.beam.sdk.transforms.AppliedPTransform;
 import org.apache.beam.sdk.transforms.Create;
-import org.apache.beam.sdk.transforms.PTransform;
-import org.apache.beam.sdk.transforms.windowing.AfterPane;
-import org.apache.beam.sdk.transforms.windowing.AfterWatermark;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.transforms.windowing.FixedWindows;
 import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
@@ -114,30 +110,6 @@ public class WindowEvaluatorFactoryTest {
   }
 
   @Test
-  public void nullWindowFunSucceeds() throws Exception {
-    Bound<Long> transform =
-        Window.<Long>triggering(
-                AfterWatermark.pastEndOfWindow().withEarlyFirings(AfterPane.elementCountAtLeast(1)))
-            .accumulatingFiredPanes();
-    PCollection<Long> triggering = input.apply(transform);
-
-    CommittedBundle<Long> inputBundle = createInputBundle();
-
-    UncommittedBundle<Long> outputBundle = createOutputBundle(triggering, inputBundle);
-
-    TransformResult<Long> result = runEvaluator(triggering, inputBundle, transform);
-
-    assertThat(
-        Iterables.getOnlyElement(result.getOutputBundles()),
-        Matchers.<UncommittedBundle<?>>equalTo(outputBundle));
-    CommittedBundle<Long> committed = outputBundle.commit(Instant.now());
-    assertThat(
-        committed.getElements(),
-        containsInAnyOrder(
-            valueInIntervalWindow, valueInGlobalWindow, valueInGlobalAndTwoIntervalWindows));
-  }
-
-  @Test
   public void singleWindowFnSucceeds() throws Exception {
     Duration windowDuration = Duration.standardDays(7);
     Bound<Long> transform = Window.<Long>into(FixedWindows.of(windowDuration));
@@ -150,7 +122,7 @@ public class WindowEvaluatorFactoryTest {
     BoundedWindow firstSecondWindow = new IntervalWindow(EPOCH, EPOCH.plus(windowDuration));
     BoundedWindow thirdWindow = new IntervalWindow(EPOCH.minus(windowDuration), EPOCH);
 
-    TransformResult<Long> result = runEvaluator(windowed, inputBundle, transform);
+    TransformResult<Long> result = runEvaluator(windowed, inputBundle);
 
     assertThat(
         Iterables.getOnlyElement(result.getOutputBundles()),
@@ -185,7 +157,7 @@ public class WindowEvaluatorFactoryTest {
     CommittedBundle<Long> inputBundle = createInputBundle();
     UncommittedBundle<Long> outputBundle = createOutputBundle(windowed, inputBundle);
 
-    TransformResult<Long> result = runEvaluator(windowed, inputBundle, transform);
+    TransformResult<Long> result = runEvaluator(windowed, inputBundle);
 
     assertThat(
         Iterables.getOnlyElement(result.getOutputBundles()),
@@ -242,7 +214,7 @@ public class WindowEvaluatorFactoryTest {
     CommittedBundle<Long> inputBundle = createInputBundle();
     UncommittedBundle<Long> outputBundle = createOutputBundle(windowed, inputBundle);
 
-    TransformResult<Long> result = runEvaluator(windowed, inputBundle, transform);
+    TransformResult<Long> result = runEvaluator(windowed, inputBundle);
 
     assertThat(
         Iterables.getOnlyElement(result.getOutputBundles()),
@@ -307,17 +279,9 @@ public class WindowEvaluatorFactoryTest {
   }
 
   private TransformResult<Long> runEvaluator(
-      PCollection<Long> windowed,
-      CommittedBundle<Long> inputBundle,
-      Window.Bound<Long> windowTransform /* Required while Window.Bound is a composite */)
-      throws Exception {
+      PCollection<Long> windowed, CommittedBundle<Long> inputBundle) throws Exception {
     TransformEvaluator<Long> evaluator =
-        factory.forApplication(
-            AppliedPTransform
-                .<PCollection<Long>, PCollection<Long>,
-                    PTransform<PCollection<Long>, PCollection<Long>>>
-                    of("Window", input.expand(), windowed.expand(), windowTransform, p),
-            inputBundle);
+        factory.forApplication(DirectGraphs.getProducer(windowed), inputBundle);
 
     evaluator.processElement(valueInGlobalWindow);
     evaluator.processElement(valueInGlobalAndTwoIntervalWindows);

http://git-wip-us.apache.org/repos/asf/beam/blob/eaf9b9b3/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkBatchTransformTranslators.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkBatchTransformTranslators.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkBatchTransformTranslators.java
index acc204d..f043c90 100644
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkBatchTransformTranslators.java
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkBatchTransformTranslators.java
@@ -110,7 +110,7 @@ class FlinkBatchTransformTranslators {
 
     TRANSLATORS.put(Flatten.PCollections.class, new FlattenPCollectionTranslatorBatch());
 
-    TRANSLATORS.put(Window.Bound.class, new WindowBoundTranslatorBatch());
+    TRANSLATORS.put(Window.Assign.class, new WindowAssignTranslatorBatch());
 
     TRANSLATORS.put(ParDo.Bound.class, new ParDoBoundTranslatorBatch());
     TRANSLATORS.put(ParDo.BoundMulti.class, new ParDoBoundMultiTranslatorBatch());
@@ -145,11 +145,11 @@ class FlinkBatchTransformTranslators {
     }
   }
 
-  private static class WindowBoundTranslatorBatch<T>
-      implements FlinkBatchPipelineTranslator.BatchTransformTranslator<Window.Bound<T>> {
+  private static class WindowAssignTranslatorBatch<T>
+      implements FlinkBatchPipelineTranslator.BatchTransformTranslator<Window.Assign<T>> {
 
     @Override
-    public void translateNode(Window.Bound<T> transform, FlinkBatchTranslationContext context) {
+    public void translateNode(Window.Assign<T> transform, FlinkBatchTranslationContext context) {
       PValue input = context.getInput(transform);
 
       TypeInformation<WindowedValue<T>> resultTypeInfo =

http://git-wip-us.apache.org/repos/asf/beam/blob/eaf9b9b3/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java
index 03f567d..c7df91d 100644
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java
@@ -124,7 +124,7 @@ class FlinkStreamingTransformTranslators {
     TRANSLATORS.put(ParDo.Bound.class, new ParDoBoundStreamingTranslator());
     TRANSLATORS.put(ParDo.BoundMulti.class, new ParDoBoundMultiStreamingTranslator());
 
-    TRANSLATORS.put(Window.Bound.class, new WindowBoundTranslator());
+    TRANSLATORS.put(Window.Assign.class, new WindowAssignTranslator());
     TRANSLATORS.put(Flatten.PCollections.class, new FlattenPCollectionTranslator());
     TRANSLATORS.put(
         FlinkStreamingViewOverrides.CreateFlinkPCollectionView.class,
@@ -702,12 +702,12 @@ class FlinkStreamingTransformTranslators {
     }
   }
 
-  private static class WindowBoundTranslator<T>
-      extends FlinkStreamingPipelineTranslator.StreamTransformTranslator<Window.Bound<T>> {
+  private static class WindowAssignTranslator<T>
+      extends FlinkStreamingPipelineTranslator.StreamTransformTranslator<Window.Assign<T>> {
 
     @Override
     public void translateNode(
-        Window.Bound<T> transform,
+        Window.Assign<T> transform,
         FlinkStreamingTranslationContext context) {
 
       @SuppressWarnings("unchecked")

http://git-wip-us.apache.org/repos/asf/beam/blob/eaf9b9b3/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkAssignWindows.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkAssignWindows.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkAssignWindows.java
index f241ad0..c3a5095 100644
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkAssignWindows.java
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkAssignWindows.java
@@ -26,7 +26,7 @@ import org.apache.flink.util.Collector;
 
 /**
  * Flink {@link FlatMapFunction} for implementing
- * {@link org.apache.beam.sdk.transforms.windowing.Window.Bound}.
+ * {@link org.apache.beam.sdk.transforms.windowing.Window.Assign}.
  */
 public class FlinkAssignWindows<T, W extends BoundedWindow>
     implements FlatMapFunction<WindowedValue<T>, WindowedValue<T>> {

http://git-wip-us.apache.org/repos/asf/beam/blob/eaf9b9b3/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java
index fe5db5a..7e559e9 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java
@@ -85,7 +85,6 @@ import org.apache.beam.sdk.transforms.reflect.DoFnSignature;
 import org.apache.beam.sdk.transforms.reflect.DoFnSignatures;
 import org.apache.beam.sdk.transforms.windowing.DefaultTrigger;
 import org.apache.beam.sdk.transforms.windowing.Window;
-import org.apache.beam.sdk.transforms.windowing.Window.Bound;
 import org.apache.beam.sdk.util.AppliedCombineFn;
 import org.apache.beam.sdk.util.CloudObject;
 import org.apache.beam.sdk.util.PropertyNames;
@@ -878,14 +877,14 @@ public class DataflowPipelineTranslator {
         });
 
     registerTransformTranslator(
-        Window.Bound.class,
-        new TransformTranslator<Bound>() {
+        Window.Assign.class,
+        new TransformTranslator<Window.Assign>() {
           @Override
-          public void translate(Window.Bound transform, TranslationContext context) {
+          public void translate(Window.Assign transform, TranslationContext context) {
             translateHelper(transform, context);
           }
 
-          private <T> void translateHelper(Window.Bound<T> transform, TranslationContext context) {
+          private <T> void translateHelper(Window.Assign<T> transform, TranslationContext context) {
             StepTranslationContext stepContext = context.addStep(transform, "Bucket");
             PCollection<T> input = context.getInput(transform);
             stepContext.addInput(PropertyNames.PARALLEL_INPUT, input);

http://git-wip-us.apache.org/repos/asf/beam/blob/eaf9b9b3/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java
index 7fc09ad..8ebb496 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java
@@ -611,10 +611,10 @@ public final class TransformTranslator {
     rdd.saveAsNewAPIHadoopFile(outputDir, keyClass, valueClass, formatClass, conf);
   }
 
-  private static <T, W extends BoundedWindow> TransformEvaluator<Window.Bound<T>> window() {
-    return new TransformEvaluator<Window.Bound<T>>() {
+  private static <T, W extends BoundedWindow> TransformEvaluator<Window.Assign<T>> window() {
+    return new TransformEvaluator<Window.Assign<T>>() {
       @Override
-      public void evaluate(Window.Bound<T> transform, EvaluationContext context) {
+      public void evaluate(Window.Assign<T> transform, EvaluationContext context) {
         @SuppressWarnings("unchecked")
         JavaRDD<WindowedValue<T>> inRDD =
             ((BoundedDataset<T>) context.borrowDataset(transform)).getRDD();
@@ -734,7 +734,7 @@ public final class TransformTranslator {
     EVALUATORS.put(View.AsSingleton.class, viewAsSingleton());
     EVALUATORS.put(View.AsIterable.class, viewAsIter());
     EVALUATORS.put(View.CreatePCollectionView.class, createPCollView());
-    EVALUATORS.put(Window.Bound.class, window());
+    EVALUATORS.put(Window.Assign.class, window());
     // mostly test evaluators
     EVALUATORS.put(StorageLevelPTransform.class, storageLevel());
   }

http://git-wip-us.apache.org/repos/asf/beam/blob/eaf9b9b3/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TranslationUtils.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TranslationUtils.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TranslationUtils.java
index 6b27436..158593e 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TranslationUtils.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TranslationUtils.java
@@ -101,14 +101,14 @@ public final class TranslationUtils {
    * with triggering or allowed lateness).
    * </p>
    *
-   * @param transform The {@link Window.Bound} transformation.
+   * @param transform The {@link Window.Assign} transformation.
    * @param context   The {@link EvaluationContext}.
    * @param <T>       PCollection type.
    * @param <W>       {@link BoundedWindow} type.
    * @return if to apply the transformation.
    */
   public static <T, W extends BoundedWindow> boolean
-  skipAssignWindows(Window.Bound<T> transform, EvaluationContext context) {
+  skipAssignWindows(Window.Assign<T> transform, EvaluationContext context) {
     @SuppressWarnings("unchecked")
     WindowFn<? super T, W> windowFn = (WindowFn<? super T, W>) transform.getWindowFn();
     return windowFn == null

http://git-wip-us.apache.org/repos/asf/beam/blob/eaf9b9b3/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/StreamingTransformTranslator.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/StreamingTransformTranslator.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/StreamingTransformTranslator.java
index a856897..e3445bf 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/StreamingTransformTranslator.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/StreamingTransformTranslator.java
@@ -210,10 +210,10 @@ final class StreamingTransformTranslator {
     };
   }
 
-  private static <T, W extends BoundedWindow> TransformEvaluator<Window.Bound<T>>
window() {
-    return new TransformEvaluator<Window.Bound<T>>() {
+  private static <T, W extends BoundedWindow> TransformEvaluator<Window.Assign<T>>
window() {
+    return new TransformEvaluator<Window.Assign<T>>() {
       @Override
-      public void evaluate(final Window.Bound<T> transform, EvaluationContext context)
{
+      public void evaluate(final Window.Assign<T> transform, EvaluationContext context)
{
         @SuppressWarnings("unchecked")
         UnboundedDataset<T> unboundedDataset =
             ((UnboundedDataset<T>) context.borrowDataset(transform));
@@ -444,7 +444,7 @@ final class StreamingTransformTranslator {
     EVALUATORS.put(ParDo.BoundMulti.class, multiDo());
     EVALUATORS.put(ConsoleIO.Write.Unbound.class, print());
     EVALUATORS.put(CreateStream.class, createFromQueue());
-    EVALUATORS.put(Window.Bound.class, window());
+    EVALUATORS.put(Window.Assign.class, window());
     EVALUATORS.put(Flatten.PCollections.class, flattenPColl());
   }
 

http://git-wip-us.apache.org/repos/asf/beam/blob/eaf9b9b3/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Window.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Window.java
b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Window.java
index 65dfaa9..94870ff 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Window.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Window.java
@@ -22,6 +22,7 @@ import org.apache.beam.sdk.annotations.Experimental;
 import org.apache.beam.sdk.annotations.Experimental.Kind;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.Coder.NonDeterministicException;
+import org.apache.beam.sdk.transforms.Flatten;
 import org.apache.beam.sdk.transforms.GroupByKey;
 import org.apache.beam.sdk.transforms.MapElements;
 import org.apache.beam.sdk.transforms.PTransform;
@@ -30,6 +31,7 @@ import org.apache.beam.sdk.transforms.display.DisplayData;
 import org.apache.beam.sdk.util.WindowingStrategy;
 import org.apache.beam.sdk.util.WindowingStrategy.AccumulationMode;
 import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionList;
 import org.joda.time.Duration;
 
 /**
@@ -152,7 +154,7 @@ public class Window {
      *
      * <p>This is the default behavior.
      */
-    FIRE_IF_NON_EMPTY;
+    FIRE_IF_NON_EMPTY
 
   }
 
@@ -469,8 +471,16 @@ public class Window {
     public PCollection<T> expand(PCollection<T> input) {
       WindowingStrategy<?, ?> outputStrategy =
           getOutputStrategyInternal(input.getWindowingStrategy());
-      return PCollection.createPrimitiveOutputInternal(
-          input.getPipeline(), outputStrategy, input.isBounded());
+      if (windowFn == null) {
+        // A new PCollection must be created in case input is reused in a different location,
+        // as the two PCollections will, in general, have different windowing strategies.
+        return PCollectionList.of(input)
+            .apply(Flatten.<T>pCollections())
+            .setWindowingStrategyInternal(outputStrategy);
+      } else {
+        // This is the AssignWindows primitive
+        return input.apply(new Assign<T>(outputStrategy));
+      }
     }
 
     @Override
@@ -522,6 +532,33 @@ public class Window {
     }
   }
 
+
+  /**
+   * A Primitive {@link PTransform} that assigns windows to elements based on a {@link WindowFn}.
+   */
+  public static class Assign<T> extends PTransform<PCollection<T>, PCollection<T>>
{
+    private final WindowingStrategy<T, ?> updatedStrategy;
+
+    /**
+     * Create a new {@link Assign} where the output is windowed with the updated {@link
+     * WindowingStrategy}. Windows should be assigned using the {@link WindowFn} returned
+     * by {@link #getWindowFn()}.
+     */
+    private Assign(WindowingStrategy updatedStrategy) {
+      this.updatedStrategy = updatedStrategy;
+    }
+
+    @Override
+    public PCollection<T> expand(PCollection<T> input) {
+      return PCollection.createPrimitiveOutputInternal(
+          input.getPipeline(), updatedStrategy, input.isBounded());
+    }
+
+    public WindowFn<T, ?> getWindowFn() {
+      return updatedStrategy.getWindowFn();
+    }
+  }
+
   /**
    * Creates a {@code Window} {@code PTransform} that does not change assigned
    * windows, but will cause windows to be merged again as part of the next

http://git-wip-us.apache.org/repos/asf/beam/blob/eaf9b9b3/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/WindowTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/WindowTest.java
b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/WindowTest.java
index 55c7297..1101ebc 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/WindowTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/WindowTest.java
@@ -21,6 +21,7 @@ import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasDisp
 import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasKey;
 import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.includesDisplayDataFor;
 import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.instanceOf;
 import static org.hamcrest.Matchers.is;
 import static org.hamcrest.Matchers.isOneOf;
 import static org.hamcrest.Matchers.not;
@@ -30,15 +31,24 @@ import static org.junit.Assert.assertTrue;
 import static org.mockito.Mockito.when;
 
 import java.io.Serializable;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.concurrent.atomic.AtomicBoolean;
+import org.apache.beam.sdk.Pipeline.PipelineVisitor;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.Coder.NonDeterministicException;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
+import org.apache.beam.sdk.io.CountingInput;
+import org.apache.beam.sdk.runners.TransformHierarchy;
+import org.apache.beam.sdk.testing.PAssert;
 import org.apache.beam.sdk.testing.RunnableOnService;
 import org.apache.beam.sdk.testing.TestPipeline;
 import org.apache.beam.sdk.transforms.Create;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.GroupByKey;
+import org.apache.beam.sdk.transforms.MapElements;
 import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.SimpleFunction;
 import org.apache.beam.sdk.transforms.display.DisplayData;
 import org.apache.beam.sdk.util.WindowingStrategy;
 import org.apache.beam.sdk.util.WindowingStrategy.AccumulationMode;
@@ -153,6 +163,51 @@ public class WindowTest implements Serializable {
     assertEquals(fixed25, strategy.getWindowFn());
   }
 
+  /**
+   * With {@link #testWindowIntoNullWindowFnNoAssign()}, demonstrates that the expansion
+   * of the {@link Window.Bound} transform depends on whether it actually assigns elements
+   * to windows.
+   */
+  @Test
+  public void testWindowIntoWindowFnAssign() {
+    pipeline
+        .apply(Create.of(1, 2, 3))
+        .apply(
+            Window.<Integer>into(
+                FixedWindows.of(Duration.standardMinutes(11L).plus(Duration.millis(1L)))));
+
+    final AtomicBoolean foundAssign = new AtomicBoolean(false);
+    pipeline.traverseTopologically(
+        new PipelineVisitor.Defaults() {
+          public void visitPrimitiveTransform(TransformHierarchy.Node node) {
+            if (node.getTransform() instanceof Window.Assign) {
+              foundAssign.set(true);
+            }
+          }
+        });
+    assertThat(foundAssign.get(), is(true));
+  }
+
+  /**
+   * With {@link #testWindowIntoWindowFnAssign()}, demonstrates that the expansion of the
+   * {@link Window.Bound} transform depends on whether it actually assigns elements to windows.
+   */
+  @Test
+  public void testWindowIntoNullWindowFnNoAssign() {
+    pipeline
+        .apply(Create.of(1, 2, 3))
+        .apply(
+            Window.<Integer>triggering(AfterWatermark.pastEndOfWindow())
+                .withAllowedLateness(Duration.ZERO)
+                .accumulatingFiredPanes());
+
+    pipeline.traverseTopologically(
+        new PipelineVisitor.Defaults() {
+          public void visitPrimitiveTransform(TransformHierarchy.Node node) {
+            assertThat(node.getTransform(), not(instanceOf(Window.Assign.class)));
+          }
+        });
+  }
+
   @Test
   public void testWindowGetName() {
     assertEquals("Window.Into()",
@@ -220,6 +275,87 @@ public class WindowTest implements Serializable {
       .apply("Trigger", Window.<String>triggering(trigger));
   }
 
+  private static class WindowOddEvenBuckets extends NonMergingWindowFn<Long, IntervalWindow>
{
+    private static final IntervalWindow EVEN_WINDOW =
+        new IntervalWindow(
+            BoundedWindow.TIMESTAMP_MIN_VALUE, GlobalWindow.INSTANCE.maxTimestamp());
+    private static final IntervalWindow ODD_WINDOW =
+        new IntervalWindow(
+            BoundedWindow.TIMESTAMP_MIN_VALUE, GlobalWindow.INSTANCE.maxTimestamp().minus(1));
+
+    @Override
+    public Collection<IntervalWindow> assignWindows(AssignContext c) throws Exception
{
+      if (c.element() % 2 == 0) {
+        return Collections.singleton(EVEN_WINDOW);
+      }
+      return Collections.singleton(ODD_WINDOW);
+    }
+
+    @Override
+    public boolean isCompatible(WindowFn<?, ?> other) {
+      return other instanceof WindowOddEvenBuckets;
+    }
+
+    @Override
+    public Coder<IntervalWindow> windowCoder() {
+      return new IntervalWindow.IntervalWindowCoder();
+    }
+
+    @Override
+    public IntervalWindow getSideInputWindow(BoundedWindow window) {
+      throw new UnsupportedOperationException(
+          String.format("Can't use %s for side inputs", getClass().getSimpleName()));
+    }
+  }
+
+  @Test
+  @Category(RunnableOnService.class)
+  public void testNoWindowFnDoesNotReassignWindows() {
+    pipeline.enableAbandonedNodeEnforcement(true);
+
+    final PCollection<Long> initialWindows =
+        pipeline
+            .apply(CountingInput.upTo(10L))
+            .apply("AssignWindows", Window.into(new WindowOddEvenBuckets()));
+
+    // Sanity check the window assignment to demonstrate the baseline
+    PAssert.that(initialWindows)
+        .inWindow(WindowOddEvenBuckets.EVEN_WINDOW)
+        .containsInAnyOrder(0L, 2L, 4L, 6L, 8L);
+    PAssert.that(initialWindows)
+        .inWindow(WindowOddEvenBuckets.ODD_WINDOW)
+        .containsInAnyOrder(1L, 3L, 5L, 7L, 9L);
+
+    PCollection<Boolean> upOne =
+        initialWindows.apply(
+            "ModifyTypes",
+            MapElements.<Long, Boolean>via(
+                new SimpleFunction<Long, Boolean>() {
+                  @Override
+                  public Boolean apply(Long input) {
+                    return input % 2 == 0;
+                  }
+                }));
+    PAssert.that(upOne)
+        .inWindow(WindowOddEvenBuckets.EVEN_WINDOW)
+        .containsInAnyOrder(true, true, true, true, true);
+    PAssert.that(upOne)
+        .inWindow(WindowOddEvenBuckets.ODD_WINDOW)
+        .containsInAnyOrder(false, false, false, false, false);
+
+    // The elements should be in the same windows, even though they would not be assigned
+    // to the same windows with the updated elements. If we try to apply the original
+    // WindowFn, the type will not be appropriate and the runner should crash, as a Boolean
+    // cannot be converted into a long.
+    PCollection<Boolean> updatedTrigger =
+        upOne.apply(
+            "UpdateWindowingStrategy",
+            Window.<Boolean>triggering(Never.ever())
+                .withAllowedLateness(Duration.ZERO)
+                .accumulatingFiredPanes());
+    pipeline.run();
+  }
+
   /**
    * Tests that when two elements are combined via a GroupByKey their output timestamp agrees
    * with the windowing function default, the end of the window.


Mime
View raw message