beam-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From aljos...@apache.org
Subject [1/2] beam git commit: [BEAM-1560] Use provided Function Runners in Flink Batch Runner
Date Mon, 27 Feb 2017 12:23:52 GMT
Repository: beam
Updated Branches:
  refs/heads/master 3082178b3 -> b261d4890


[BEAM-1560] Use provided Function Runners in Flink Batch Runner


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/0ae2a385
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/0ae2a385
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/0ae2a385

Branch: refs/heads/master
Commit: 0ae2a3851cbfe1ec2f2c7237954b18c9951c76a3
Parents: 3082178
Author: JingsongLi <lzljs3620320@aliyun.com>
Authored: Mon Feb 27 18:08:34 2017 +0800
Committer: Aljoscha Krettek <aljoscha.krettek@gmail.com>
Committed: Mon Feb 27 13:22:42 2017 +0100

----------------------------------------------------------------------
 .../flink/FlinkBatchTransformTranslators.java   |   9 +-
 .../runners/flink/OldPerKeyCombineFnRunner.java |  62 -----
 .../flink/OldPerKeyCombineFnRunners.java        | 155 -----------
 .../functions/FlinkAggregatorFactory.java       |  53 ++++
 .../functions/FlinkDoFnFunction.java            |  96 +++----
 .../FlinkMergingNonShuffleReduceFunction.java   |  65 ++---
 .../FlinkMergingPartialReduceFunction.java      |  45 ++--
 .../functions/FlinkMergingReduceFunction.java   |  39 ++-
 .../functions/FlinkMultiOutputDoFnFunction.java | 101 +++----
 .../FlinkMultiOutputProcessContext.java         | 118 --------
 .../functions/FlinkNoElementAssignContext.java  |  68 -----
 .../functions/FlinkNoOpStepContext.java         |  73 +++++
 .../functions/FlinkPartialReduceFunction.java   |  53 ++--
 .../functions/FlinkProcessContextBase.java      | 267 -------------------
 .../functions/FlinkReduceFunction.java          |  49 ++--
 .../functions/FlinkSideInputReader.java         |  80 ++++++
 .../FlinkSingleOutputProcessContext.java        |  69 -----
 17 files changed, 419 insertions(+), 983 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/0ae2a385/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkBatchTransformTranslators.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkBatchTransformTranslators.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkBatchTransformTranslators.java
index de8b43f..99651c3 100644
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkBatchTransformTranslators.java
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkBatchTransformTranslators.java
@@ -396,7 +396,7 @@ class FlinkBatchTransformTranslators {
           inputDataSet.groupBy(new KvKeySelector<InputT, K>(inputCoder.getKeyCoder()));
 
       // construct a map from side input to WindowingStrategy so that
-      // the OldDoFn runner can map main-input windows to side input windows
+      // the DoFn runner can map main-input windows to side input windows
       Map<PCollectionView<?>, WindowingStrategy<?, ?>> sideInputStrategies = new HashMap<>();
       for (PCollectionView<?> sideInput: transform.getSideInputs()) {
         sideInputStrategies.put(sideInput, sideInput.getWindowingStrategyInternal());
@@ -544,7 +544,7 @@ class FlinkBatchTransformTranslators {
       List<PCollectionView<?>> sideInputs = transform.getSideInputs();
 
       // construct a map from side input to WindowingStrategy so that
-      // the OldDoFn runner can map main-input windows to side input windows
+      // the DoFn runner can map main-input windows to side input windows
       Map<PCollectionView<?>, WindowingStrategy<?, ?>> sideInputStrategies = new HashMap<>();
       for (PCollectionView<?> sideInput: sideInputs) {
         sideInputStrategies.put(sideInput, sideInput.getWindowingStrategyInternal());
@@ -627,7 +627,7 @@ class FlinkBatchTransformTranslators {
       List<PCollectionView<?>> sideInputs = transform.getSideInputs();
 
       // construct a map from side input to WindowingStrategy so that
-      // the OldDoFn runner can map main-input windows to side input windows
+      // the DoFn runner can map main-input windows to side input windows
       Map<PCollectionView<?>, WindowingStrategy<?, ?>> sideInputStrategies = new HashMap<>();
       for (PCollectionView<?> sideInput: sideInputs) {
         sideInputStrategies.put(sideInput, sideInput.getWindowingStrategyInternal());
@@ -640,7 +640,8 @@ class FlinkBatchTransformTranslators {
               windowingStrategy,
               sideInputStrategies,
               context.getPipelineOptions(),
-              outputMap);
+              outputMap,
+              transform.getMainOutputTag());
 
       MapPartitionOperator<WindowedValue<InputT>, WindowedValue<RawUnionValue>> taggedDataSet =
           new MapPartitionOperator<>(

http://git-wip-us.apache.org/repos/asf/beam/blob/0ae2a385/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/OldPerKeyCombineFnRunner.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/OldPerKeyCombineFnRunner.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/OldPerKeyCombineFnRunner.java
deleted file mode 100644
index 71c3aa4..0000000
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/OldPerKeyCombineFnRunner.java
+++ /dev/null
@@ -1,62 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.flink;
-
-import java.io.Serializable;
-import org.apache.beam.runners.core.OldDoFn;
-import org.apache.beam.sdk.transforms.CombineFnBase.PerKeyCombineFn;
-
-/**
- * An interface that runs a {@link PerKeyCombineFn} with unified APIs using
- * {@link OldDoFn.ProcessContext}.
- */
-@Deprecated
-public interface OldPerKeyCombineFnRunner<K, InputT, AccumT, OutputT> extends Serializable {
-  /**
-   * Forwards the call to a {@link PerKeyCombineFn} to create the accumulator in a {@link OldDoFn}.
-   *
-   * <p>It constructs a {@code CombineWithContext.Context} from {@code OldDoFn.ProcessContext}
-   * if it is required.
-   */
-  AccumT createAccumulator(K key, OldDoFn<?, ?>.ProcessContext c);
-
-  /**
-   * Forwards the call to a {@link PerKeyCombineFn} to add the input in a {@link OldDoFn}.
-   *
-   * <p>It constructs a {@code CombineWithContext.Context} from {@code OldDoFn.ProcessContext}
-   * if it is required.
-   */
-  AccumT addInput(K key, AccumT accumulator, InputT input, OldDoFn<?, ?>.ProcessContext c);
-
-  /**
-   * Forwards the call to a {@link PerKeyCombineFn} to merge accumulators in a {@link OldDoFn}.
-   *
-   * <p>It constructs a {@code CombineWithContext.Context} from {@code OldDoFn.ProcessContext}
-   * if it is required.
-   */
-  AccumT mergeAccumulators(
-      K key, Iterable<AccumT> accumulators, OldDoFn<?, ?>.ProcessContext c);
-
-  /**
-   * Forwards the call to a {@link PerKeyCombineFn} to extract the output in a {@link OldDoFn}.
-   *
-   * <p>It constructs a {@code CombineWithContext.Context} from {@code OldDoFn.ProcessContext}
-   * if it is required.
-   */
-  OutputT extractOutput(K key, AccumT accumulator, OldDoFn<?, ?>.ProcessContext c);
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/0ae2a385/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/OldPerKeyCombineFnRunners.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/OldPerKeyCombineFnRunners.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/OldPerKeyCombineFnRunners.java
deleted file mode 100644
index 90894f2..0000000
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/OldPerKeyCombineFnRunners.java
+++ /dev/null
@@ -1,155 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.flink;
-
-import org.apache.beam.runners.core.OldDoFn;
-import org.apache.beam.runners.core.PerKeyCombineFnRunner;
-import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.beam.sdk.transforms.Combine.KeyedCombineFn;
-import org.apache.beam.sdk.transforms.CombineFnBase.PerKeyCombineFn;
-import org.apache.beam.sdk.transforms.CombineWithContext;
-import org.apache.beam.sdk.transforms.CombineWithContext.KeyedCombineFnWithContext;
-import org.apache.beam.sdk.values.PCollectionView;
-
-/**
- * Static utility methods that provide {@link OldPerKeyCombineFnRunner} implementations
- * for different keyed combine functions.
- */
-@Deprecated
-public class OldPerKeyCombineFnRunners {
-  /**
-   * Returns a {@link PerKeyCombineFnRunner} from a {@link PerKeyCombineFn}.
-   */
-  public static <K, InputT, AccumT, OutputT> OldPerKeyCombineFnRunner<K, InputT, AccumT, OutputT>
-  create(PerKeyCombineFn<K, InputT, AccumT, OutputT> perKeyCombineFn) {
-    if (perKeyCombineFn instanceof KeyedCombineFnWithContext) {
-      return new KeyedCombineFnWithContextRunner<>(
-          (KeyedCombineFnWithContext<K, InputT, AccumT, OutputT>) perKeyCombineFn);
-    } else if (perKeyCombineFn instanceof KeyedCombineFn) {
-      return new KeyedCombineFnRunner<>(
-          (KeyedCombineFn<K, InputT, AccumT, OutputT>) perKeyCombineFn);
-    } else {
-      throw new IllegalStateException(
-          String.format("Unknown type of CombineFn: %s", perKeyCombineFn.getClass()));
-    }
-  }
-
-  /** Returns a {@code Combine.Context} that wraps a {@code OldDoFn.ProcessContext}. */
-  private static CombineWithContext.Context createFromProcessContext(
-      final OldDoFn<?, ?>.ProcessContext c) {
-    return new CombineWithContext.Context() {
-      @Override
-      public PipelineOptions getPipelineOptions() {
-        return c.getPipelineOptions();
-      }
-
-      @Override
-      public <T> T sideInput(PCollectionView<T> view) {
-        return c.sideInput(view);
-      }
-    };
-  }
-
-  /**
-   * An implementation of {@link PerKeyCombineFnRunner} with {@link KeyedCombineFn}.
-   *
-   * <p>It forwards functions calls to the {@link KeyedCombineFn}.
-   */
-  private static class KeyedCombineFnRunner<K, InputT, AccumT, OutputT>
-      implements OldPerKeyCombineFnRunner<K, InputT, AccumT, OutputT> {
-    private final KeyedCombineFn<K, InputT, AccumT, OutputT> keyedCombineFn;
-
-    private KeyedCombineFnRunner(
-        KeyedCombineFn<K, InputT, AccumT, OutputT> keyedCombineFn) {
-      this.keyedCombineFn = keyedCombineFn;
-    }
-
-    @Override
-    public AccumT createAccumulator(K key, OldDoFn<?, ?>.ProcessContext c) {
-      return keyedCombineFn.createAccumulator(key);
-    }
-
-    @Override
-    public AccumT addInput(
-        K key, AccumT accumulator, InputT input, OldDoFn<?, ?>.ProcessContext c) {
-      return keyedCombineFn.addInput(key, accumulator, input);
-    }
-
-    @Override
-    public AccumT mergeAccumulators(
-        K key, Iterable<AccumT> accumulators, OldDoFn<?, ?>.ProcessContext c) {
-      return keyedCombineFn.mergeAccumulators(key, accumulators);
-    }
-
-    @Override
-    public OutputT extractOutput(K key, AccumT accumulator, OldDoFn<?, ?>.ProcessContext c) {
-      return keyedCombineFn.extractOutput(key, accumulator);
-    }
-
-    @Override
-    public String toString() {
-      return keyedCombineFn.toString();
-    }
-  }
-
-  /**
-   * An implementation of {@link PerKeyCombineFnRunner} with {@link KeyedCombineFnWithContext}.
-   *
-   * <p>It forwards functions calls to the {@link KeyedCombineFnWithContext}.
-   */
-  private static class KeyedCombineFnWithContextRunner<K, InputT, AccumT, OutputT>
-      implements OldPerKeyCombineFnRunner<K, InputT, AccumT, OutputT> {
-    private final KeyedCombineFnWithContext<K, InputT, AccumT, OutputT> keyedCombineFnWithContext;
-
-    private KeyedCombineFnWithContextRunner(
-        KeyedCombineFnWithContext<K, InputT, AccumT, OutputT> keyedCombineFnWithContext) {
-      this.keyedCombineFnWithContext = keyedCombineFnWithContext;
-    }
-
-    @Override
-    public AccumT createAccumulator(K key, OldDoFn<?, ?>.ProcessContext c) {
-      return keyedCombineFnWithContext.createAccumulator(key,
-          createFromProcessContext(c));
-    }
-
-    @Override
-    public AccumT addInput(
-        K key, AccumT accumulator, InputT value, OldDoFn<?, ?>.ProcessContext c) {
-      return keyedCombineFnWithContext.addInput(key, accumulator, value,
-          createFromProcessContext(c));
-    }
-
-    @Override
-    public AccumT mergeAccumulators(
-        K key, Iterable<AccumT> accumulators, OldDoFn<?, ?>.ProcessContext c) {
-      return keyedCombineFnWithContext.mergeAccumulators(
-          key, accumulators, createFromProcessContext(c));
-    }
-
-    @Override
-    public OutputT extractOutput(K key, AccumT accumulator, OldDoFn<?, ?>.ProcessContext c) {
-      return keyedCombineFnWithContext.extractOutput(key, accumulator,
-          createFromProcessContext(c));
-    }
-
-    @Override
-    public String toString() {
-      return keyedCombineFnWithContext.toString();
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/0ae2a385/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkAggregatorFactory.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkAggregatorFactory.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkAggregatorFactory.java
new file mode 100644
index 0000000..fb2493b
--- /dev/null
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkAggregatorFactory.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.flink.translation.functions;
+
+import org.apache.beam.runners.core.AggregatorFactory;
+import org.apache.beam.runners.core.ExecutionContext;
+import org.apache.beam.runners.flink.translation.wrappers.SerializableFnAggregatorWrapper;
+import org.apache.beam.sdk.transforms.Aggregator;
+import org.apache.beam.sdk.transforms.Combine;
+import org.apache.flink.api.common.functions.RuntimeContext;
+
+/**
+ * An {@link AggregatorFactory} for the Flink Batch Runner.
+ */
+public class FlinkAggregatorFactory implements AggregatorFactory{
+
+  private final RuntimeContext runtimeContext;
+
+  public FlinkAggregatorFactory(RuntimeContext runtimeContext) {
+    this.runtimeContext = runtimeContext;
+  }
+
+  @Override
+  public <InputT, AccumT, OutputT> Aggregator<InputT, OutputT> createAggregatorForDoFn(
+      Class<?> fnClass, ExecutionContext.StepContext stepContext, String aggregatorName,
+      Combine.CombineFn<InputT, AccumT, OutputT> combine) {
+    @SuppressWarnings("unchecked")
+    SerializableFnAggregatorWrapper<InputT, OutputT> result =
+        (SerializableFnAggregatorWrapper<InputT, OutputT>)
+            runtimeContext.getAccumulator(aggregatorName);
+
+    if (result == null) {
+      result = new SerializableFnAggregatorWrapper<>(combine);
+      runtimeContext.addAccumulator(aggregatorName, result);
+    }
+    return result;
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/0ae2a385/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkDoFnFunction.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkDoFnFunction.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkDoFnFunction.java
index 8b2bcc6..7081aad 100644
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkDoFnFunction.java
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkDoFnFunction.java
@@ -17,50 +17,51 @@
  */
 package org.apache.beam.runners.flink.translation.functions;
 
+import java.util.Collections;
 import java.util.Map;
-import org.apache.beam.runners.core.DoFnAdapters;
-import org.apache.beam.runners.core.OldDoFn;
+import org.apache.beam.runners.core.DoFnRunner;
+import org.apache.beam.runners.core.DoFnRunners;
 import org.apache.beam.runners.flink.translation.utils.SerializedPipelineOptions;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.transforms.reflect.DoFnSignatures;
+import org.apache.beam.sdk.transforms.reflect.DoFnInvoker;
+import org.apache.beam.sdk.transforms.reflect.DoFnInvokers;
 import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.util.WindowingStrategy;
 import org.apache.beam.sdk.values.PCollectionView;
+import org.apache.beam.sdk.values.TupleTag;
 import org.apache.flink.api.common.functions.RichMapPartitionFunction;
+import org.apache.flink.api.common.functions.RuntimeContext;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.util.Collector;
 
 /**
- * Encapsulates a {@link OldDoFn}
+ * Encapsulates a {@link DoFn}
  * inside a Flink {@link org.apache.flink.api.common.functions.RichMapPartitionFunction}.
  */
 public class FlinkDoFnFunction<InputT, OutputT>
     extends RichMapPartitionFunction<WindowedValue<InputT>, WindowedValue<OutputT>> {
 
-  private final OldDoFn<InputT, OutputT> doFn;
   private final SerializedPipelineOptions serializedOptions;
 
+  private final DoFn<InputT, OutputT> doFn;
   private final Map<PCollectionView<?>, WindowingStrategy<?, ?>> sideInputs;
 
-  private final boolean requiresWindowAccess;
-  private final boolean hasSideInputs;
-
   private final WindowingStrategy<?, ?> windowingStrategy;
 
+  private transient DoFnInvoker<InputT, OutputT> doFnInvoker;
+
   public FlinkDoFnFunction(
       DoFn<InputT, OutputT> doFn,
       WindowingStrategy<?, ?> windowingStrategy,
       Map<PCollectionView<?>, WindowingStrategy<?, ?>> sideInputs,
       PipelineOptions options) {
-    this.doFn = DoFnAdapters.toOldDoFn(doFn);
+
+    this.doFn = doFn;
     this.sideInputs = sideInputs;
     this.serializedOptions = new SerializedPipelineOptions(options);
     this.windowingStrategy = windowingStrategy;
 
-    this.requiresWindowAccess =
-        DoFnSignatures.signatureForDoFn(doFn).processElement().observesWindow();
-    this.hasSideInputs = !sideInputs.isEmpty();
   }
 
   @Override
@@ -68,48 +69,53 @@ public class FlinkDoFnFunction<InputT, OutputT>
       Iterable<WindowedValue<InputT>> values,
       Collector<WindowedValue<OutputT>> out) throws Exception {
 
-    FlinkSingleOutputProcessContext<InputT, OutputT> context =
-        new FlinkSingleOutputProcessContext<>(
-            serializedOptions.getPipelineOptions(),
-            getRuntimeContext(),
-            doFn,
-            windowingStrategy,
-            sideInputs,
-            out);
-
-    this.doFn.startBundle(context);
-
-    if (!requiresWindowAccess || hasSideInputs) {
-      // we don't need to explode the windows
-      for (WindowedValue<InputT> value : values) {
-        context.setWindowedValue(value);
-        doFn.processElement(context);
-      }
-    } else {
-      // we need to explode the windows because we have per-window
-      // side inputs and window access also only works if an element
-      // is in only one window
-      for (WindowedValue<InputT> value : values) {
-        for (WindowedValue<InputT> explodedValue : value.explodeWindows()) {
-          context.setWindowedValue(explodedValue);
-          doFn.processElement(context);
-        }
-      }
+    RuntimeContext runtimeContext = getRuntimeContext();
+
+    DoFnRunner<InputT, OutputT> doFnRunner = DoFnRunners.simpleRunner(
+        serializedOptions.getPipelineOptions(), doFn,
+        new FlinkSideInputReader(sideInputs, runtimeContext),
+        new DoFnOutputManager(out),
+        new TupleTag<OutputT>() {
+        },
+        Collections.<TupleTag<?>>emptyList(),
+        new FlinkNoOpStepContext(),
+        new FlinkAggregatorFactory(runtimeContext),
+        windowingStrategy);
+
+    doFnRunner.startBundle();
+
+    for (WindowedValue<InputT> value : values) {
+      doFnRunner.processElement(value);
     }
 
-    // set the windowed value to null so that the special logic for outputting
-    // in startBundle/finishBundle kicks in
-    context.setWindowedValue(null);
-    this.doFn.finishBundle(context);
+    doFnRunner.finishBundle();
   }
 
   @Override
   public void open(Configuration parameters) throws Exception {
-    doFn.setup();
+    doFnInvoker = DoFnInvokers.invokerFor(doFn);
+    doFnInvoker.invokeSetup();
   }
 
   @Override
   public void close() throws Exception {
-    doFn.teardown();
+    doFnInvoker.invokeTeardown();
+  }
+
+  private class DoFnOutputManager
+      implements DoFnRunners.OutputManager {
+
+    private Collector collector;
+
+    DoFnOutputManager(Collector<WindowedValue<OutputT>> collector) {
+      this.collector = collector;
+    }
+
+    @Override
+    @SuppressWarnings("unchecked")
+    public <T> void output(TupleTag<T> tag, WindowedValue<T> output) {
+      collector.collect(output);
+    }
   }
+
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/0ae2a385/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMergingNonShuffleReduceFunction.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMergingNonShuffleReduceFunction.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMergingNonShuffleReduceFunction.java
index 5ec6a77..26fd0b4 100644
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMergingNonShuffleReduceFunction.java
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMergingNonShuffleReduceFunction.java
@@ -24,9 +24,8 @@ import java.util.Comparator;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
-import org.apache.beam.runners.core.OldDoFn;
-import org.apache.beam.runners.flink.OldPerKeyCombineFnRunner;
-import org.apache.beam.runners.flink.OldPerKeyCombineFnRunners;
+import org.apache.beam.runners.core.PerKeyCombineFnRunner;
+import org.apache.beam.runners.core.PerKeyCombineFnRunners;
 import org.apache.beam.runners.flink.translation.utils.SerializedPipelineOptions;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.transforms.CombineFnBase;
@@ -52,13 +51,11 @@ import org.joda.time.Instant;
  * yet be in their correct windows for side-input access.
  */
 public class FlinkMergingNonShuffleReduceFunction<
-      K, InputT, AccumT, OutputT, W extends IntervalWindow>
+    K, InputT, AccumT, OutputT, W extends IntervalWindow>
     extends RichGroupReduceFunction<WindowedValue<KV<K, InputT>>, WindowedValue<KV<K, OutputT>>> {
 
   private final CombineFnBase.PerKeyCombineFn<K, InputT, AccumT, OutputT> combineFn;
 
-  private final OldDoFn<KV<K, InputT>, KV<K, OutputT>> doFn;
-
   private final WindowingStrategy<?, W> windowingStrategy;
 
   private final Map<PCollectionView<?>, WindowingStrategy<?, ?>> sideInputs;
@@ -78,13 +75,6 @@ public class FlinkMergingNonShuffleReduceFunction<
 
     this.serializedOptions = new SerializedPipelineOptions(pipelineOptions);
 
-    // dummy OldDoFn because we need one for ProcessContext
-    this.doFn = new OldDoFn<KV<K, InputT>, KV<K, OutputT>>() {
-      @Override
-      public void processElement(ProcessContext c) throws Exception {
-
-      }
-    };
   }
 
   @Override
@@ -92,17 +82,13 @@ public class FlinkMergingNonShuffleReduceFunction<
       Iterable<WindowedValue<KV<K, InputT>>> elements,
       Collector<WindowedValue<KV<K, OutputT>>> out) throws Exception {
 
-    FlinkSingleOutputProcessContext<KV<K, InputT>, KV<K, OutputT>> processContext =
-        new FlinkSingleOutputProcessContext<>(
-            serializedOptions.getPipelineOptions(),
-            getRuntimeContext(),
-            doFn,
-            windowingStrategy,
-            sideInputs, out
-        );
+    PipelineOptions options = serializedOptions.getPipelineOptions();
 
-    OldPerKeyCombineFnRunner<K, InputT, AccumT, OutputT> combineFnRunner =
-        OldPerKeyCombineFnRunners.create(combineFn);
+    FlinkSideInputReader sideInputReader =
+        new FlinkSideInputReader(sideInputs, getRuntimeContext());
+
+    PerKeyCombineFnRunner<K, InputT, AccumT, OutputT> combineFnRunner =
+        PerKeyCombineFnRunners.create(combineFn);
 
     @SuppressWarnings("unchecked")
     OutputTimeFn<? super BoundedWindow> outputTimeFn =
@@ -112,8 +98,8 @@ public class FlinkMergingNonShuffleReduceFunction<
     // memory
     // this seems very imprudent, but correct, for now
     List<WindowedValue<KV<K, InputT>>> sortedInput = Lists.newArrayList();
-    for (WindowedValue<KV<K, InputT>> inputValue: elements) {
-      for (WindowedValue<KV<K, InputT>> exploded: inputValue.explodeWindows()) {
+    for (WindowedValue<KV<K, InputT>> inputValue : elements) {
+      for (WindowedValue<KV<K, InputT>> exploded : inputValue.explodeWindows()) {
         sortedInput.add(exploded);
       }
     }
@@ -141,9 +127,10 @@ public class FlinkMergingNonShuffleReduceFunction<
     IntervalWindow currentWindow =
         (IntervalWindow) Iterables.getOnlyElement(currentValue.getWindows());
     InputT firstValue = currentValue.getValue().getValue();
-    processContext.setWindowedValue(currentValue);
-    AccumT accumulator = combineFnRunner.createAccumulator(key, processContext);
-    accumulator = combineFnRunner.addInput(key, accumulator, firstValue, processContext);
+    AccumT accumulator =
+        combineFnRunner.createAccumulator(key, options, sideInputReader, currentValue.getWindows());
+    accumulator = combineFnRunner.addInput(key, accumulator, firstValue,
+        options, sideInputReader, currentValue.getWindows());
 
     // we use this to keep track of the timestamps assigned by the OutputTimeFn
     Instant windowTimestamp =
@@ -151,14 +138,15 @@ public class FlinkMergingNonShuffleReduceFunction<
 
     while (iterator.hasNext()) {
       WindowedValue<KV<K, InputT>> nextValue = iterator.next();
-      IntervalWindow nextWindow = (IntervalWindow) Iterables.getOnlyElement(nextValue.getWindows());
+      IntervalWindow nextWindow =
+          (IntervalWindow) Iterables.getOnlyElement(nextValue.getWindows());
 
       if (currentWindow.equals(nextWindow)) {
         // continue accumulating and merge windows
 
         InputT value = nextValue.getValue().getValue();
-        processContext.setWindowedValue(nextValue);
-        accumulator = combineFnRunner.addInput(key, accumulator, value, processContext);
+        accumulator = combineFnRunner.addInput(key, accumulator, value,
+            options, sideInputReader, currentValue.getWindows());
 
         windowTimestamp = outputTimeFn.combine(
             windowTimestamp,
@@ -168,24 +156,29 @@ public class FlinkMergingNonShuffleReduceFunction<
         // emit the value that we currently have
         out.collect(
             WindowedValue.of(
-                KV.of(key, combineFnRunner.extractOutput(key, accumulator, processContext)),
+                KV.of(key, combineFnRunner.extractOutput(key, accumulator,
+                    options, sideInputReader, currentValue.getWindows())),
                 windowTimestamp,
                 currentWindow,
                 PaneInfo.NO_FIRING));
 
         currentWindow = nextWindow;
+        currentValue = nextValue;
         InputT value = nextValue.getValue().getValue();
-        processContext.setWindowedValue(nextValue);
-        accumulator = combineFnRunner.createAccumulator(key, processContext);
-        accumulator = combineFnRunner.addInput(key, accumulator, value, processContext);
+        accumulator = combineFnRunner.createAccumulator(key,
+            options, sideInputReader, currentValue.getWindows());
+        accumulator = combineFnRunner.addInput(key, accumulator, value,
+            options, sideInputReader, currentValue.getWindows());
         windowTimestamp = outputTimeFn.assignOutputTime(nextValue.getTimestamp(), currentWindow);
       }
+
     }
 
     // emit the final accumulator
     out.collect(
         WindowedValue.of(
-            KV.of(key, combineFnRunner.extractOutput(key, accumulator, processContext)),
+            KV.of(key, combineFnRunner.extractOutput(key, accumulator,
+                options, sideInputReader, currentValue.getWindows())),
             windowTimestamp,
             currentWindow,
             PaneInfo.NO_FIRING));

http://git-wip-us.apache.org/repos/asf/beam/blob/0ae2a385/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMergingPartialReduceFunction.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMergingPartialReduceFunction.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMergingPartialReduceFunction.java
index cf058e8..c68f155 100644
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMergingPartialReduceFunction.java
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMergingPartialReduceFunction.java
@@ -24,8 +24,8 @@ import java.util.Comparator;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
-import org.apache.beam.runners.flink.OldPerKeyCombineFnRunner;
-import org.apache.beam.runners.flink.OldPerKeyCombineFnRunners;
+import org.apache.beam.runners.core.PerKeyCombineFnRunner;
+import org.apache.beam.runners.core.PerKeyCombineFnRunners;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.transforms.CombineFnBase;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
@@ -45,7 +45,7 @@ import org.joda.time.Instant;
  * same behaviour as {@code MergeOverlappingIntervalWindows}.
  */
 public class FlinkMergingPartialReduceFunction<K, InputT, AccumT, W extends IntervalWindow>
-  extends FlinkPartialReduceFunction<K, InputT, AccumT, W> {
+    extends FlinkPartialReduceFunction<K, InputT, AccumT, W> {
 
   public FlinkMergingPartialReduceFunction(
       CombineFnBase.PerKeyCombineFn<K, InputT, AccumT, ?> combineFn,
@@ -60,17 +60,13 @@ public class FlinkMergingPartialReduceFunction<K, InputT, AccumT, W extends Inte
       Iterable<WindowedValue<KV<K, InputT>>> elements,
       Collector<WindowedValue<KV<K, AccumT>>> out) throws Exception {
 
-    FlinkSingleOutputProcessContext<KV<K, InputT>, KV<K, AccumT>> processContext =
-        new FlinkSingleOutputProcessContext<>(
-            serializedOptions.getPipelineOptions(),
-            getRuntimeContext(),
-            doFn,
-            windowingStrategy,
-            sideInputs, out
-        );
+    PipelineOptions options = serializedOptions.getPipelineOptions();
 
-    OldPerKeyCombineFnRunner<K, InputT, AccumT, ?> combineFnRunner =
-        OldPerKeyCombineFnRunners.create(combineFn);
+    FlinkSideInputReader sideInputReader =
+        new FlinkSideInputReader(sideInputs, getRuntimeContext());
+
+    PerKeyCombineFnRunner<K, InputT, AccumT, ?> combineFnRunner =
+        PerKeyCombineFnRunners.create(combineFn);
 
     @SuppressWarnings("unchecked")
     OutputTimeFn<? super BoundedWindow> outputTimeFn =
@@ -80,8 +76,8 @@ public class FlinkMergingPartialReduceFunction<K, InputT, AccumT, W extends Inte
     // memory
     // this seems very unprudent, but correct, for now
     List<WindowedValue<KV<K, InputT>>> sortedInput = Lists.newArrayList();
-    for (WindowedValue<KV<K, InputT>> inputValue: elements) {
-      for (WindowedValue<KV<K, InputT>> exploded: inputValue.explodeWindows()) {
+    for (WindowedValue<KV<K, InputT>> inputValue : elements) {
+      for (WindowedValue<KV<K, InputT>> exploded : inputValue.explodeWindows()) {
         sortedInput.add(exploded);
       }
     }
@@ -109,9 +105,10 @@ public class FlinkMergingPartialReduceFunction<K, InputT, AccumT, W extends Inte
     IntervalWindow currentWindow =
         (IntervalWindow) Iterables.getOnlyElement(currentValue.getWindows());
     InputT firstValue = currentValue.getValue().getValue();
-    processContext.setWindowedValue(currentValue);
-    AccumT accumulator = combineFnRunner.createAccumulator(key, processContext);
-    accumulator = combineFnRunner.addInput(key, accumulator, firstValue, processContext);
+    AccumT accumulator = combineFnRunner.createAccumulator(key,
+        options, sideInputReader, currentValue.getWindows());
+    accumulator = combineFnRunner.addInput(key, accumulator, firstValue,
+        options, sideInputReader, currentValue.getWindows());
 
     // we use this to keep track of the timestamps assigned by the OutputTimeFn
     Instant windowTimestamp =
@@ -125,8 +122,8 @@ public class FlinkMergingPartialReduceFunction<K, InputT, AccumT, W extends Inte
         // continue accumulating and merge windows
 
         InputT value = nextValue.getValue().getValue();
-        processContext.setWindowedValue(nextValue);
-        accumulator = combineFnRunner.addInput(key, accumulator, value, processContext);
+        accumulator = combineFnRunner.addInput(key, accumulator, value,
+            options, sideInputReader, currentValue.getWindows());
 
         windowTimestamp = outputTimeFn.combine(
             windowTimestamp,
@@ -142,10 +139,12 @@ public class FlinkMergingPartialReduceFunction<K, InputT, AccumT, W extends Inte
                 PaneInfo.NO_FIRING));
 
         currentWindow = nextWindow;
+        currentValue = nextValue;
         InputT value = nextValue.getValue().getValue();
-        processContext.setWindowedValue(nextValue);
-        accumulator = combineFnRunner.createAccumulator(key, processContext);
-        accumulator = combineFnRunner.addInput(key, accumulator, value, processContext);
+        accumulator = combineFnRunner.createAccumulator(key,
+            options, sideInputReader, currentValue.getWindows());
+        accumulator = combineFnRunner.addInput(key, accumulator, value,
+            options, sideInputReader, currentValue.getWindows());
         windowTimestamp = outputTimeFn.assignOutputTime(nextValue.getTimestamp(), currentWindow);
       }
     }

http://git-wip-us.apache.org/repos/asf/beam/blob/0ae2a385/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMergingReduceFunction.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMergingReduceFunction.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMergingReduceFunction.java
index 4fa4578..84b3adc 100644
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMergingReduceFunction.java
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMergingReduceFunction.java
@@ -26,8 +26,8 @@ import java.util.Comparator;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
-import org.apache.beam.runners.flink.OldPerKeyCombineFnRunner;
-import org.apache.beam.runners.flink.OldPerKeyCombineFnRunners;
+import org.apache.beam.runners.core.PerKeyCombineFnRunner;
+import org.apache.beam.runners.core.PerKeyCombineFnRunners;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.transforms.CombineFnBase;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
@@ -62,29 +62,24 @@ public class FlinkMergingReduceFunction<K, AccumT, OutputT, W extends IntervalWi
       Iterable<WindowedValue<KV<K, AccumT>>> elements,
       Collector<WindowedValue<KV<K, OutputT>>> out) throws Exception {
 
-    FlinkSingleOutputProcessContext<KV<K, AccumT>, KV<K, OutputT>> processContext =
-        new FlinkSingleOutputProcessContext<>(
-            serializedOptions.getPipelineOptions(),
-            getRuntimeContext(),
-            doFn,
-            windowingStrategy,
-            sideInputs, out
-        );
+    PipelineOptions options = serializedOptions.getPipelineOptions();
 
-    OldPerKeyCombineFnRunner<K, ?, AccumT, OutputT> combineFnRunner =
-        OldPerKeyCombineFnRunners.create(combineFn);
+    FlinkSideInputReader sideInputReader =
+        new FlinkSideInputReader(sideInputs, getRuntimeContext());
+
+    PerKeyCombineFnRunner<K, ?, AccumT, OutputT> combineFnRunner =
+        PerKeyCombineFnRunners.create(combineFn);
 
     @SuppressWarnings("unchecked")
     OutputTimeFn<? super BoundedWindow> outputTimeFn =
         (OutputTimeFn<? super BoundedWindow>) windowingStrategy.getOutputTimeFn();
 
-
     // get all elements so that we can sort them, has to fit into
     // memory
     // this seems very unprudent, but correct, for now
     ArrayList<WindowedValue<KV<K, AccumT>>> sortedInput = Lists.newArrayList();
-    for (WindowedValue<KV<K, AccumT>> inputValue: elements) {
-      for (WindowedValue<KV<K, AccumT>> exploded: inputValue.explodeWindows()) {
+    for (WindowedValue<KV<K, AccumT>> inputValue : elements) {
+      for (WindowedValue<KV<K, AccumT>> exploded : inputValue.explodeWindows()) {
         sortedInput.add(exploded);
       }
     }
@@ -127,25 +122,24 @@ public class FlinkMergingReduceFunction<K, AccumT, OutputT, W extends IntervalWi
       if (nextWindow.equals(currentWindow)) {
         // continue accumulating and merge windows
 
-        processContext.setWindowedValue(nextValue);
-
         accumulator = combineFnRunner.mergeAccumulators(
-            key, ImmutableList.of(accumulator, nextValue.getValue().getValue()), processContext);
+            key, ImmutableList.of(accumulator, nextValue.getValue().getValue()),
+            options, sideInputReader, currentValue.getWindows());
 
         windowTimestamps.add(nextValue.getTimestamp());
       } else {
         out.collect(
             WindowedValue.of(
-                KV.of(key, combineFnRunner.extractOutput(key, accumulator, processContext)),
+                KV.of(key, combineFnRunner.extractOutput(key, accumulator,
+                    options, sideInputReader, currentValue.getWindows())),
                 outputTimeFn.merge(currentWindow, windowTimestamps),
                 currentWindow,
                 PaneInfo.NO_FIRING));
 
         windowTimestamps.clear();
 
-        processContext.setWindowedValue(nextValue);
-
         currentWindow = nextWindow;
+        currentValue = nextValue;
         accumulator = nextValue.getValue().getValue();
         windowTimestamps.add(nextValue.getTimestamp());
       }
@@ -154,7 +148,8 @@ public class FlinkMergingReduceFunction<K, AccumT, OutputT, W extends IntervalWi
     // emit the final accumulator
     out.collect(
         WindowedValue.of(
-            KV.of(key, combineFnRunner.extractOutput(key, accumulator, processContext)),
+            KV.of(key, combineFnRunner.extractOutput(key, accumulator,
+                options, sideInputReader, currentValue.getWindows())),
             outputTimeFn.merge(currentWindow, windowTimestamps),
             currentWindow,
             PaneInfo.NO_FIRING));

http://git-wip-us.apache.org/repos/asf/beam/blob/0ae2a385/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMultiOutputDoFnFunction.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMultiOutputDoFnFunction.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMultiOutputDoFnFunction.java
index aeeabbf..27ba5ac 100644
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMultiOutputDoFnFunction.java
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMultiOutputDoFnFunction.java
@@ -17,24 +17,27 @@
  */
 package org.apache.beam.runners.flink.translation.functions;
 
+import java.util.Collections;
 import java.util.Map;
-import org.apache.beam.runners.core.DoFnAdapters;
-import org.apache.beam.runners.core.OldDoFn;
+import org.apache.beam.runners.core.DoFnRunner;
+import org.apache.beam.runners.core.DoFnRunners;
 import org.apache.beam.runners.flink.translation.utils.SerializedPipelineOptions;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.join.RawUnionValue;
-import org.apache.beam.sdk.transforms.reflect.DoFnSignatures;
+import org.apache.beam.sdk.transforms.reflect.DoFnInvoker;
+import org.apache.beam.sdk.transforms.reflect.DoFnInvokers;
 import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.util.WindowingStrategy;
 import org.apache.beam.sdk.values.PCollectionView;
 import org.apache.beam.sdk.values.TupleTag;
 import org.apache.flink.api.common.functions.RichMapPartitionFunction;
+import org.apache.flink.api.common.functions.RuntimeContext;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.util.Collector;
 
 /**
- * Encapsulates a {@link OldDoFn} that can emit to multiple
+ * Encapsulates a {@link DoFn} that can emit to multiple
  * outputs inside a Flink {@link org.apache.flink.api.common.functions.RichMapPartitionFunction}.
  *
  * <p>We get a mapping from {@link org.apache.beam.sdk.values.TupleTag} to output index
@@ -44,33 +47,30 @@ import org.apache.flink.util.Collector;
 public class FlinkMultiOutputDoFnFunction<InputT, OutputT>
     extends RichMapPartitionFunction<WindowedValue<InputT>, WindowedValue<RawUnionValue>> {
 
-  private final OldDoFn<InputT, OutputT> doFn;
+  private final DoFn<InputT, OutputT> doFn;
   private final SerializedPipelineOptions serializedOptions;
 
   private final Map<TupleTag<?>, Integer> outputMap;
 
   private final Map<PCollectionView<?>, WindowingStrategy<?, ?>> sideInputs;
-
-  private final boolean requiresWindowAccess;
-  private final boolean hasSideInputs;
-
   private final WindowingStrategy<?, ?> windowingStrategy;
+  private TupleTag<OutputT> mainOutputTag;
+  private transient DoFnInvoker<InputT, OutputT> doFnInvoker;
 
   public FlinkMultiOutputDoFnFunction(
       DoFn<InputT, OutputT> doFn,
       WindowingStrategy<?, ?> windowingStrategy,
       Map<PCollectionView<?>, WindowingStrategy<?, ?>> sideInputs,
       PipelineOptions options,
-      Map<TupleTag<?>, Integer> outputMap) {
-    this.doFn = DoFnAdapters.toOldDoFn(doFn);
+      Map<TupleTag<?>, Integer> outputMap,
+      TupleTag<OutputT> mainOutputTag) {
+    this.doFn = doFn;
     this.serializedOptions = new SerializedPipelineOptions(options);
     this.outputMap = outputMap;
 
-    this.requiresWindowAccess =
-        DoFnSignatures.signatureForDoFn(doFn).processElement().observesWindow();
-    this.hasSideInputs = !sideInputs.isEmpty();
     this.windowingStrategy = windowingStrategy;
     this.sideInputs = sideInputs;
+    this.mainOutputTag = mainOutputTag;
   }
 
   @Override
@@ -78,49 +78,54 @@ public class FlinkMultiOutputDoFnFunction<InputT, OutputT>
       Iterable<WindowedValue<InputT>> values,
       Collector<WindowedValue<RawUnionValue>> out) throws Exception {
 
-    FlinkMultiOutputProcessContext<InputT, OutputT> context =
-        new FlinkMultiOutputProcessContext<>(
-            serializedOptions.getPipelineOptions(),
-            getRuntimeContext(),
-            doFn,
-            windowingStrategy,
-            sideInputs, out,
-            outputMap
-        );
-
-    this.doFn.startBundle(context);
-
-    if (!requiresWindowAccess || hasSideInputs) {
-      // we don't need to explode the windows
-      for (WindowedValue<InputT> value : values) {
-        context.setWindowedValue(value);
-        doFn.processElement(context);
-      }
-    } else {
-      // we need to explode the windows because we have per-window
-      // side inputs and window access also only works if an element
-      // is in only one window
-      for (WindowedValue<InputT> value : values) {
-        for (WindowedValue<InputT> explodedValue : value.explodeWindows()) {
-          context.setWindowedValue(value);
-          doFn.processElement(context);
-        }
-      }
+    RuntimeContext runtimeContext = getRuntimeContext();
+
+    DoFnRunner<InputT, OutputT> doFnRunner = DoFnRunners.simpleRunner(
+        serializedOptions.getPipelineOptions(), doFn,
+        new FlinkSideInputReader(sideInputs, runtimeContext),
+        new DoFnOutputManager(out),
+        mainOutputTag,
+        // see SimpleDoFnRunner, just use it to limit number of side outputs
+        Collections.<TupleTag<?>>emptyList(),
+        new FlinkNoOpStepContext(),
+        new FlinkAggregatorFactory(runtimeContext),
+        windowingStrategy);
+
+    doFnRunner.startBundle();
+
+    for (WindowedValue<InputT> value : values) {
+      doFnRunner.processElement(value);
     }
 
-    // set the windowed value to null so that the special logic for outputting
-    // in startBundle/finishBundle kicks in
-    context.setWindowedValue(null);
-    this.doFn.finishBundle(context);
+    doFnRunner.finishBundle();
+
   }
 
   @Override
   public void open(Configuration parameters) throws Exception {
-    doFn.setup();
+    doFnInvoker = DoFnInvokers.invokerFor(doFn);
+    doFnInvoker.invokeSetup();
   }
 
   @Override
   public void close() throws Exception {
-    doFn.teardown();
+    doFnInvoker.invokeTeardown();
+  }
+
+  private class DoFnOutputManager
+      implements DoFnRunners.OutputManager {
+
+    private Collector<WindowedValue<RawUnionValue>> collector;
+
+    DoFnOutputManager(Collector<WindowedValue<RawUnionValue>> collector) {
+      this.collector = collector;
+    }
+
+    @Override
+    public <T> void output(TupleTag<T> tag, WindowedValue<T> output) {
+      collector.collect(WindowedValue.of(new RawUnionValue(outputMap.get(tag), output.getValue()),
+          output.getTimestamp(), output.getWindows(), output.getPane()));
+    }
   }
+
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/0ae2a385/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMultiOutputProcessContext.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMultiOutputProcessContext.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMultiOutputProcessContext.java
deleted file mode 100644
index 7882b5f..0000000
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMultiOutputProcessContext.java
+++ /dev/null
@@ -1,118 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.flink.translation.functions;
-
-import java.util.Collection;
-import java.util.Map;
-import org.apache.beam.runners.core.OldDoFn;
-import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.beam.sdk.transforms.join.RawUnionValue;
-import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.apache.beam.sdk.transforms.windowing.PaneInfo;
-import org.apache.beam.sdk.util.WindowedValue;
-import org.apache.beam.sdk.util.WindowingStrategy;
-import org.apache.beam.sdk.values.PCollectionView;
-import org.apache.beam.sdk.values.TupleTag;
-import org.apache.flink.api.common.functions.RuntimeContext;
-import org.apache.flink.util.Collector;
-import org.joda.time.Instant;
-
-/**
- * {@link OldDoFn.ProcessContext} for {@link FlinkMultiOutputDoFnFunction} that supports side
- * outputs.
- */
-class FlinkMultiOutputProcessContext<InputT, OutputT>
-    extends FlinkProcessContextBase<InputT, OutputT> {
-
-  private final Collector<WindowedValue<RawUnionValue>> collector;
-  private final Map<TupleTag<?>, Integer> outputMap;
-
-  FlinkMultiOutputProcessContext(
-      PipelineOptions pipelineOptions,
-      RuntimeContext runtimeContext,
-      OldDoFn<InputT, OutputT> doFn,
-      WindowingStrategy<?, ?> windowingStrategy,
-      Map<PCollectionView<?>, WindowingStrategy<?, ?>> sideInputs,
-      Collector<WindowedValue<RawUnionValue>> collector,
-      Map<TupleTag<?>, Integer> outputMap) {
-    super(pipelineOptions, runtimeContext, doFn, windowingStrategy, sideInputs);
-    this.collector = collector;
-    this.outputMap = outputMap;
-  }
-
-  @Override
-  protected void outputWithTimestampAndWindow(
-      OutputT value,
-      Instant timestamp,
-      Collection<? extends BoundedWindow> windows,
-      PaneInfo pane) {
-    collector.collect(WindowedValue.of(new RawUnionValue(0, value), timestamp, windows, pane));
-  }
-
-  @Override
-  @SuppressWarnings("unchecked")
-  public <T> void sideOutput(TupleTag<T> tag, T value) {
-    if (windowedValue != null) {
-      sideOutputWithTimestamp(tag, value, windowedValue.getTimestamp());
-    } else {
-      sideOutputWithTimestamp(tag, value, null);
-    }
-  }
-
-  @Override
-  public <T> void sideOutputWithTimestamp(TupleTag<T> tag, T value, Instant timestamp) {
-    Integer index = outputMap.get(tag);
-
-    if (index == null) {
-      throw new IllegalArgumentException("Unknown side output tag: " + tag);
-    }
-
-    outputUnionValue(value, timestamp, new RawUnionValue(index, value));
-  }
-
-  private <T> void outputUnionValue(T value, Instant timestamp, RawUnionValue unionValue) {
-    if (windowedValue == null) {
-      // we are in startBundle() or finishBundle()
-
-      try {
-        Collection<? extends BoundedWindow> windows =
-            windowingStrategy
-                .getWindowFn()
-                .assignWindows(
-                    new FlinkNoElementAssignContext(
-                        windowingStrategy.getWindowFn(), value, timestamp));
-
-        collector.collect(
-            WindowedValue.of(
-                unionValue,
-                timestamp != null ? timestamp : new Instant(Long.MIN_VALUE),
-                windows,
-                PaneInfo.NO_FIRING));
-      } catch (Exception e) {
-        throw new RuntimeException(e);
-      }
-    } else {
-      collector.collect(
-          WindowedValue.of(
-              unionValue,
-              windowedValue.getTimestamp(),
-              windowedValue.getWindows(),
-              windowedValue.getPane()));
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/0ae2a385/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkNoElementAssignContext.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkNoElementAssignContext.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkNoElementAssignContext.java
deleted file mode 100644
index ad7255b..0000000
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkNoElementAssignContext.java
+++ /dev/null
@@ -1,68 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.flink.translation.functions;
-
-import org.apache.beam.runners.core.OldDoFn;
-import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.apache.beam.sdk.transforms.windowing.WindowFn;
-import org.joda.time.Instant;
-
-/**
- * {@link WindowFn.AssignContext} for calling a {@link WindowFn} for elements emitted from
- * {@link OldDoFn#startBundle(OldDoFn.Context)}
- * or {@link OldDoFn#finishBundle(OldDoFn.Context)}.
- *
- * <p>In those cases the {@code WindowFn} is not allowed to access any element information.
- */
-class FlinkNoElementAssignContext<InputT, W extends BoundedWindow>
-    extends WindowFn<InputT, W>.AssignContext {
-
-  private final InputT element;
-  private final Instant timestamp;
-
-  FlinkNoElementAssignContext(
-      WindowFn<InputT, W> fn,
-      InputT element,
-      Instant timestamp) {
-    fn.super();
-
-    this.element = element;
-    // the timestamp can be null, in that case output is called
-    // without a timestamp
-    this.timestamp = timestamp;
-  }
-
-  @Override
-  public InputT element() {
-    return element;
-  }
-
-  @Override
-  public Instant timestamp() {
-    if (timestamp != null) {
-      return timestamp;
-    } else {
-      throw new UnsupportedOperationException("No timestamp available.");
-    }
-  }
-
-  @Override
-  public BoundedWindow window() {
-    throw new UnsupportedOperationException("No window available.");
-  }
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/0ae2a385/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkNoOpStepContext.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkNoOpStepContext.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkNoOpStepContext.java
new file mode 100644
index 0000000..d901d8e
--- /dev/null
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkNoOpStepContext.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.flink.translation.functions;
+
+import java.io.IOException;
+import org.apache.beam.runners.core.ExecutionContext.StepContext;
+import org.apache.beam.runners.core.StateInternals;
+import org.apache.beam.runners.core.TimerInternals;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.values.TupleTag;
+
+/**
+ * A {@link StepContext} for Flink Batch Runner execution.
+ */
+public class FlinkNoOpStepContext implements StepContext {
+
+  @Override
+  public String getStepName() {
+    return null;
+  }
+
+  @Override
+  public String getTransformName() {
+    return null;
+  }
+
+  @Override
+  public void noteOutput(WindowedValue<?> output) {
+
+  }
+
+  @Override
+  public void noteSideOutput(TupleTag<?> tag, WindowedValue<?> output) {
+
+  }
+
+  @Override
+  public <T, W extends BoundedWindow> void writePCollectionViewData(
+      TupleTag<?> tag,
+      Iterable<WindowedValue<T>> data,
+      Coder<Iterable<WindowedValue<T>>> dataCoder,
+      W window,
+      Coder<W> windowCoder) throws IOException {
+  }
+
+  @Override
+  public StateInternals<?> stateInternals() {
+    return null;
+  }
+
+  @Override
+  public TimerInternals timerInternals() {
+    return null;
+  }
+}
+

http://git-wip-us.apache.org/repos/asf/beam/blob/0ae2a385/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkPartialReduceFunction.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkPartialReduceFunction.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkPartialReduceFunction.java
index 7db30d1..1d1ff9f 100644
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkPartialReduceFunction.java
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkPartialReduceFunction.java
@@ -24,9 +24,8 @@ import java.util.Collections;
 import java.util.Comparator;
 import java.util.Iterator;
 import java.util.Map;
-import org.apache.beam.runners.core.OldDoFn;
-import org.apache.beam.runners.flink.OldPerKeyCombineFnRunner;
-import org.apache.beam.runners.flink.OldPerKeyCombineFnRunners;
+import org.apache.beam.runners.core.PerKeyCombineFnRunner;
+import org.apache.beam.runners.core.PerKeyCombineFnRunners;
 import org.apache.beam.runners.flink.translation.utils.SerializedPipelineOptions;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.transforms.CombineFnBase;
@@ -55,8 +54,6 @@ public class FlinkPartialReduceFunction<K, InputT, AccumT, W extends BoundedWind
 
   protected final CombineFnBase.PerKeyCombineFn<K, InputT, AccumT, ?> combineFn;
 
-  protected final OldDoFn<KV<K, InputT>, KV<K, AccumT>> doFn;
-
   protected final WindowingStrategy<?, W> windowingStrategy;
 
   protected final SerializedPipelineOptions serializedOptions;
@@ -74,13 +71,6 @@ public class FlinkPartialReduceFunction<K, InputT, AccumT, W extends BoundedWind
     this.sideInputs = sideInputs;
     this.serializedOptions = new SerializedPipelineOptions(pipelineOptions);
 
-    // dummy OldDoFn because we need one for ProcessContext
-    this.doFn = new OldDoFn<KV<K, InputT>, KV<K, AccumT>>() {
-      @Override
-      public void processElement(ProcessContext c) throws Exception {
-
-      }
-    };
   }
 
   @Override
@@ -88,17 +78,13 @@ public class FlinkPartialReduceFunction<K, InputT, AccumT, W extends BoundedWind
       Iterable<WindowedValue<KV<K, InputT>>> elements,
       Collector<WindowedValue<KV<K, AccumT>>> out) throws Exception {
 
-    FlinkSingleOutputProcessContext<KV<K, InputT>, KV<K, AccumT>> processContext =
-        new FlinkSingleOutputProcessContext<>(
-            serializedOptions.getPipelineOptions(),
-            getRuntimeContext(),
-            doFn,
-            windowingStrategy,
-            sideInputs, out
-        );
+    PipelineOptions options = serializedOptions.getPipelineOptions();
+
+    FlinkSideInputReader sideInputReader =
+        new FlinkSideInputReader(sideInputs, getRuntimeContext());
 
-    OldPerKeyCombineFnRunner<K, InputT, AccumT, ?> combineFnRunner =
-        OldPerKeyCombineFnRunners.create(combineFn);
+    PerKeyCombineFnRunner<K, InputT, AccumT, ?> combineFnRunner =
+        PerKeyCombineFnRunners.create(combineFn);
 
     @SuppressWarnings("unchecked")
     OutputTimeFn<? super BoundedWindow> outputTimeFn =
@@ -108,8 +94,8 @@ public class FlinkPartialReduceFunction<K, InputT, AccumT, W extends BoundedWind
     // memory
     // this seems very unprudent, but correct, for now
     ArrayList<WindowedValue<KV<K, InputT>>> sortedInput = Lists.newArrayList();
-    for (WindowedValue<KV<K, InputT>> inputValue: elements) {
-      for (WindowedValue<KV<K, InputT>> exploded: inputValue.explodeWindows()) {
+    for (WindowedValue<KV<K, InputT>> inputValue : elements) {
+      for (WindowedValue<KV<K, InputT>> exploded : inputValue.explodeWindows()) {
         sortedInput.add(exploded);
       }
     }
@@ -132,9 +118,10 @@ public class FlinkPartialReduceFunction<K, InputT, AccumT, W extends BoundedWind
     K key = currentValue.getValue().getKey();
     BoundedWindow currentWindow = Iterables.getFirst(currentValue.getWindows(), null);
     InputT firstValue = currentValue.getValue().getValue();
-    processContext.setWindowedValue(currentValue);
-    AccumT accumulator = combineFnRunner.createAccumulator(key, processContext);
-    accumulator = combineFnRunner.addInput(key, accumulator, firstValue, processContext);
+    AccumT accumulator = combineFnRunner.createAccumulator(key,
+        options, sideInputReader, currentValue.getWindows());
+    accumulator = combineFnRunner.addInput(key, accumulator, firstValue,
+        options, sideInputReader, currentValue.getWindows());
 
     // we use this to keep track of the timestamps assigned by the OutputTimeFn
     Instant windowTimestamp =
@@ -147,8 +134,8 @@ public class FlinkPartialReduceFunction<K, InputT, AccumT, W extends BoundedWind
       if (nextWindow.equals(currentWindow)) {
         // continue accumulating
         InputT value = nextValue.getValue().getValue();
-        processContext.setWindowedValue(nextValue);
-        accumulator = combineFnRunner.addInput(key, accumulator, value, processContext);
+        accumulator = combineFnRunner.addInput(key, accumulator, value,
+            options, sideInputReader, currentValue.getWindows());
 
         windowTimestamp = outputTimeFn.combine(
             windowTimestamp,
@@ -164,10 +151,12 @@ public class FlinkPartialReduceFunction<K, InputT, AccumT, W extends BoundedWind
                 PaneInfo.NO_FIRING));
 
         currentWindow = nextWindow;
+        currentValue = nextValue;
         InputT value = nextValue.getValue().getValue();
-        processContext.setWindowedValue(nextValue);
-        accumulator = combineFnRunner.createAccumulator(key, processContext);
-        accumulator = combineFnRunner.addInput(key, accumulator, value, processContext);
+        accumulator = combineFnRunner.createAccumulator(key,
+            options, sideInputReader, currentValue.getWindows());
+        accumulator = combineFnRunner.addInput(key, accumulator, value,
+            options, sideInputReader, currentValue.getWindows());
         windowTimestamp = outputTimeFn.assignOutputTime(nextValue.getTimestamp(), currentWindow);
       }
     }

http://git-wip-us.apache.org/repos/asf/beam/blob/0ae2a385/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkProcessContextBase.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkProcessContextBase.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkProcessContextBase.java
deleted file mode 100644
index 9b83eb4..0000000
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkProcessContextBase.java
+++ /dev/null
@@ -1,267 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.flink.translation.functions;
-
-import static com.google.common.base.Preconditions.checkNotNull;
-
-import com.google.common.collect.Iterables;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.Iterator;
-import java.util.Map;
-import org.apache.beam.runners.core.OldDoFn;
-import org.apache.beam.runners.core.StateInternals;
-import org.apache.beam.runners.core.TimerInternals;
-import org.apache.beam.runners.core.WindowingInternals;
-import org.apache.beam.runners.flink.translation.wrappers.SerializableFnAggregatorWrapper;
-import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.beam.sdk.transforms.Aggregator;
-import org.apache.beam.sdk.transforms.Combine;
-import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.apache.beam.sdk.transforms.windowing.PaneInfo;
-import org.apache.beam.sdk.util.WindowedValue;
-import org.apache.beam.sdk.util.WindowingStrategy;
-import org.apache.beam.sdk.values.PCollectionView;
-import org.apache.beam.sdk.values.TupleTag;
-import org.apache.flink.api.common.functions.RuntimeContext;
-import org.joda.time.Instant;
-
-/**
- * {@link OldDoFn.ProcessContext} for our Flink Wrappers.
- */
-abstract class FlinkProcessContextBase<InputT, OutputT>
-    extends OldDoFn<InputT, OutputT>.ProcessContext {
-
-  private final PipelineOptions pipelineOptions;
-  private final RuntimeContext runtimeContext;
-  private final boolean requiresWindowAccess;
-  protected final WindowingStrategy<?, ? extends BoundedWindow> windowingStrategy;
-  private final Map<PCollectionView<?>, WindowingStrategy<?, ?>> sideInputs;
-
-  protected WindowedValue<InputT> windowedValue;
-
-  FlinkProcessContextBase(
-      PipelineOptions pipelineOptions,
-      RuntimeContext runtimeContext,
-      OldDoFn<InputT, OutputT> doFn,
-      WindowingStrategy<?, ? extends BoundedWindow> windowingStrategy,
-      Map<PCollectionView<?>, WindowingStrategy<?, ? extends BoundedWindow>> sideInputs) {
-    doFn.super();
-    checkNotNull(pipelineOptions);
-    checkNotNull(runtimeContext);
-    checkNotNull(doFn);
-
-    this.pipelineOptions = pipelineOptions;
-    this.runtimeContext = runtimeContext;
-    this.requiresWindowAccess = doFn instanceof OldDoFn.RequiresWindowAccess;
-    this.windowingStrategy = windowingStrategy;
-    this.sideInputs = sideInputs;
-
-    super.setupDelegateAggregators();
-  }
-
-  public void setWindowedValue(WindowedValue<InputT> windowedValue) {
-    this.windowedValue = windowedValue;
-  }
-
-  @Override
-  public InputT element() {
-    return this.windowedValue.getValue();
-  }
-
-
-  @Override
-  public Instant timestamp() {
-    return windowedValue.getTimestamp();
-  }
-
-  @Override
-  public BoundedWindow window() {
-    if (!requiresWindowAccess) {
-      throw new UnsupportedOperationException(
-          "window() is only available in the context of a OldDoFn marked as RequiresWindowAccess.");
-    }
-    return Iterables.getOnlyElement(windowedValue.getWindows());
-  }
-
-  @Override
-  public PaneInfo pane() {
-    return windowedValue.getPane();
-  }
-
-  @Override
-  public WindowingInternals<InputT, OutputT> windowingInternals() {
-
-    return new WindowingInternals<InputT, OutputT>() {
-
-      @Override
-      public StateInternals stateInternals() {
-        throw new UnsupportedOperationException();
-      }
-
-      @Override
-      public void outputWindowedValue(
-          OutputT value,
-          Instant timestamp,
-          Collection<? extends BoundedWindow> windows,
-          PaneInfo pane) {
-        outputWithTimestampAndWindow(value, timestamp, windows, pane);
-      }
-
-      @Override
-      public <SideOutputT> void sideOutputWindowedValue(
-          TupleTag<SideOutputT> tag,
-          SideOutputT output,
-          Instant timestamp,
-          Collection<? extends BoundedWindow> windows,
-          PaneInfo pane) {
-        throw new UnsupportedOperationException();
-      }
-
-      @Override
-      public TimerInternals timerInternals() {
-        throw new UnsupportedOperationException();
-      }
-
-      @Override
-      public Collection<? extends BoundedWindow> windows() {
-        return windowedValue.getWindows();
-      }
-
-      @Override
-      public PaneInfo pane() {
-        return windowedValue.getPane();
-      }
-
-      @Override
-      public <ViewT> ViewT sideInput(
-          PCollectionView<ViewT> view,
-          BoundedWindow sideInputWindow) {
-
-        checkNotNull(view, "View passed to sideInput cannot be null");
-        checkNotNull(
-            sideInputs.get(view),
-            "Side input for " + view + " not available.");
-
-        Map<BoundedWindow, ViewT> sideInputs =
-            runtimeContext.getBroadcastVariableWithInitializer(
-                view.getTagInternal().getId(), new SideInputInitializer<>(view));
-        return sideInputs.get(sideInputWindow);
-      }
-    };
-  }
-
-  @Override
-  public PipelineOptions getPipelineOptions() {
-    return pipelineOptions;
-  }
-
-  @Override
-  public <ViewT> ViewT sideInput(PCollectionView<ViewT> view) {
-    checkNotNull(view, "View passed to sideInput cannot be null");
-    checkNotNull(sideInputs.get(view), "Side input for " + view + " not available.");
-    Iterator<? extends BoundedWindow> windowIter = windowedValue.getWindows().iterator();
-    BoundedWindow window;
-    if (!windowIter.hasNext()) {
-      throw new IllegalStateException(
-          "sideInput called when main input element is not in any windows");
-    } else {
-      window = windowIter.next();
-      if (windowIter.hasNext()) {
-        throw new IllegalStateException(
-            "sideInput called when main input element is in multiple windows");
-      }
-    }
-
-    // get the side input strategy for mapping the window
-    WindowingStrategy<?, ?> windowingStrategy = sideInputs.get(view);
-
-    BoundedWindow sideInputWindow =
-        windowingStrategy.getWindowFn().getSideInputWindow(window);
-
-    Map<BoundedWindow, ViewT> sideInputs =
-        runtimeContext.getBroadcastVariableWithInitializer(
-            view.getTagInternal().getId(), new SideInputInitializer<>(view));
-    ViewT result = sideInputs.get(sideInputWindow);
-    if (result == null) {
-      result = view.getViewFn().apply(Collections.<WindowedValue<?>>emptyList());
-    }
-    return result;
-  }
-
-  @Override
-  public void output(OutputT value) {
-    if (windowedValue != null) {
-      outputWithTimestamp(value, windowedValue.getTimestamp());
-    } else {
-      outputWithTimestamp(value, null);
-    }
-  }
-
-  @Override
-  public final void outputWithTimestamp(OutputT value, Instant timestamp) {
-    if (windowedValue == null) {
-      // we are in startBundle() or finishBundle()
-
-      try {
-        Collection windows = windowingStrategy.getWindowFn().assignWindows(
-            new FlinkNoElementAssignContext(
-                windowingStrategy.getWindowFn(),
-                value,
-                timestamp));
-
-        outputWithTimestampAndWindow(
-            value,
-            timestamp != null ? timestamp : new Instant(Long.MIN_VALUE),
-            windows,
-            PaneInfo.NO_FIRING);
-      } catch (Exception e) {
-        throw new RuntimeException(e);
-      }
-    } else {
-      outputWithTimestampAndWindow(
-          value, timestamp, windowedValue.getWindows(), windowedValue.getPane());
-    }
-  }
-
-  protected abstract void outputWithTimestampAndWindow(
-      OutputT value,
-      Instant timestamp,
-      Collection<? extends BoundedWindow> windows,
-      PaneInfo pane);
-
-  @Override
-  public abstract <T> void sideOutput(TupleTag<T> tag, T output);
-
-  @Override
-  public abstract <T> void sideOutputWithTimestamp(TupleTag<T> tag, T output, Instant timestamp);
-
-  @Override
-  public <AggInputT, AggOutputT> Aggregator<AggInputT, AggOutputT>
-  createAggregatorInternal(String name, Combine.CombineFn<AggInputT, ?, AggOutputT> combiner) {
-    @SuppressWarnings("unchecked")
-    SerializableFnAggregatorWrapper<AggInputT, AggOutputT> result =
-        (SerializableFnAggregatorWrapper<AggInputT, AggOutputT>)
-            runtimeContext.getAccumulator(name);
-
-    if (result == null) {
-      result = new SerializableFnAggregatorWrapper<>(combiner);
-      runtimeContext.addAccumulator(name, result);
-    }
-    return result;  }
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/0ae2a385/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkReduceFunction.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkReduceFunction.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkReduceFunction.java
index 81e37f4..3e4f742 100644
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkReduceFunction.java
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkReduceFunction.java
@@ -26,9 +26,8 @@ import java.util.Comparator;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
-import org.apache.beam.runners.core.OldDoFn;
-import org.apache.beam.runners.flink.OldPerKeyCombineFnRunner;
-import org.apache.beam.runners.flink.OldPerKeyCombineFnRunners;
+import org.apache.beam.runners.core.PerKeyCombineFnRunner;
+import org.apache.beam.runners.core.PerKeyCombineFnRunners;
 import org.apache.beam.runners.flink.translation.utils.SerializedPipelineOptions;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.transforms.CombineFnBase;
@@ -57,8 +56,6 @@ public class FlinkReduceFunction<K, AccumT, OutputT, W extends BoundedWindow>
 
   protected final CombineFnBase.PerKeyCombineFn<K, ?, AccumT, OutputT> combineFn;
 
-  protected final OldDoFn<KV<K, AccumT>, KV<K, OutputT>> doFn;
-
   protected final WindowingStrategy<?, W> windowingStrategy;
 
   protected final Map<PCollectionView<?>, WindowingStrategy<?, ?>> sideInputs;
@@ -78,13 +75,6 @@ public class FlinkReduceFunction<K, AccumT, OutputT, W extends BoundedWindow>
 
     this.serializedOptions = new SerializedPipelineOptions(pipelineOptions);
 
-    // dummy OldDoFn because we need one for ProcessContext
-    this.doFn = new OldDoFn<KV<K, AccumT>, KV<K, OutputT>>() {
-      @Override
-      public void processElement(ProcessContext c) throws Exception {
-
-      }
-    };
   }
 
   @Override
@@ -92,17 +82,13 @@ public class FlinkReduceFunction<K, AccumT, OutputT, W extends BoundedWindow>
       Iterable<WindowedValue<KV<K, AccumT>>> elements,
       Collector<WindowedValue<KV<K, OutputT>>> out) throws Exception {
 
-    FlinkSingleOutputProcessContext<KV<K, AccumT>, KV<K, OutputT>> processContext =
-        new FlinkSingleOutputProcessContext<>(
-            serializedOptions.getPipelineOptions(),
-            getRuntimeContext(),
-            doFn,
-            windowingStrategy,
-            sideInputs, out
-        );
+    PipelineOptions options = serializedOptions.getPipelineOptions();
+
+    FlinkSideInputReader sideInputReader =
+        new FlinkSideInputReader(sideInputs, getRuntimeContext());
 
-    OldPerKeyCombineFnRunner<K, ?, AccumT, OutputT> combineFnRunner =
-        OldPerKeyCombineFnRunners.create(combineFn);
+    PerKeyCombineFnRunner<K, ?, AccumT, OutputT> combineFnRunner =
+        PerKeyCombineFnRunners.create(combineFn);
 
     @SuppressWarnings("unchecked")
     OutputTimeFn<? super BoundedWindow> outputTimeFn =
@@ -150,17 +136,17 @@ public class FlinkReduceFunction<K, AccumT, OutputT, W extends BoundedWindow>
 
       if (nextWindow.equals(currentWindow)) {
         // continue accumulating
-        processContext.setWindowedValue(nextValue);
         accumulator = combineFnRunner.mergeAccumulators(
-            key, ImmutableList.of(accumulator, nextValue.getValue().getValue()), processContext);
+            key, ImmutableList.of(accumulator, nextValue.getValue().getValue()),
+            options, sideInputReader, currentValue.getWindows());
 
         windowTimestamps.add(nextValue.getTimestamp());
       } else {
         // emit the value that we currently have
-        processContext.setWindowedValue(currentValue);
         out.collect(
             WindowedValue.of(
-                KV.of(key, combineFnRunner.extractOutput(key, accumulator, processContext)),
+                KV.of(key, combineFnRunner.extractOutput(key, accumulator,
+                    options, sideInputReader, currentValue.getWindows())),
                 outputTimeFn.merge(currentWindow, windowTimestamps),
                 currentWindow,
                 PaneInfo.NO_FIRING));
@@ -168,23 +154,18 @@ public class FlinkReduceFunction<K, AccumT, OutputT, W extends BoundedWindow>
         windowTimestamps.clear();
 
         currentWindow = nextWindow;
+        currentValue = nextValue;
         accumulator = nextValue.getValue().getValue();
         windowTimestamps.add(nextValue.getTimestamp());
       }
 
-      // we have to keep track so that we can set the context to the right
-      // windowed value when windows change in the iterable
-      currentValue = nextValue;
     }
 
-    // if at the end of the iteration we have a change in windows
-    // the ProcessContext will not have been updated
-    processContext.setWindowedValue(currentValue);
-
     // emit the final accumulator
     out.collect(
         WindowedValue.of(
-            KV.of(key, combineFnRunner.extractOutput(key, accumulator, processContext)),
+            KV.of(key, combineFnRunner.extractOutput(key, accumulator,
+                options, sideInputReader, currentValue.getWindows())),
             outputTimeFn.merge(currentWindow, windowTimestamps),
             currentWindow,
             PaneInfo.NO_FIRING));

http://git-wip-us.apache.org/repos/asf/beam/blob/0ae2a385/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkSideInputReader.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkSideInputReader.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkSideInputReader.java
new file mode 100644
index 0000000..c317182
--- /dev/null
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkSideInputReader.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.flink.translation.functions;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.util.SideInputReader;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.util.WindowingStrategy;
+import org.apache.beam.sdk.values.PCollectionView;
+import org.apache.beam.sdk.values.TupleTag;
+import org.apache.flink.api.common.functions.RuntimeContext;
+
+/**
+ * A {@link SideInputReader} for the Flink Batch Runner.
+ */
+public class FlinkSideInputReader implements SideInputReader {
+
+  private final Map<TupleTag<?>, WindowingStrategy<?, ?>> sideInputs;
+
+  private RuntimeContext runtimeContext;
+
+  public FlinkSideInputReader(Map<PCollectionView<?>, WindowingStrategy<?, ?>> indexByView,
+                              RuntimeContext runtimeContext) {
+    sideInputs = new HashMap<>();
+    for (Map.Entry<PCollectionView<?>, WindowingStrategy<?, ?>> entry : indexByView.entrySet()) {
+      sideInputs.put(entry.getKey().getTagInternal(), entry.getValue());
+    }
+    this.runtimeContext = runtimeContext;
+  }
+
+  @Nullable
+  @Override
+  public <T> T get(PCollectionView<T> view, BoundedWindow window) {
+    checkNotNull(view, "View passed to sideInput cannot be null");
+    TupleTag<Iterable<WindowedValue<?>>> tag = view.getTagInternal();
+    checkNotNull(
+        sideInputs.get(tag),
+        "Side input for " + view + " not available.");
+
+    Map<BoundedWindow, T> sideInputs =
+        runtimeContext.getBroadcastVariableWithInitializer(
+            tag.getId(), new SideInputInitializer<>(view));
+    T result = sideInputs.get(window);
+    if (result == null) {
+      result = view.getViewFn().apply(Collections.<WindowedValue<?>>emptyList());
+    }
+    return result;
+  }
+
+  @Override
+  public <T> boolean contains(PCollectionView<T> view) {
+    return sideInputs.containsKey(view.getTagInternal());
+  }
+
+  @Override
+  public boolean isEmpty() {
+    return sideInputs.isEmpty();
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/0ae2a385/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkSingleOutputProcessContext.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkSingleOutputProcessContext.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkSingleOutputProcessContext.java
deleted file mode 100644
index 0db7f5a..0000000
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkSingleOutputProcessContext.java
+++ /dev/null
@@ -1,69 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.flink.translation.functions;
-
-import java.util.Collection;
-import java.util.Map;
-import org.apache.beam.runners.core.OldDoFn;
-import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.apache.beam.sdk.transforms.windowing.PaneInfo;
-import org.apache.beam.sdk.util.WindowedValue;
-import org.apache.beam.sdk.util.WindowingStrategy;
-import org.apache.beam.sdk.values.PCollectionView;
-import org.apache.beam.sdk.values.TupleTag;
-import org.apache.flink.api.common.functions.RuntimeContext;
-import org.apache.flink.util.Collector;
-import org.joda.time.Instant;
-
-/** {@link OldDoFn.ProcessContext} for {@link FlinkDoFnFunction} with a single main output. */
-class FlinkSingleOutputProcessContext<InputT, OutputT>
-    extends FlinkProcessContextBase<InputT, OutputT> {
-
-  private final Collector<WindowedValue<OutputT>> collector;
-
-  FlinkSingleOutputProcessContext(
-      PipelineOptions pipelineOptions,
-      RuntimeContext runtimeContext,
-      OldDoFn<InputT, OutputT> doFn,
-      WindowingStrategy<?, ?> windowingStrategy,
-      Map<PCollectionView<?>, WindowingStrategy<?, ?>> sideInputs,
-      Collector<WindowedValue<OutputT>> collector) {
-    super(pipelineOptions, runtimeContext, doFn, windowingStrategy, sideInputs);
-    this.collector = collector;
-  }
-
-  @Override
-  protected void outputWithTimestampAndWindow(
-      OutputT value,
-      Instant timestamp,
-      Collection<? extends BoundedWindow> windows,
-      PaneInfo pane) {
-    collector.collect(WindowedValue.of(value, timestamp, windows, pane));
-  }
-
-  @Override
-  public <T> void sideOutput(TupleTag<T> tag, T value) {
-    throw new UnsupportedOperationException();
-  }
-
-  @Override
-  public <T> void sideOutputWithTimestamp(TupleTag<T> tag, T value, Instant timestamp) {
-    throw new UnsupportedOperationException();
-  }
-}


Mime
View raw message