beam-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From k...@apache.org
Subject [03/10] incubator-beam git commit: Remove ParDo.of(OldDoFn) from Apex runner
Date Thu, 15 Dec 2016 22:29:03 GMT
Remove ParDo.of(OldDoFn) from Apex runner

The only such usage was of AssignWindowsDoFn. Now, instead, it is
instantiated directly using a new translator for Window.Bound.

This change also separates the overloads of ApexParDoOperator for old
and new DoFn, to make the OldDoFn overload easier to track and later
remove.


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/af616d97
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/af616d97
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/af616d97

Branch: refs/heads/master
Commit: af616d9741b19d0a7705df6fe075be1509aa659f
Parents: f5f329e
Author: Eugene Kirpichov <kirpichov@google.com>
Authored: Fri Dec 9 16:31:42 2016 -0800
Committer: Eugene Kirpichov <kirpichov@google.com>
Committed: Thu Dec 15 13:55:24 2016 -0800

----------------------------------------------------------------------
 .../apache/beam/runners/apex/ApexRunner.java    | 46 +-----------
 .../translation/ApexPipelineTranslator.java     |  2 +
 .../translation/ParDoBoundMultiTranslator.java  |  4 +-
 .../apex/translation/ParDoBoundTranslator.java  |  4 +-
 .../apex/translation/WindowBoundTranslator.java | 78 ++++++++++++++++++++
 .../operators/ApexParDoOperator.java            | 25 ++++++-
 .../translation/ParDoBoundTranslatorTest.java   |  3 +-
 .../apache/beam/runners/core/AssignWindows.java | 46 ------------
 .../spark/translation/SparkAssignWindowFn.java  |  2 +-
 9 files changed, 108 insertions(+), 102 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/af616d97/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexRunner.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexRunner.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexRunner.java
index e5bde46..f12ebef 100644
--- a/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexRunner.java
+++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexRunner.java
@@ -32,7 +32,6 @@ import org.apache.apex.api.Launcher;
 import org.apache.apex.api.Launcher.AppHandle;
 import org.apache.apex.api.Launcher.LaunchMode;
 import org.apache.beam.runners.apex.translation.ApexPipelineTranslator;
-import org.apache.beam.runners.core.AssignWindows;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.CoderRegistry;
@@ -46,9 +45,6 @@ import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.transforms.View;
-import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.apache.beam.sdk.transforms.windowing.Window;
-import org.apache.beam.sdk.transforms.windowing.WindowFn;
 import org.apache.beam.sdk.util.PCollectionViews;
 import org.apache.beam.sdk.util.WindowingStrategy;
 import org.apache.beam.sdk.values.PCollection;
@@ -90,10 +86,7 @@ public class ApexRunner extends PipelineRunner<ApexRunnerResult> {
   public <OutputT extends POutput, InputT extends PInput> OutputT apply(
       PTransform<InputT, OutputT> transform, InputT input) {
 
-    if (Window.Bound.class.equals(transform.getClass())) {
-      return (OutputT) ((PCollection) input).apply(
-          new AssignWindowsAndSetStrategy((Window.Bound) transform));
-    } else if (Create.Values.class.equals(transform.getClass())) {
+    if (Create.Values.class.equals(transform.getClass())) {
       return (OutputT) PCollection
           .<OutputT>createPrimitiveOutputInternal(
               input.getPipeline(),
@@ -162,43 +155,6 @@ public class ApexRunner extends PipelineRunner<ApexRunnerResult> {
 
   }
 
-  /**
-   * copied from DirectPipelineRunner.
-   * used to replace Window.Bound till equivalent function is added in Apex
-   */
-  private static class AssignWindowsAndSetStrategy<T, W extends BoundedWindow>
-      extends PTransform<PCollection<T>, PCollection<T>> {
-
-    private final Window.Bound<T> wrapped;
-
-    public AssignWindowsAndSetStrategy(Window.Bound<T> wrapped) {
-      this.wrapped = wrapped;
-    }
-
-    @Override
-    public PCollection<T> expand(PCollection<T> input) {
-      WindowingStrategy<?, ?> outputStrategy =
-          wrapped.getOutputStrategyInternal(input.getWindowingStrategy());
-
-      WindowFn<T, BoundedWindow> windowFn =
-          (WindowFn<T, BoundedWindow>) outputStrategy.getWindowFn();
-
-      // If the Window.Bound transform only changed parts other than the WindowFn, then
-      // we skip AssignWindows even though it should be harmless in a perfect world.
-      // The world is not perfect, and a GBK may have set it to InvalidWindows to forcibly
-      // crash if another GBK is performed without explicitly setting the WindowFn. So we skip
-      // AssignWindows in this case.
-      if (wrapped.getWindowFn() == null) {
-        return input.apply("Identity", ParDo.of(new IdentityFn<T>()))
-            .setWindowingStrategyInternal(outputStrategy);
-      } else {
-        return input
-            .apply("AssignWindows", new AssignWindows<>(windowFn))
-            .setWindowingStrategyInternal(outputStrategy);
-      }
-    }
-  }
-
   private static class IdentityFn<T> extends DoFn<T, T> {
     private static final long serialVersionUID = 1L;
     @ProcessElement

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/af616d97/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/ApexPipelineTranslator.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/ApexPipelineTranslator.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/ApexPipelineTranslator.java
index 8d6db84..c8e0290 100644
--- a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/ApexPipelineTranslator.java
+++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/ApexPipelineTranslator.java
@@ -34,6 +34,7 @@ import org.apache.beam.sdk.transforms.GroupByKey;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.transforms.View.CreatePCollectionView;
+import org.apache.beam.sdk.transforms.windowing.Window;
 import org.apache.beam.sdk.values.PCollectionView;
 import org.apache.beam.sdk.values.PValue;
 import org.slf4j.Logger;
@@ -70,6 +71,7 @@ public class ApexPipelineTranslator implements Pipeline.PipelineVisitor {
         new CreateApexPCollectionViewTranslator());
     registerTransformTranslator(CreatePCollectionView.class,
         new CreatePCollectionViewTranslator());
+    registerTransformTranslator(Window.Bound.class, new WindowBoundTranslator());
   }
 
   public ApexPipelineTranslator(ApexPipelineOptions options) {

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/af616d97/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/ParDoBoundMultiTranslator.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/ParDoBoundMultiTranslator.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/ParDoBoundMultiTranslator.java
index 706482a..574ce8f 100644
--- a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/ParDoBoundMultiTranslator.java
+++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/ParDoBoundMultiTranslator.java
@@ -31,7 +31,6 @@ import org.apache.beam.runners.apex.ApexRunner;
 import org.apache.beam.runners.apex.translation.operators.ApexParDoOperator;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.transforms.OldDoFn;
 import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.transforms.reflect.DoFnSignature;
 import org.apache.beam.sdk.transforms.reflect.DoFnSignatures;
@@ -77,7 +76,6 @@ class ParDoBoundMultiTranslator<InputT, OutputT>
               ApexRunner.class.getSimpleName()));
     }
 
-    OldDoFn<InputT, OutputT> oldDoFn = transform.getFn();
     PCollectionTuple output = context.getOutput();
     PCollection<InputT> input = context.getInput();
     List<PCollectionView<?>> sideInputs = transform.getSideInputs();
@@ -89,7 +87,7 @@ class ParDoBoundMultiTranslator<InputT, OutputT>
     ApexParDoOperator<InputT, OutputT> operator =
         new ApexParDoOperator<>(
             context.getPipelineOptions(),
-            oldDoFn,
+            doFn,
             transform.getMainOutputTag(),
             transform.getSideOutputTags().getAll(),
             context.<PCollection<?>>getInput().getWindowingStrategy(),

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/af616d97/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/ParDoBoundTranslator.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/ParDoBoundTranslator.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/ParDoBoundTranslator.java
index b5a50f6..de78628 100644
--- a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/ParDoBoundTranslator.java
+++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/ParDoBoundTranslator.java
@@ -23,7 +23,6 @@ import org.apache.beam.runners.apex.ApexRunner;
 import org.apache.beam.runners.apex.translation.operators.ApexParDoOperator;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.transforms.OldDoFn;
 import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.transforms.reflect.DoFnSignature;
 import org.apache.beam.sdk.transforms.reflect.DoFnSignatures;
@@ -64,7 +63,6 @@ class ParDoBoundTranslator<InputT, OutputT>
               ApexRunner.class.getSimpleName()));
     }
 
-    OldDoFn<InputT, OutputT> oldDoFn = transform.getOldFn();
     PCollection<OutputT> output = context.getOutput();
     PCollection<InputT> input = context.getInput();
     List<PCollectionView<?>> sideInputs = transform.getSideInputs();
@@ -76,7 +74,7 @@ class ParDoBoundTranslator<InputT, OutputT>
     ApexParDoOperator<InputT, OutputT> operator =
         new ApexParDoOperator<>(
             context.getPipelineOptions(),
-            oldDoFn,
+            doFn,
             new TupleTag<OutputT>(),
             TupleTagList.empty().getAll() /*sideOutputTags*/,
             output.getWindowingStrategy(),

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/af616d97/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/WindowBoundTranslator.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/WindowBoundTranslator.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/WindowBoundTranslator.java
new file mode 100644
index 0000000..33b9269
--- /dev/null
+++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/WindowBoundTranslator.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.runners.apex.translation;
+
+import java.util.Collections;
+import org.apache.beam.runners.apex.ApexPipelineOptions;
+import org.apache.beam.runners.apex.translation.operators.ApexParDoOperator;
+import org.apache.beam.runners.core.AssignWindowsDoFn;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.DoFnAdapters;
+import org.apache.beam.sdk.transforms.OldDoFn;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.Window;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.util.WindowingStrategy;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionView;
+import org.apache.beam.sdk.values.TupleTag;
+import org.apache.beam.sdk.values.TupleTagList;
+
+/**
+ * {@link Window.Bound} is translated to {@link ApexParDoOperator} that wraps an {@link
+ * AssignWindowsDoFn}.
+ */
+class WindowBoundTranslator<T> implements TransformTranslator<Window.Bound<T>> {
+  private static final long serialVersionUID = 1L;
+
+  @Override
+  public void translate(Window.Bound<T> transform, TranslationContext context) {
+    PCollection<T> output = context.getOutput();
+    PCollection<T> input = context.getInput();
+    @SuppressWarnings("unchecked")
+    WindowingStrategy<T, BoundedWindow> windowingStrategy =
+        (WindowingStrategy<T, BoundedWindow>) output.getWindowingStrategy();
+
+    OldDoFn<T, T> fn =
+        (transform.getWindowFn() == null)
+            ? DoFnAdapters.toOldDoFn(new IdentityFn<T>())
+            : new AssignWindowsDoFn<>(transform.getWindowFn());
+
+    ApexParDoOperator<T, T> operator =
+        new ApexParDoOperator<T, T>(
+            context.getPipelineOptions().as(ApexPipelineOptions.class),
+            fn,
+            new TupleTag<T>(),
+            TupleTagList.empty().getAll(),
+            windowingStrategy,
+            Collections.<PCollectionView<?>>emptyList(),
+            WindowedValue.getFullCoder(
+                input.getCoder(), windowingStrategy.getWindowFn().windowCoder()),
+            context.<Void>stateInternalsFactory());
+    context.addOperator(operator, operator.output);
+    context.addStream(context.getInput(), operator.input);
+  }
+
+  private static class IdentityFn<T> extends DoFn<T, T> {
+    @ProcessElement
+    public void processElement(ProcessContext c) {
+      c.output(c.element());
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/af616d97/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexParDoOperator.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexParDoOperator.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexParDoOperator.java
index 637c3ff..08f062d 100644
--- a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexParDoOperator.java
+++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexParDoOperator.java
@@ -28,11 +28,9 @@ import com.esotericsoftware.kryo.serializers.JavaSerializer;
 import com.google.common.base.Throwables;
 import com.google.common.collect.Iterables;
 import com.google.common.collect.Maps;
-
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
-
 import org.apache.beam.runners.apex.ApexPipelineOptions;
 import org.apache.beam.runners.apex.ApexRunner;
 import org.apache.beam.runners.apex.translation.utils.ApexStreamTuple;
@@ -50,6 +48,7 @@ import org.apache.beam.sdk.transforms.Aggregator;
 import org.apache.beam.sdk.transforms.Aggregator.AggregatorFactory;
 import org.apache.beam.sdk.transforms.Combine.CombineFn;
 import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.DoFnAdapters;
 import org.apache.beam.sdk.transforms.OldDoFn;
 import org.apache.beam.sdk.util.ExecutionContext;
 import org.apache.beam.sdk.util.NullSideInputReader;
@@ -95,6 +94,7 @@ public class ApexParDoOperator<InputT, OutputT> extends BaseOperator implements
   private transient Map<TupleTag<?>, DefaultOutputPort<ApexStreamTuple<?>>> sideOutputPortMapping =
       Maps.newHashMapWithExpectedSize(5);
 
+  @Deprecated
   public ApexParDoOperator(
       ApexPipelineOptions pipelineOptions,
       OldDoFn<InputT, OutputT> doFn,
@@ -125,6 +125,27 @@ public class ApexParDoOperator<InputT, OutputT> extends BaseOperator implements
 
   }
 
+  public ApexParDoOperator(
+      ApexPipelineOptions pipelineOptions,
+      DoFn<InputT, OutputT> doFn,
+      TupleTag<OutputT> mainOutputTag,
+      List<TupleTag<?>> sideOutputTags,
+      WindowingStrategy<?, ?> windowingStrategy,
+      List<PCollectionView<?>> sideInputs,
+      Coder<WindowedValue<InputT>> inputCoder,
+      StateInternalsFactory<Void> stateInternalsFactory
+      ) {
+    this(
+        pipelineOptions,
+        DoFnAdapters.toOldDoFn(doFn),
+        mainOutputTag,
+        sideOutputTags,
+        windowingStrategy,
+        sideInputs,
+        inputCoder,
+        stateInternalsFactory);
+  }
+
   @SuppressWarnings("unused") // for Kryo
   private ApexParDoOperator() {
     this.pipelineOptions = null;

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/af616d97/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/ParDoBoundTranslatorTest.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/ParDoBoundTranslatorTest.java b/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/ParDoBoundTranslatorTest.java
index 28b2ec9..fa94b2a 100644
--- a/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/ParDoBoundTranslatorTest.java
+++ b/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/ParDoBoundTranslatorTest.java
@@ -49,7 +49,6 @@ import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.apache.beam.sdk.testing.PAssert;
 import org.apache.beam.sdk.transforms.Create;
 import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.transforms.DoFnAdapters;
 import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.transforms.Sum;
 import org.apache.beam.sdk.transforms.View;
@@ -208,7 +207,7 @@ public class ParDoBoundTranslatorTest {
     ApexParDoOperator<Integer, Integer> operator =
         new ApexParDoOperator<>(
             options,
-            DoFnAdapters.toOldDoFn(new Add(singletonView)),
+            new Add(singletonView),
             new TupleTag<Integer>(),
             TupleTagList.empty().getAll(),
             WindowingStrategy.globalDefault(),

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/af616d97/runners/core-java/src/main/java/org/apache/beam/runners/core/AssignWindows.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/AssignWindows.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/AssignWindows.java
deleted file mode 100644
index 375932a..0000000
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/AssignWindows.java
+++ /dev/null
@@ -1,46 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.core;
-
-import org.apache.beam.sdk.transforms.PTransform;
-import org.apache.beam.sdk.transforms.ParDo;
-import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.apache.beam.sdk.transforms.windowing.WindowFn;
-import org.apache.beam.sdk.values.PCollection;
-
-/**
- * {@link PTransform} that uses privileged (non-user-facing) APIs to assign elements of a
- * {@link PCollection} to windows according to the provided {@link WindowFn}.
- *
- * @param <T> Type of elements being windowed
- * @param <W> Window type
- */
-public class AssignWindows<T, W extends BoundedWindow>
-    extends PTransform<PCollection<T>, PCollection<T>> {
-
-  private WindowFn<? super T, W> fn;
-
-  public AssignWindows(WindowFn<? super T, W> fn) {
-    this.fn = fn;
-  }
-
-  @Override
-  public PCollection<T> expand(PCollection<T> input) {
-    return input.apply("AssignWindows", ParDo.of(new AssignWindowsDoFn<>(fn)));
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/af616d97/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkAssignWindowFn.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkAssignWindowFn.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkAssignWindowFn.java
index 9d7ed7d..18a3dd8 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkAssignWindowFn.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkAssignWindowFn.java
@@ -29,7 +29,7 @@ import org.joda.time.Instant;
 
 
 /**
- * An implementation of {@link org.apache.beam.runners.core.AssignWindows} for the Spark runner.
+ * An implementation of {@link org.apache.beam.runners.core.AssignWindowsDoFn} for the Spark runner.
  */
 public class SparkAssignWindowFn<T, W extends BoundedWindow>
     implements Function<WindowedValue<T>, WindowedValue<T>> {


Mime
View raw message