nemo-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From johnya...@apache.org
Subject [incubator-nemo] branch master updated: [NEMO-213] Use Beam's DoFnRunners to execute DoFn (#122)
Date Thu, 11 Oct 2018 01:37:11 GMT
This is an automated email from the ASF dual-hosted git repository.

johnyangk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-nemo.git


The following commit(s) were added to refs/heads/master by this push:
     new 18b61c1  [NEMO-213] Use Beam's DoFnRunners to execute DoFn (#122)
18b61c1 is described below

commit 18b61c1e862e3af51c87f19c0182725cdee8e2a0
Author: Taegeon Um <taegeonum@gmail.com>
AuthorDate: Thu Oct 11 10:37:06 2018 +0900

    [NEMO-213] Use Beam's DoFnRunners to execute DoFn (#122)
    
    JIRA: [NEMO-213: Use Beam's DoFnRunners to execute DoFn](https://issues.apache.org/jira/projects/NEMO/issues/NEMO-213)
    
    **Major changes:**
    - use `simpleDoFnRunner` to process events of `DoFn`
    - convert all beam `Transform`s to process `WindowedValue`
    - create a `BeamBoundedWindowedSourceVertex` that generates `WindowedValue`s
    - modify `PipelineTranslator` to use the newly created `Transform`s (`SimpleDoFnTransform`...)
    
    **Minor changes to note:**
    -
    
    **Tests for the changes:**
    - create a unit test that executes `SimpleDoFnTransform`
    
    **Other comments:**
    - We need a performance test
    
    Closes #122
---
 .../compiler/frontend/beam/BeamKeyExtractor.java   |   7 +-
 .../compiler/frontend/beam/NemoPipelineRunner.java |   5 +-
 .../compiler/frontend/beam/PipelineTranslator.java | 128 ++++--
 .../beam/source/BeamBoundedSourceVertex.java       |  33 +-
 .../BroadcastVariableSideInputReader.java          |  60 +++
 .../beam/transform/CreateViewTransform.java        |  16 +-
 .../beam/transform/DefaultOutputManager.java       |  51 +++
 .../frontend/beam/transform/DoFnTransform.java     | 157 +++++++
 .../frontend/beam/transform/DoTransform.java       | 449 ---------------------
 .../beam/transform/GroupByKeyTransform.java        |  15 +-
 .../frontend/beam/transform/WindowFnTransform.java |  98 +++++
 .../frontend/beam/transform/WindowTransform.java   |  61 ---
 .../nemo/compiler/optimizer/PairKeyExtractor.java} |  17 +-
 .../compiletime/reshaping/SkewReshapingPass.java   |   3 +-
 .../compiler/backend/nemo/DAGConverterTest.java    |   5 +-
 .../frontend/beam/transform/DoFnTransformTest.java | 287 +++++++++++++
 .../beam/MultinomialLogisticRegression.java        |   6 +-
 .../runtime/master/scheduler/TaskRetryTest.java    |   4 +-
 18 files changed, 816 insertions(+), 586 deletions(-)

diff --git a/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/BeamKeyExtractor.java b/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/BeamKeyExtractor.java
index c3f4bbc..b64cb60 100644
--- a/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/BeamKeyExtractor.java
+++ b/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/BeamKeyExtractor.java
@@ -15,6 +15,7 @@
  */
 package org.apache.nemo.compiler.frontend.beam;
 
+import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.nemo.common.KeyExtractor;
 import org.apache.beam.sdk.values.KV;
 
@@ -25,9 +26,11 @@ import org.apache.beam.sdk.values.KV;
 final class BeamKeyExtractor implements KeyExtractor {
   @Override
   public Object extractKey(final Object element) {
-    if (element instanceof KV) {
+    final WindowedValue windowedValue = (WindowedValue) element;
+    final Object value = windowedValue.getValue();
+    if (value instanceof KV) {
       // Handle null keys, since Beam allows KV with null keys.
-      final Object key = ((KV) element).getKey();
+      final Object key = ((KV) value).getKey();
       return key == null ? 0 : key;
     } else {
       return element;
diff --git a/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/NemoPipelineRunner.java b/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/NemoPipelineRunner.java
index e3043bd..deb4dfe 100644
--- a/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/NemoPipelineRunner.java
+++ b/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/NemoPipelineRunner.java
@@ -56,8 +56,9 @@ public final class NemoPipelineRunner extends PipelineRunner<NemoPipelineResult>
   public NemoPipelineResult run(final Pipeline pipeline) {
     final PipelineVisitor pipelineVisitor = new PipelineVisitor();
     pipeline.traverseTopologically(pipelineVisitor);
-    final DAG<IRVertex, IREdge> dag = PipelineTranslator.translate(pipelineVisitor.getConvertedPipeline(),
-        nemoPipelineOptions);
+    final DAG<IRVertex, IREdge> dag = PipelineTranslator.translate(pipeline,
+      pipelineVisitor.getConvertedPipeline(),
+      nemoPipelineOptions);
 
     final NemoPipelineResult nemoPipelineResult = new NemoPipelineResult();
     JobLauncher.launchDAG(dag);
diff --git a/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/PipelineTranslator.java b/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/PipelineTranslator.java
index 6ef96bf..2486a00 100644
--- a/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/PipelineTranslator.java
+++ b/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/PipelineTranslator.java
@@ -15,6 +15,12 @@
  */
 package org.apache.nemo.compiler.frontend.beam;
 
+import com.google.common.collect.Iterables;
+import org.apache.beam.runners.core.construction.ParDoTranslation;
+import org.apache.beam.runners.core.construction.TransformInputs;
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.runners.AppliedPTransform;
+import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.nemo.common.dag.DAG;
 import org.apache.nemo.common.dag.DAGBuilder;
 import org.apache.nemo.common.ir.edge.IREdge;
@@ -38,6 +44,7 @@ import org.apache.beam.sdk.values.*;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.IOException;
 import java.lang.annotation.*;
 import java.lang.reflect.InvocationTargetException;
 import java.lang.reflect.Method;
@@ -45,17 +52,19 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Stack;
+import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.BiFunction;
+import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
 /**
- * Converts DAG of Beam pipeline to Nemo IR DAG.
+ * Converts DAG of Beam root to Nemo IR DAG.
  * For a {@link PrimitiveTransformVertex}, it defines mapping to the corresponding {@link IRVertex}.
  * For a {@link CompositeTransformVertex}, it defines how to setup and clear {@link TranslationContext}
  * before start translating inner Beam transform hierarchy.
  */
 public final class PipelineTranslator
-    implements BiFunction<CompositeTransformVertex, PipelineOptions, DAG<IRVertex, IREdge>> {
+  implements BiFunction<CompositeTransformVertex, PipelineOptions, DAG<IRVertex, IREdge>> {
 
   private static final Logger LOG = LoggerFactory.getLogger(PipelineTranslator.class.getName());
 
@@ -64,15 +73,21 @@ public final class PipelineTranslator
   private final Map<Class<? extends PTransform>, Method> primitiveTransformToTranslator = new HashMap<>();
   private final Map<Class<? extends PTransform>, Method> compositeTransformToTranslator = new HashMap<>();
 
+  // TODO #220: Move this variable to TranslationContext
+  private static final AtomicReference<Pipeline> PIPELINE = new AtomicReference<>();
+
   /**
    * Static translator method.
-   * @param pipeline Top-level Beam transform hierarchy, usually given by {@link PipelineVisitor}
+   * @param pipeline the original root
+   * @param root Top-level Beam transform hierarchy, usually given by {@link PipelineVisitor}
    * @param pipelineOptions {@link PipelineOptions}
    * @return Nemo IR DAG
    */
-  public static DAG<IRVertex, IREdge> translate(final CompositeTransformVertex pipeline,
+  public static DAG<IRVertex, IREdge> translate(final Pipeline pipeline,
+                                                final CompositeTransformVertex root,
                                                 final PipelineOptions pipelineOptions) {
-    return INSTANCE.apply(pipeline, pipelineOptions);
+    PIPELINE.set(pipeline);
+    return INSTANCE.apply(root, pipelineOptions);
   }
 
   /**
@@ -113,38 +128,74 @@ public final class PipelineTranslator
     transformVertex.getNode().getOutputs().values().forEach(output -> ctx.registerMainOutputFrom(vertex, output));
   }
 
+  private static DoFnTransform createDoFnTransform(final TranslationContext ctx,
+                                                   final PrimitiveTransformVertex transformVertex) {
+    try {
+      final AppliedPTransform pTransform = transformVertex.getNode().toAppliedPTransform(PIPELINE.get());
+      final DoFn doFn = ParDoTranslation.getDoFn(pTransform);
+      final TupleTag mainOutputTag = ParDoTranslation.getMainOutputTag(pTransform);
+      final List<PCollectionView<?>> sideInputs = ParDoTranslation.getSideInputs(pTransform);
+      final TupleTagList additionalOutputTags = ParDoTranslation.getAdditionalOutputTags(pTransform);
+
+      final PCollection<?> mainInput = (PCollection<?>)
+        Iterables.getOnlyElement(TransformInputs.nonAdditionalInputs(pTransform));
+
+      return new DoFnTransform(
+        doFn,
+        mainInput.getCoder(),
+        getOutputCoders(pTransform),
+        mainOutputTag,
+        additionalOutputTags.getAll(),
+        mainInput.getWindowingStrategy(),
+        sideInputs,
+        ctx.pipelineOptions);
+    } catch (final IOException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
   @PrimitiveTransformTranslator(ParDo.SingleOutput.class)
   private static void parDoSingleOutputTranslator(final TranslationContext ctx,
                                                   final PrimitiveTransformVertex transformVertex,
                                                   final ParDo.SingleOutput<?, ?> transform) {
-    final DoTransform doTransform = new DoTransform(transform.getFn(), ctx.pipelineOptions);
-    final IRVertex vertex = new OperatorVertex(doTransform);
+    final DoFnTransform doFnTransform = createDoFnTransform(ctx, transformVertex);
+    final IRVertex vertex = new OperatorVertex(doFnTransform);
+
     ctx.addVertex(vertex);
     transformVertex.getNode().getInputs().values().stream()
-        .filter(input -> !transform.getAdditionalInputs().values().contains(input))
-        .forEach(input -> ctx.addEdgeTo(vertex, input));
+      .filter(input -> !transform.getAdditionalInputs().values().contains(input))
+      .forEach(input -> ctx.addEdgeTo(vertex, input));
     transform.getSideInputs().forEach(input -> ctx.addEdgeTo(vertex, input));
     transformVertex.getNode().getOutputs().values().forEach(output -> ctx.registerMainOutputFrom(vertex, output));
   }
 
+  private static Map<TupleTag<?>, Coder<?>> getOutputCoders(final AppliedPTransform<?, ?, ?> ptransform) {
+    return ptransform
+      .getOutputs()
+      .entrySet()
+      .stream()
+      .filter(e -> e.getValue() instanceof PCollection)
+      .collect(Collectors.toMap(e -> e.getKey(), e -> ((PCollection) e.getValue()).getCoder()));
+  }
+
   @PrimitiveTransformTranslator(ParDo.MultiOutput.class)
   private static void parDoMultiOutputTranslator(final TranslationContext ctx,
                                                  final PrimitiveTransformVertex transformVertex,
                                                  final ParDo.MultiOutput<?, ?> transform) {
-    final DoTransform doTransform = new DoTransform(transform.getFn(), ctx.pipelineOptions);
-    final IRVertex vertex = new OperatorVertex(doTransform);
+    final DoFnTransform doFnTransform = createDoFnTransform(ctx, transformVertex);
+    final IRVertex vertex = new OperatorVertex(doFnTransform);
     ctx.addVertex(vertex);
     transformVertex.getNode().getInputs().values().stream()
-        .filter(input -> !transform.getAdditionalInputs().values().contains(input))
-        .forEach(input -> ctx.addEdgeTo(vertex, input));
+      .filter(input -> !transform.getAdditionalInputs().values().contains(input))
+      .forEach(input -> ctx.addEdgeTo(vertex, input));
     transform.getSideInputs().forEach(input -> ctx.addEdgeTo(vertex, input));
     transformVertex.getNode().getOutputs().entrySet().stream()
-        .filter(pValueWithTupleTag -> pValueWithTupleTag.getKey().equals(transform.getMainOutputTag()))
-        .forEach(pValueWithTupleTag -> ctx.registerMainOutputFrom(vertex, pValueWithTupleTag.getValue()));
+      .filter(pValueWithTupleTag -> pValueWithTupleTag.getKey().equals(transform.getMainOutputTag()))
+      .forEach(pValueWithTupleTag -> ctx.registerMainOutputFrom(vertex, pValueWithTupleTag.getValue()));
     transformVertex.getNode().getOutputs().entrySet().stream()
-        .filter(pValueWithTupleTag -> !pValueWithTupleTag.getKey().equals(transform.getMainOutputTag()))
-        .forEach(pValueWithTupleTag -> ctx.registerAdditionalOutputFrom(vertex, pValueWithTupleTag.getValue(),
-            pValueWithTupleTag.getKey()));
+      .filter(pValueWithTupleTag -> !pValueWithTupleTag.getKey().equals(transform.getMainOutputTag()))
+      .forEach(pValueWithTupleTag -> ctx.registerAdditionalOutputFrom(vertex, pValueWithTupleTag.getValue(),
+        pValueWithTupleTag.getKey()));
   }
 
   @PrimitiveTransformTranslator(GroupByKey.class)
@@ -169,7 +220,7 @@ public final class PipelineTranslator
     } else {
       throw new UnsupportedOperationException(String.format("%s is not supported", transform));
     }
-    final IRVertex vertex = new OperatorVertex(new WindowTransform(windowFn));
+    final IRVertex vertex = new OperatorVertex(new WindowFnTransform(windowFn));
     ctx.addVertex(vertex);
     transformVertex.getNode().getInputs().values().forEach(input -> ctx.addEdgeTo(vertex, input));
     transformVertex.getNode().getOutputs().values().forEach(output -> ctx.registerMainOutputFrom(vertex, output));
@@ -225,7 +276,7 @@ public final class PipelineTranslator
     final boolean handlesBeamRow = Stream
       .concat(transformVertex.getNode().getInputs().values().stream(),
         transformVertex.getNode().getOutputs().values().stream())
-      .map(pValue -> (KvCoder) getCoder(pValue, ctx.pipeline)) // Input and output of combine should be KV
+      .map(pValue -> (KvCoder) getCoder(pValue, ctx.root)) // Input and output of combine should be KV
       .map(kvCoder -> kvCoder.getValueCoder().getEncodedTypeDescriptor()) // We're interested in the 'Value' of KV
       .anyMatch(valueTypeDescriptor -> TypeDescriptor.of(Row.class).equals(valueTypeDescriptor));
     if (handlesBeamRow) {
@@ -280,7 +331,8 @@ public final class PipelineTranslator
   }
 
   @Override
-  public DAG<IRVertex, IREdge> apply(final CompositeTransformVertex pipeline, final PipelineOptions pipelineOptions) {
+  public DAG<IRVertex, IREdge> apply(final CompositeTransformVertex pipeline,
+                                     final PipelineOptions pipelineOptions) {
     final TranslationContext ctx = new TranslationContext(pipeline, primitiveTransformToTranslator,
         compositeTransformToTranslator, DefaultCommunicationPatternSelector.INSTANCE, pipelineOptions);
     ctx.translate(pipeline);
@@ -351,7 +403,7 @@ public final class PipelineTranslator
    * Translation context.
    */
   private static final class TranslationContext {
-    private final CompositeTransformVertex pipeline;
+    private final CompositeTransformVertex root;
     private final PipelineOptions pipelineOptions;
     private final DAGBuilder<IRVertex, IREdge> builder;
     private final Map<PValue, IRVertex> pValueToProducer;
@@ -363,18 +415,18 @@ public final class PipelineTranslator
     private final Map<Class<? extends PTransform>, Method> compositeTransformToTranslator;
 
     /**
-     * @param pipeline the pipeline to translate
+     * @param root the root to translate
      * @param primitiveTransformToTranslator provides translators for PrimitiveTransform
      * @param compositeTransformToTranslator provides translators for CompositeTransform
      * @param selector provides {@link CommunicationPatternProperty.Value} for IR edges
      * @param pipelineOptions {@link PipelineOptions}
      */
-    private TranslationContext(final CompositeTransformVertex pipeline,
+    private TranslationContext(final CompositeTransformVertex root,
                                final Map<Class<? extends PTransform>, Method> primitiveTransformToTranslator,
                                final Map<Class<? extends PTransform>, Method> compositeTransformToTranslator,
                                final BiFunction<IRVertex, IRVertex, CommunicationPatternProperty.Value> selector,
                                final PipelineOptions pipelineOptions) {
-      this.pipeline = pipeline;
+      this.root = root;
       this.builder = new DAGBuilder<>();
       this.pValueToProducer = new HashMap<>();
       this.pValueToTag = new HashMap<>();
@@ -393,7 +445,7 @@ public final class PipelineTranslator
      */
     private TranslationContext(final TranslationContext ctx,
                                final BiFunction<IRVertex, IRVertex, CommunicationPatternProperty.Value> selector) {
-      this.pipeline = ctx.pipeline;
+      this.root = ctx.root;
       this.pipelineOptions = ctx.pipelineOptions;
       this.builder = ctx.builder;
       this.pValueToProducer = ctx.pValueToProducer;
@@ -465,7 +517,7 @@ public final class PipelineTranslator
       if (src == null) {
         try {
           throw new RuntimeException(String.format("Cannot find a vertex that emits pValue %s, "
-              + "while PTransform %s is known to produce it.", input, pipeline.getPrimitiveProducerOf(input)));
+              + "while PTransform %s is known to produce it.", input, root.getPrimitiveProducerOf(input)));
         } catch (final RuntimeException e) {
           throw new RuntimeException(String.format("Cannot find a vertex that emits pValue %s, "
               + "and the corresponding PTransform was not found", input));
@@ -477,17 +529,18 @@ public final class PipelineTranslator
             + "for an edge from %s to %s", communicationPatternSelector, src, dst));
       }
       final IREdge edge = new IREdge(communicationPattern, src, dst);
-      final Coder<?> coder;
+      final Coder coder;
+      final Coder windowCoder;
       if (input instanceof PCollection) {
         coder = ((PCollection) input).getCoder();
+        windowCoder = ((PCollection) input).getWindowingStrategy().getWindowFn().windowCoder();
       } else if (input instanceof PCollectionView) {
-        coder = getCoderForView((PCollectionView) input, pipeline);
+        coder = getCoderForView((PCollectionView) input, root);
+        windowCoder = ((PCollectionView) input).getPCollection()
+          .getWindowingStrategy().getWindowFn().windowCoder();
       } else {
-        coder = null;
-      }
-      if (coder == null) {
         throw new RuntimeException(String.format("While adding an edge from %s, to %s, coder for PValue %s cannot "
-            + "be determined", src, dst, input));
+          + "be determined", src, dst, input));
       }
 
       edge.setProperty(KeyExtractorProperty.of(new BeamKeyExtractor()));
@@ -497,8 +550,11 @@ public final class PipelineTranslator
         edge.setProperty(KeyEncoderProperty.of(new BeamEncoderFactory(keyCoder)));
         edge.setProperty(KeyDecoderProperty.of(new BeamDecoderFactory(keyCoder)));
       }
-      edge.setProperty(EncoderProperty.of(new BeamEncoderFactory<>(coder)));
-      edge.setProperty(DecoderProperty.of(new BeamDecoderFactory<>(coder)));
+
+      edge.setProperty(EncoderProperty.of(
+        new BeamEncoderFactory<>(WindowedValue.getFullCoder(coder, windowCoder))));
+      edge.setProperty(DecoderProperty.of(
+        new BeamDecoderFactory<>(WindowedValue.getFullCoder(coder, windowCoder))));
 
       if (pValueToTag.containsKey(input)) {
         edge.setProperty(AdditionalOutputTagProperty.of(pValueToTag.get(input).getId()));
@@ -553,7 +609,7 @@ public final class PipelineTranslator
 
       final Transform srcTransform = src instanceof OperatorVertex ? ((OperatorVertex) src).getTransform() : null;
       final Transform dstTransform = dst instanceof OperatorVertex ? ((OperatorVertex) dst).getTransform() : null;
-      final DoFn srcDoFn = srcTransform instanceof DoTransform ? ((DoTransform) srcTransform).getDoFn() : null;
+      final DoFn srcDoFn = srcTransform instanceof DoFnTransform ? ((DoFnTransform) srcTransform).getDoFn() : null;
 
       if (srcDoFn != null && srcDoFn.getClass().equals(constructUnionTableFn)) {
         return CommunicationPatternProperty.Value.Shuffle;
diff --git a/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/source/BeamBoundedSourceVertex.java b/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/source/BeamBoundedSourceVertex.java
index f17ab98..09c9076 100644
--- a/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/source/BeamBoundedSourceVertex.java
+++ b/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/source/BeamBoundedSourceVertex.java
@@ -16,6 +16,7 @@
 package org.apache.nemo.compiler.frontend.beam.source;
 
 import com.fasterxml.jackson.databind.node.ObjectNode;
+import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.nemo.common.ir.Readable;
 
 import java.io.IOException;
@@ -35,7 +36,7 @@ import org.slf4j.LoggerFactory;
  * SourceVertex implementation for BoundedSource.
  * @param <O> output type.
  */
-public final class BeamBoundedSourceVertex<O> extends SourceVertex<O> {
+public final class BeamBoundedSourceVertex<O> extends SourceVertex<WindowedValue<O>> {
   private static final Logger LOG = LoggerFactory.getLogger(BeamBoundedSourceVertex.class.getName());
   private BoundedSource<O> source;
   private final String sourceDescription;
@@ -68,8 +69,8 @@ public final class BeamBoundedSourceVertex<O> extends SourceVertex<O> {
   }
 
   @Override
-  public List<Readable<O>> getReadables(final int desiredNumOfSplits) throws Exception {
-    final List<Readable<O>> readables = new ArrayList<>();
+  public List<Readable<WindowedValue<O>>> getReadables(final int desiredNumOfSplits) throws Exception {
+    final List<Readable<WindowedValue<O>>> readables = new ArrayList<>();
     LOG.info("estimate: {}", source.getEstimatedSizeBytes(null));
     LOG.info("desired: {}", desiredNumOfSplits);
     source.split(source.getEstimatedSizeBytes(null) / desiredNumOfSplits, null)
@@ -93,7 +94,7 @@ public final class BeamBoundedSourceVertex<O> extends SourceVertex<O> {
    * BoundedSourceReadable class.
    * @param <T> type.
    */
-  private static final class BoundedSourceReadable<T> implements Readable<T> {
+  private static final class BoundedSourceReadable<T> implements Readable<WindowedValue<T>> {
     private final BoundedSource<T> boundedSource;
 
     /**
@@ -105,11 +106,29 @@ public final class BeamBoundedSourceVertex<O> extends SourceVertex<O> {
     }
 
     @Override
-    public Iterable<T> read() throws IOException {
-      final ArrayList<T> elements = new ArrayList<>();
+    public Iterable<WindowedValue<T>> read() throws IOException {
+      boolean started = false;
+      boolean windowed = false;
+
+      final ArrayList<WindowedValue<T>> elements = new ArrayList<>();
       try (BoundedSource.BoundedReader<T> reader = boundedSource.createReader(null)) {
         for (boolean available = reader.start(); available; available = reader.advance()) {
-          elements.add(reader.getCurrent());
+          final T elem = reader.getCurrent();
+
+          // Check whether the element is windowed or not
+          // We only have to check the first element.
+          if (!started) {
+            started = true;
+            if (elem instanceof WindowedValue) {
+              windowed = true;
+            }
+          }
+
+          if (!windowed) {
+            elements.add(WindowedValue.valueInGlobalWindow(reader.getCurrent()));
+          } else {
+            elements.add((WindowedValue<T>) elem);
+          }
         }
       }
       return elements;
diff --git a/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/BroadcastVariableSideInputReader.java b/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/BroadcastVariableSideInputReader.java
new file mode 100644
index 0000000..4725fa9
--- /dev/null
+++ b/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/BroadcastVariableSideInputReader.java
@@ -0,0 +1,60 @@
+/*
+ * Copyright (C) 2018 Seoul National University
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nemo.compiler.frontend.beam.transform;
+
+import org.apache.beam.runners.core.SideInputReader;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.values.PCollectionView;
+import org.apache.nemo.common.ir.vertex.transform.Transform;
+
+import javax.annotation.Nullable;
+import java.util.Collection;
+
+/**
+ * A sideinput reader that reads/writes side input values to context.
+ */
+public final class BroadcastVariableSideInputReader implements SideInputReader {
+
+  // Nemo context for storing/getting side inputs
+  private final Transform.Context context;
+
+  // The list of side inputs that we're handling
+  private final Collection<PCollectionView<?>> sideInputs;
+
+  BroadcastVariableSideInputReader(final Transform.Context context,
+                                   final Collection<PCollectionView<?>> sideInputs) {
+    this.context = context;
+    this.sideInputs = sideInputs;
+  }
+
+  @Nullable
+  @Override
+  public <T> T get(final PCollectionView<T> view, final BoundedWindow window) {
+    // TODO #216: implement side input and windowing
+    return ((WindowedValue<T>) context.getBroadcastVariable(view)).getValue();
+  }
+
+  @Override
+  public <T> boolean contains(final PCollectionView<T> view) {
+    return sideInputs.contains(view);
+  }
+
+  @Override
+  public boolean isEmpty() {
+    return sideInputs.isEmpty();
+  }
+}
diff --git a/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/CreateViewTransform.java b/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/CreateViewTransform.java
index cd93296..91fb3e2 100644
--- a/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/CreateViewTransform.java
+++ b/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/CreateViewTransform.java
@@ -15,6 +15,7 @@
  */
 package org.apache.nemo.compiler.frontend.beam.transform;
 
+import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.nemo.common.ir.OutputCollector;
 import org.apache.nemo.common.ir.vertex.transform.Transform;
 import org.apache.beam.sdk.transforms.Materializations;
@@ -31,9 +32,9 @@ import java.util.ArrayList;
  * @param <I> input type.
  * @param <O> output type.
  */
-public final class CreateViewTransform<I, O> implements Transform<I, O> {
+public final class CreateViewTransform<I, O> implements Transform<WindowedValue<I>, WindowedValue<O>> {
   private final PCollectionView pCollectionView;
-  private OutputCollector<O> outputCollector;
+  private OutputCollector<WindowedValue<O>> outputCollector;
   private final ViewFn<Materializations.MultimapView<Void, ?>, O> viewFn;
   private final MultiView<Object> multiView;
 
@@ -48,21 +49,22 @@ public final class CreateViewTransform<I, O> implements Transform<I, O> {
   }
 
   @Override
-  public void prepare(final Context context, final OutputCollector<O> oc) {
+  public void prepare(final Context context, final OutputCollector<WindowedValue<O>> oc) {
     this.outputCollector = oc;
   }
 
   @Override
-  public void onData(final I element) {
-    // Since CreateViewTransform takes KV(Void, value), this is okay
-    final KV<?, ?> kv = (KV<?, ?>) element; // It will throw a type cast exception if the element is not KV
+  public void onData(final WindowedValue<I> element) {
+    // TODO #216: support window in view
+    final KV kv = ((WindowedValue<KV>) element).getValue();
     multiView.getDataList().add(kv.getValue());
   }
 
   @Override
   public void close() {
     final Object view = viewFn.apply(multiView);
-    outputCollector.emit((O) view);
+    // TODO #216: support window in view
+    outputCollector.emit(WindowedValue.valueInGlobalWindow((O) view));
   }
 
   @Override
diff --git a/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/DefaultOutputManager.java b/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/DefaultOutputManager.java
new file mode 100644
index 0000000..4174c6c
--- /dev/null
+++ b/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/DefaultOutputManager.java
@@ -0,0 +1,51 @@
+/*
+ * Copyright (C) 2018 Seoul National University
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nemo.compiler.frontend.beam.transform;
+
+import org.apache.beam.runners.core.DoFnRunners;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.values.TupleTag;
+import org.apache.nemo.common.ir.OutputCollector;
+import org.apache.nemo.common.ir.vertex.transform.Transform;
+
+import java.util.Map;
+
+/**
+ * Default output emitter that uses outputCollector.
+ * @param <OutputT> output type
+ */
+public final class DefaultOutputManager<OutputT> implements DoFnRunners.OutputManager {
+  private final TupleTag<OutputT> mainOutputTag;
+  private final OutputCollector<WindowedValue<OutputT>> outputCollector;
+  private final Map<String, String> additionalOutputs;
+
+  DefaultOutputManager(final OutputCollector<WindowedValue<OutputT>> outputCollector,
+                       final Transform.Context context,
+                       final TupleTag<OutputT> mainOutputTag) {
+    this.outputCollector = outputCollector;
+    this.mainOutputTag = mainOutputTag;
+    this.additionalOutputs = context.getTagToAdditionalChildren();
+  }
+
+  @Override
+  public <T> void output(final TupleTag<T> tag, final WindowedValue<T> output) {
+    if (tag.equals(mainOutputTag)) {
+      outputCollector.emit((WindowedValue<OutputT>) output);
+    } else {
+      outputCollector.emit(additionalOutputs.get(tag.getId()), output);
+    }
+  }
+}
diff --git a/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/DoFnTransform.java b/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/DoFnTransform.java
new file mode 100644
index 0000000..8dbf051
--- /dev/null
+++ b/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/DoFnTransform.java
@@ -0,0 +1,157 @@
+/*
+ * Copyright (C) 2018 Seoul National University
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nemo.compiler.frontend.beam.transform;
+
+import org.apache.beam.runners.core.*;
+import org.apache.beam.runners.core.construction.SerializablePipelineOptions;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.reflect.DoFnInvoker;
+import org.apache.beam.sdk.transforms.reflect.DoFnInvokers;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.values.PCollectionView;
+import org.apache.beam.sdk.values.TupleTag;
+import org.apache.beam.sdk.values.WindowingStrategy;
+import org.apache.nemo.common.ir.OutputCollector;
+import org.apache.nemo.common.ir.vertex.transform.Transform;
+import org.apache.nemo.compiler.frontend.beam.NemoPipelineOptions;
+
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * DoFn transform implementation.
+ *
+ * @param <InputT> input type.
+ * @param <OutputT> output type.
+ */
+public final class DoFnTransform<InputT, OutputT> implements
+  Transform<WindowedValue<InputT>, WindowedValue<OutputT>> {
+
+  private OutputCollector<WindowedValue<OutputT>> outputCollector;
+  private final TupleTag<OutputT> mainOutputTag;
+  private final List<TupleTag<?>> additionalOutputTags;
+  private final Collection<PCollectionView<?>> sideInputs;
+  private final WindowingStrategy<?, ?> windowingStrategy;
+  private final DoFn<InputT, OutputT> doFn;
+  private final SerializablePipelineOptions serializedOptions;
+  private transient DoFnRunner<InputT, OutputT> doFnRunner;
+  private transient SideInputReader sideInputReader;
+  private transient DoFnInvoker<InputT, OutputT> doFnInvoker;
+  private final Coder<InputT> inputCoder;
+  private final Map<TupleTag<?>, Coder<?>> outputCoders;
+
+  /**
+   * DoFnTransform Constructor.
+   *
+   * @param doFn    doFn.
+   * @param options Pipeline options.
+   */
+  public DoFnTransform(final DoFn<InputT, OutputT> doFn,
+                       final Coder<InputT> inputCoder,
+                       final Map<TupleTag<?>, Coder<?>> outputCoders,
+                       final TupleTag<OutputT> mainOutputTag,
+                       final List<TupleTag<?>> additionalOutputTags,
+                       final WindowingStrategy<?, ?> windowingStrategy,
+                       final Collection<PCollectionView<?>> sideInputs,
+                       final PipelineOptions options) {
+    this.doFn = doFn;
+    this.inputCoder = inputCoder;
+    this.outputCoders = outputCoders;
+    this.mainOutputTag = mainOutputTag;
+    this.additionalOutputTags = additionalOutputTags;
+    this.sideInputs = sideInputs;
+    this.serializedOptions = new SerializablePipelineOptions(options);
+    this.windowingStrategy = windowingStrategy;
+  }
+
+  @Override
+  public void prepare(final Context context, final OutputCollector<WindowedValue<OutputT>> oc) {
+    // deserialize pipeline option
+    final NemoPipelineOptions options = serializedOptions.get().as(NemoPipelineOptions.class);
+
+    this.outputCollector = oc;
+
+    // create output manager
+    final DoFnRunners.OutputManager outputManager = new DefaultOutputManager<>(
+      outputCollector, context, mainOutputTag);
+
+    // create side input reader
+    if (!sideInputs.isEmpty()) {
+      sideInputReader = new BroadcastVariableSideInputReader(context, sideInputs);
+    } else {
+      sideInputReader = NullSideInputReader.of(sideInputs);
+    }
+
+    // create step context
+    // this transform does not support state and timer.
+    final StepContext stepContext = new StepContext() {
+      @Override
+      public StateInternals stateInternals() {
+        throw new UnsupportedOperationException("Not support stateInternals in DoFnTransform");
+      }
+
+      @Override
+      public TimerInternals timerInternals() {
+        throw new UnsupportedOperationException("Not support timerInternals in DoFnTransform");
+      }
+    };
+
+    // invoker
+    doFnInvoker = DoFnInvokers.invokerFor(doFn);
+    doFnInvoker.invokeSetup();
+
+    // DoFnRunners.simpleRunner takes care of all the hard stuff of running the DoFn
+    // and that this approach is the standard used by most of the Beam runners
+    doFnRunner = DoFnRunners.simpleRunner(
+      options,
+      doFn,
+      sideInputReader,
+      outputManager,
+      mainOutputTag,
+      additionalOutputTags,
+      stepContext,
+      inputCoder,
+      outputCoders,
+      windowingStrategy);
+
+    doFnRunner.startBundle();
+  }
+
+  @Override
+  public void onData(final WindowedValue<InputT> data) {
+    doFnRunner.processElement(data);
+  }
+
+  public DoFn getDoFn() {
+    return doFn;
+  }
+
+  @Override
+  public void close() {
+    doFnRunner.finishBundle();
+    doFnInvoker.invokeTeardown();
+  }
+
+  @Override
+  public String toString() {
+    final StringBuilder sb = new StringBuilder();
+    sb.append("DoTransform:" + doFn);
+    return sb.toString();
+  }
+}
diff --git a/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/DoTransform.java b/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/DoTransform.java
deleted file mode 100644
index 3a36ec0..0000000
--- a/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/DoTransform.java
+++ /dev/null
@@ -1,449 +0,0 @@
-/*
- * Copyright (C) 2018 Seoul National University
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *         http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nemo.compiler.frontend.beam.transform;
-
-import com.fasterxml.jackson.databind.ObjectMapper;
-import org.apache.nemo.common.ir.OutputCollector;
-import org.apache.nemo.common.ir.vertex.transform.Transform;
-import org.apache.nemo.runtime.executor.datatransfer.OutputCollectorImpl;
-import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.beam.sdk.state.State;
-import org.apache.beam.sdk.state.TimeDomain;
-import org.apache.beam.sdk.state.Timer;
-import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.transforms.reflect.DoFnInvoker;
-import org.apache.beam.sdk.transforms.reflect.DoFnInvokers;
-import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
-import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
-import org.apache.beam.sdk.transforms.windowing.PaneInfo;
-import org.apache.beam.sdk.values.PCollectionView;
-import org.apache.beam.sdk.values.Row;
-import org.apache.beam.sdk.values.TupleTag;
-import org.joda.time.Instant;
-
-import java.io.IOException;
-import java.util.List;
-import java.util.Map;
-
-/**
- * DoFn transform implementation.
- *
- * @param <I> input type.
- * @param <O> output type.
- */
-public final class DoTransform<I, O> implements Transform<I, O> {
-  private final DoFn doFn;
-  private final ObjectMapper mapper;
-  private final String serializedOptions;
-  private OutputCollector<O> outputCollector;
-  private StartBundleContext startBundleContext;
-  private FinishBundleContext finishBundleContext;
-  private ProcessContext processContext;
-  private DoFnInvoker invoker;
-
-  /**
-   * DoTransform Constructor.
-   *
-   * @param doFn    doFn.
-   * @param options Pipeline options.
-   */
-  public DoTransform(final DoFn doFn, final PipelineOptions options) {
-    this.doFn = doFn;
-    this.mapper = new ObjectMapper();
-    try {
-      this.serializedOptions = mapper.writeValueAsString(options);
-    } catch (IOException e) {
-      throw new RuntimeException(e);
-    }
-  }
-
-  @Override
-  public void prepare(final Context context, final OutputCollector<O> oc) {
-    this.outputCollector = oc;
-    this.startBundleContext = new StartBundleContext(doFn, serializedOptions);
-    this.finishBundleContext = new FinishBundleContext(doFn, outputCollector, serializedOptions);
-    this.processContext = new ProcessContext(doFn, outputCollector, context, serializedOptions);
-    this.invoker = DoFnInvokers.invokerFor(doFn);
-    invoker.invokeSetup();
-    invoker.invokeStartBundle(startBundleContext);
-  }
-
-  @Override
-  public void onData(final I data) {
-    processContext.setElement(data);
-    invoker.invokeProcessElement(processContext);
-  }
-
-  @Override
-  public void close() {
-    invoker.invokeFinishBundle(finishBundleContext);
-    invoker.invokeTeardown();
-  }
-
-  @Override
-  public String toString() {
-    final StringBuilder sb = new StringBuilder();
-    sb.append("DoTransform:" + doFn);
-    return sb.toString();
-  }
-
-  /**
-   * StartBundleContext.
-   *
-   * @param <I> input type.
-   * @param <O> output type.
-   */
-  private static final class StartBundleContext<I, O> extends DoFn<I, O>.StartBundleContext {
-    private final ObjectMapper mapper;
-    private final PipelineOptions options;
-
-    /**
-     * StartBundleContext.
-     *
-     * @param fn                DoFn.
-     * @param serializedOptions serialized options of the DoTransform.
-     */
-    StartBundleContext(final DoFn<I, O> fn,
-                       final String serializedOptions) {
-      fn.super();
-      this.mapper = new ObjectMapper();
-      try {
-        this.options = mapper.readValue(serializedOptions, PipelineOptions.class);
-      } catch (IOException e) {
-        throw new RuntimeException(e);
-      }
-    }
-
-    @Override
-    public PipelineOptions getPipelineOptions() {
-      return options;
-    }
-  }
-
-  /**
-   * FinishBundleContext.
-   *
-   * @param <I> input type.
-   * @param <O> output type.
-   */
-  private static final class FinishBundleContext<I, O> extends DoFn<I, O>.FinishBundleContext {
-    private final OutputCollector<O> outputCollector;
-    private final ObjectMapper mapper;
-    private final PipelineOptions options;
-
-    /**
-     * Constructor.
-     *
-     * @param fn                DoFn.
-     * @param outputCollector   outputCollector of the DoTransform.
-     * @param serializedOptions serialized options of the DoTransform.
-     */
-    FinishBundleContext(final DoFn<I, O> fn,
-                        final OutputCollector<O> outputCollector,
-                        final String serializedOptions) {
-      fn.super();
-      this.outputCollector = outputCollector;
-      this.mapper = new ObjectMapper();
-      try {
-        this.options = mapper.readValue(serializedOptions, PipelineOptions.class);
-      } catch (IOException e) {
-        throw new RuntimeException(e);
-      }
-    }
-
-    @Override
-    public PipelineOptions getPipelineOptions() {
-      return options;
-    }
-
-    @Override
-    public void output(final O output, final Instant instant, final BoundedWindow boundedWindow) {
-      outputCollector.emit(output);
-    }
-
-    @Override
-    public <T> void output(final TupleTag<T> tupleTag,
-                           final T t,
-                           final Instant instant,
-                           final BoundedWindow boundedWindow) {
-      throw new UnsupportedOperationException("output(TupleTag, T, Instant, BoundedWindow)"
-          + "in FinishBundleContext under DoTransform");
-    }
-  }
-
-  /**
-   * ProcessContext class. Reference: SimpleDoFnRunner.DoFnProcessContext in BEAM.
-   *
-   * @param <I> input type.
-   * @param <O> output type.
-   */
-  private static final class ProcessContext<I, O> extends DoFn<I, O>.ProcessContext
-      implements DoFnInvoker.ArgumentProvider<I, O> {
-    private I input;
-    private final OutputCollector<O> outputCollector;
-    private final Map<String, String> additionalOutputs;
-    private final Context context;
-    private final ObjectMapper mapper;
-    private final PipelineOptions options;
-
-    /**
-     * ProcessContext Constructor.
-     *
-     * @param fn                 Dofn.
-     * @param outputCollector    OutputCollector.
-     * @param context            Context.
-     * @param serializedOptions  Options, serialized.
-     */
-    ProcessContext(final DoFn<I, O> fn,
-                   final OutputCollector<O> outputCollector,
-                   final Context context,
-                   final String serializedOptions) {
-      fn.super();
-      this.outputCollector = outputCollector;
-      this.context = context;
-      this.additionalOutputs = context.getTagToAdditionalChildren();
-      this.mapper = new ObjectMapper();
-      try {
-        this.options = mapper.readValue(serializedOptions, PipelineOptions.class);
-      } catch (IOException e) {
-        throw new RuntimeException(e);
-      }
-    }
-
-    /**
-     * Setter for input element.
-     *
-     * @param in input element.
-     */
-    void setElement(final I in) {
-      this.input = in;
-    }
-
-    @Override
-    public I element() {
-      return this.input;
-    }
-
-    @Override
-    public Row asRow(final String id) {
-      throw new UnsupportedOperationException();
-    }
-
-    @Override
-    public <T> T sideInput(final PCollectionView<T> view) {
-      return (T) context.getBroadcastVariable(view);
-    }
-
-    @Override
-    public Instant timestamp() {
-      throw new UnsupportedOperationException("timestamp() in ProcessContext under DoTransform");
-    }
-
-    @Override
-    public PaneInfo pane() {
-      return PaneInfo.createPane(true, true, PaneInfo.Timing.UNKNOWN);
-    }
-
-    @Override
-    public void updateWatermark(final Instant instant) {
-      throw new UnsupportedOperationException("updateWatermark() in ProcessContext under DoTransform");
-    }
-
-    @Override
-    public PipelineOptions getPipelineOptions() {
-      return this.options;
-    }
-
-    @Override
-    public void output(final O output) {
-      outputCollector.emit(output);
-    }
-
-    @Override
-    public void outputWithTimestamp(final O output, final Instant timestamp) {
-      outputCollector.emit(output);
-    }
-
-    @Override
-    public <T> void output(final TupleTag<T> tupleTag, final T t) {
-      final Object dstVertexId = additionalOutputs.get(tupleTag.getId());
-
-      if (dstVertexId == null) {
-        outputCollector.emit((O) t);
-      } else {
-        outputCollector.emit(additionalOutputs.get(tupleTag.getId()), t);
-      }
-    }
-
-    @Override
-    public <T> void outputWithTimestamp(final TupleTag<T> tupleTag, final T t, final Instant instant) {
-      throw new UnsupportedOperationException("output(TupleTag, T, Instant) in ProcessContext under DoTransform");
-    }
-
-    @Override
-    public BoundedWindow window() {
-      // Unbounded windows are not supported for now.
-      return GlobalWindow.INSTANCE;
-    }
-
-    @Override
-    public PaneInfo paneInfo(final DoFn<I, O> doFn) {
-      return PaneInfo.createPane(true, true, PaneInfo.Timing.UNKNOWN);
-    }
-
-    @Override
-    public PipelineOptions pipelineOptions() {
-      return options;
-    }
-
-    @Override
-    public DoFn<I, O>.StartBundleContext startBundleContext(final DoFn<I, O> doFn) {
-      throw new UnsupportedOperationException("StartBundleContext parameters are not supported.");
-    }
-
-    @Override
-    public DoFn<I, O>.FinishBundleContext finishBundleContext(final DoFn<I, O> doFn) {
-      throw new UnsupportedOperationException("FinishBundleContext parameters are not supported.");
-    }
-
-    @Override
-    public DoFn.ProcessContext
-    processContext(final DoFn<I, O> doFn) {
-      return this;
-    }
-
-    @Override
-    public DoFn.OnTimerContext
-    onTimerContext(final DoFn<I, O> doFn) {
-      throw new UnsupportedOperationException("onTimerContext() in ProcessContext under DoTransform");
-    }
-
-    @Override
-    public I element(final DoFn<I, O> doFn) {
-      return this.input;
-    }
-
-    @Override
-    public Instant timestamp(final DoFn<I, O> doFn) {
-      return Instant.now();
-    }
-
-    @Override
-    public RestrictionTracker<?, ?> restrictionTracker() {
-      throw new UnsupportedOperationException("restrictionTracker() in ProcessContext under DoTransform");
-    }
-
-    @Override
-    public TimeDomain timeDomain(final DoFn<I, O> doFn) {
-      throw new UnsupportedOperationException("timeDomain() in ProcessContext under DoTransform");
-    }
-
-    @Override
-    public DoFn.OutputReceiver<O> outputReceiver(final DoFn<I, O> doFn) {
-      return new OutputReceiver<>((OutputCollectorImpl) outputCollector);
-    }
-
-    @Override
-    public DoFn.OutputReceiver<Row> outputRowReceiver(final DoFn<I, O> doFn) {
-      throw new UnsupportedOperationException();
-    }
-
-    @Override
-    public DoFn.MultiOutputReceiver taggedOutputReceiver(final DoFn<I, O> doFn) {
-      return new MultiOutputReceiver((OutputCollectorImpl) outputCollector, additionalOutputs);
-    }
-
-    @Override
-    public State state(final String stateId) {
-      throw new UnsupportedOperationException("state() in ProcessContext under DoTransform");
-    }
-
-    @Override
-    public Timer timer(final String timerId) {
-      throw new UnsupportedOperationException("timer() in ProcessContext under DoTransform");
-    }
-  }
-
-  /**
-   * @return {@link DoFn} for this transform.
-   */
-  public DoFn getDoFn() {
-    return doFn;
-  }
-
-  /**
-   * OutputReceiver class.
-   * @param <O> output type
-   */
-  static final class OutputReceiver<O> implements DoFn.OutputReceiver<O> {
-    private final List<O> dataElements;
-
-    OutputReceiver(final OutputCollectorImpl<O> outputCollector) {
-      this.dataElements = outputCollector.getMainTagOutputQueue();
-    }
-
-    OutputReceiver(final OutputCollectorImpl outputCollector,
-                   final TupleTag<O> tupleTag,
-                   final Map<String, String> tagToVertex) {
-      final Object dstVertexId = tagToVertex.get(tupleTag.getId());
-      if (dstVertexId == null) {
-        this.dataElements = outputCollector.getMainTagOutputQueue();
-      } else {
-        this.dataElements = (List<O>) outputCollector.getAdditionalTagOutputQueue((String) dstVertexId);
-      }
-    }
-
-    @Override
-    public void output(final O output) {
-      dataElements.add(output);
-    }
-
-    @Override
-    public void outputWithTimestamp(final O output, final Instant timestamp) {
-      dataElements.add(output);
-    }
-  }
-
-  /**
-   * MultiOutputReceiver class.
-   */
-  static final class MultiOutputReceiver implements DoFn.MultiOutputReceiver {
-    private final OutputCollectorImpl outputCollector;
-    private final Map<String, String> tagToVertex;
-
-    /**
-     * Constructor.
-     * @param outputCollector outputCollector
-     * @param tagToVertex     tag to vertex map
-     */
-    MultiOutputReceiver(final OutputCollectorImpl outputCollector,
-                               final Map<String, String> tagToVertex) {
-      this.outputCollector = outputCollector;
-      this.tagToVertex = tagToVertex;
-    }
-
-    @Override
-    public <T> DoFn.OutputReceiver<T> get(final TupleTag<T> tag) {
-      return new OutputReceiver<>(this.outputCollector, tag, tagToVertex);
-    }
-
-    @Override
-    public <T> OutputReceiver<Row> getRowReceiver(final TupleTag<T> tag) {
-      throw new UnsupportedOperationException();
-    }
-  }
-}
diff --git a/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/GroupByKeyTransform.java b/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/GroupByKeyTransform.java
index b0e6429..38b2641 100644
--- a/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/GroupByKeyTransform.java
+++ b/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/GroupByKeyTransform.java
@@ -15,6 +15,7 @@
  */
 package org.apache.nemo.compiler.frontend.beam.transform;
 
+import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.nemo.common.ir.OutputCollector;
 import org.apache.nemo.common.ir.vertex.transform.Transform;
 import org.apache.beam.sdk.values.KV;
@@ -27,10 +28,10 @@ import java.util.*;
  * Group Beam KVs.
  * @param <I> input type.
  */
-public final class GroupByKeyTransform<I> implements Transform<I, KV<Object, List>> {
+public final class GroupByKeyTransform<I> implements Transform<I, WindowedValue<KV<Object, List>>> {
   private static final Logger LOG = LoggerFactory.getLogger(GroupByKeyTransform.class.getName());
   private final Map<Object, List> keyToValues;
-  private OutputCollector<KV<Object, List>> outputCollector;
+  private OutputCollector<WindowedValue<KV<Object, List>>> outputCollector;
 
   /**
    * GroupByKey constructor.
@@ -40,23 +41,27 @@ public final class GroupByKeyTransform<I> implements Transform<I, KV<Object, Lis
   }
 
   @Override
-  public void prepare(final Context context, final OutputCollector<KV<Object, List>> oc) {
+  public void prepare(final Context context, final OutputCollector<WindowedValue<KV<Object, List>>> oc) {
     this.outputCollector = oc;
   }
 
   @Override
   public void onData(final I element) {
-    final KV kv = (KV) element;
+    // TODO #129: support window in group by key for windowed groupByKey
+    final WindowedValue<KV> windowedValue = (WindowedValue<KV>) element;
+    final KV kv = windowedValue.getValue();
     keyToValues.putIfAbsent(kv.getKey(), new ArrayList());
     keyToValues.get(kv.getKey()).add(kv.getValue());
   }
 
   @Override
   public void close() {
+    // TODO #129: support window in group by key for windowed groupByKey
     if (keyToValues.isEmpty()) {
       LOG.warn("Beam GroupByKeyTransform received no data!");
     } else {
-      keyToValues.entrySet().stream().map(entry -> KV.of(entry.getKey(), entry.getValue()))
+      keyToValues.entrySet().stream().map(entry ->
+        WindowedValue.valueInGlobalWindow(KV.of(entry.getKey(), entry.getValue())))
           .forEach(outputCollector::emit);
       keyToValues.clear();
     }
diff --git a/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/WindowFnTransform.java b/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/WindowFnTransform.java
new file mode 100644
index 0000000..e7c9135
--- /dev/null
+++ b/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/WindowFnTransform.java
@@ -0,0 +1,98 @@
+/*
+ * Copyright (C) 2018 Seoul National University
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nemo.compiler.frontend.beam.transform;
+
+import com.google.common.collect.Iterables;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.PaneInfo;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.nemo.common.ir.OutputCollector;
+import org.apache.nemo.common.ir.vertex.transform.Transform;
+import org.apache.beam.sdk.transforms.windowing.WindowFn;
+import org.joda.time.Instant;
+
+import java.util.Collection;
+
+/**
+ * Windowing transform implementation.
+ * This transform simply windows the given elements into
+ * finite windows according to a user-specified WindowFnTransform.
+ * @param <T> input/output type.
+ * @param <W> window type
+ */
+public final class WindowFnTransform<T, W extends BoundedWindow>
+  implements Transform<WindowedValue<T>, WindowedValue<T>> {
+  private final WindowFn windowFn;
+  private OutputCollector<WindowedValue<T>> outputCollector;
+
+  /**
+   * Default Constructor.
+   * @param windowFn windowFn for the Transform.
+   */
+  public WindowFnTransform(final WindowFn windowFn) {
+    this.windowFn = windowFn;
+  }
+
+  @Override
+  public void prepare(final Context context, final OutputCollector<WindowedValue<T>> oc) {
+    this.outputCollector = oc;
+  }
+
+  @Override
+  public void onData(final WindowedValue<T> windowedValue) {
+    final BoundedWindow boundedWindow = Iterables.getOnlyElement(windowedValue.getWindows());
+    final T element = windowedValue.getValue();
+    final Instant timestamp = windowedValue.getTimestamp();
+
+    try {
+      final Collection<W> windows =
+        ((WindowFn<T, W>) windowFn)
+          .assignWindows(
+            ((WindowFn<T, W>) windowFn).new AssignContext() {
+              @Override
+              public T element() {
+                return element;
+              }
+
+              @Override
+              public Instant timestamp() {
+                return timestamp;
+              }
+
+              @Override
+              public BoundedWindow window() {
+                return boundedWindow;
+              }
+            });
+
+      // Emit compressed windows for efficiency
+      outputCollector.emit(WindowedValue.of(element, timestamp, windows, PaneInfo.NO_FIRING));
+    } catch (final Exception e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  @Override
+  public void close() {
+  }
+
+  @Override
+  public String toString() {
+    final StringBuilder sb = new StringBuilder();
+    sb.append("WindowFnTransform:" + windowFn);
+    return sb.toString();
+  }
+}
diff --git a/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/WindowTransform.java b/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/WindowTransform.java
deleted file mode 100644
index 20bc723..0000000
--- a/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/WindowTransform.java
+++ /dev/null
@@ -1,61 +0,0 @@
-/*
- * Copyright (C) 2018 Seoul National University
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *         http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nemo.compiler.frontend.beam.transform;
-
-import org.apache.nemo.common.ir.OutputCollector;
-import org.apache.nemo.common.ir.vertex.transform.Transform;
-import org.apache.beam.sdk.transforms.windowing.WindowFn;
-
-/**
- * Windowing transform implementation.
- * This transform simply windows the given elements into finite windows according to a user-specified WindowTransform.
- * As this functionality is unnecessary for batch processing workloads and for Runtime, this is left as below.
- * @param <T> input/output type.
- */
-public final class WindowTransform<T> implements Transform<T, T> {
-  private final WindowFn windowFn;
-  private OutputCollector<T> outputCollector;
-
-  /**
-   * Default Constructor.
-   * @param windowFn windowFn for the Transform.
-   */
-  public WindowTransform(final WindowFn windowFn) {
-    this.windowFn = windowFn;
-  }
-
-  @Override
-  public void prepare(final Context context, final OutputCollector<T> oc) {
-    this.outputCollector = oc;
-  }
-
-  @Override
-  public void onData(final T element) {
-    // TODO #1: Support Beam Streaming in Compiler.
-    outputCollector.emit(element);
-  }
-
-  @Override
-  public void close() {
-  }
-
-  @Override
-  public String toString() {
-    final StringBuilder sb = new StringBuilder();
-    sb.append("WindowTransform:" + windowFn);
-    return sb.toString();
-  }
-}
diff --git a/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/BeamKeyExtractor.java b/compiler/optimizer/src/main/java/org/apache/nemo/compiler/optimizer/PairKeyExtractor.java
similarity index 62%
copy from compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/BeamKeyExtractor.java
copy to compiler/optimizer/src/main/java/org/apache/nemo/compiler/optimizer/PairKeyExtractor.java
index c3f4bbc..909af4b 100644
--- a/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/BeamKeyExtractor.java
+++ b/compiler/optimizer/src/main/java/org/apache/nemo/compiler/optimizer/PairKeyExtractor.java
@@ -13,24 +13,21 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.nemo.compiler.frontend.beam;
+package org.apache.nemo.compiler.optimizer;
 
 import org.apache.nemo.common.KeyExtractor;
-import org.apache.beam.sdk.values.KV;
+import org.apache.nemo.common.Pair;
 
 /**
- * Extracts the key from a KV element.
- * For non-KV elements, the elements themselves become the key.
+ * Extracts the key from a pair element.
  */
-final class BeamKeyExtractor implements KeyExtractor {
+public final class PairKeyExtractor implements KeyExtractor {
   @Override
   public Object extractKey(final Object element) {
-    if (element instanceof KV) {
-      // Handle null keys, since Beam allows KV with null keys.
-      final Object key = ((KV) element).getKey();
-      return key == null ? 0 : key;
+    if (element instanceof Pair) {
+      return ((Pair) element).left();
     } else {
-      return element;
+      throw new IllegalStateException(element.toString());
     }
   }
 }
diff --git a/compiler/optimizer/src/main/java/org/apache/nemo/compiler/optimizer/pass/compiletime/reshaping/SkewReshapingPass.java b/compiler/optimizer/src/main/java/org/apache/nemo/compiler/optimizer/pass/compiletime/reshaping/SkewReshapingPass.java
index adf573b..e4743b5 100644
--- a/compiler/optimizer/src/main/java/org/apache/nemo/compiler/optimizer/pass/compiletime/reshaping/SkewReshapingPass.java
+++ b/compiler/optimizer/src/main/java/org/apache/nemo/compiler/optimizer/pass/compiletime/reshaping/SkewReshapingPass.java
@@ -30,6 +30,7 @@ import org.apache.nemo.common.ir.vertex.IRVertex;
 import org.apache.nemo.common.ir.vertex.OperatorVertex;
 import org.apache.nemo.common.ir.vertex.transform.AggregateMetricTransform;
 import org.apache.nemo.common.ir.vertex.transform.MetricCollectTransform;
+import org.apache.nemo.compiler.optimizer.PairKeyExtractor;
 import org.apache.nemo.compiler.optimizer.pass.compiletime.Requires;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -178,7 +179,7 @@ public final class SkewReshapingPass extends ReshapingPass {
     newEdge.setProperty(DataStoreProperty.of(DataStoreProperty.Value.LocalFileStore));
     newEdge.setProperty(DataPersistenceProperty.of(DataPersistenceProperty.Value.Keep));
     newEdge.setProperty(DataFlowProperty.of(DataFlowProperty.Value.Pull));
-    newEdge.setProperty(KeyExtractorProperty.of(edge.getPropertyValue(KeyExtractorProperty.class).get()));
+    newEdge.setProperty(KeyExtractorProperty.of(new PairKeyExtractor()));
     newEdge.setProperty(AdditionalOutputTagProperty.of("DynOptData"));
 
     // Dynamic optimization handles statistics on key-value data by default.
diff --git a/compiler/test/src/test/java/org/apache/nemo/compiler/backend/nemo/DAGConverterTest.java b/compiler/test/src/test/java/org/apache/nemo/compiler/backend/nemo/DAGConverterTest.java
index 2f87188..0770c00 100644
--- a/compiler/test/src/test/java/org/apache/nemo/compiler/backend/nemo/DAGConverterTest.java
+++ b/compiler/test/src/test/java/org/apache/nemo/compiler/backend/nemo/DAGConverterTest.java
@@ -27,7 +27,6 @@ import org.apache.nemo.common.ir.vertex.OperatorVertex;
 import org.apache.nemo.common.ir.vertex.executionproperty.ResourcePriorityProperty;
 import org.apache.nemo.common.ir.vertex.executionproperty.ParallelismProperty;
 import org.apache.nemo.common.ir.vertex.transform.Transform;
-import org.apache.nemo.compiler.frontend.beam.transform.DoTransform;
 import org.apache.nemo.common.test.EmptyComponents;
 import org.apache.nemo.conf.JobConf;
 import org.apache.nemo.runtime.common.plan.PhysicalPlanGenerator;
@@ -66,7 +65,7 @@ public final class DAGConverterTest {
     v1.setProperty(ParallelismProperty.of(3));
     irDAGBuilder.addVertex(v1);
 
-    final IRVertex v2 = new OperatorVertex(new DoTransform(null, null));
+    final IRVertex v2 = new OperatorVertex(mock(Transform.class));
     v2.setProperty(ParallelismProperty.of(2));
     irDAGBuilder.addVertex(v2);
 
@@ -113,7 +112,7 @@ public final class DAGConverterTest {
     v1.setProperty(ResourcePriorityProperty.of(ResourcePriorityProperty.COMPUTE));
 
     final Transform t = mock(Transform.class);
-    final DoTransform dt = new DoTransform(null, null);
+    final Transform dt = mock(Transform.class);
     final IRVertex v2 = new OperatorVertex(t);
     v2.setProperty(ParallelismProperty.of(3));
     v2.setProperty(ResourcePriorityProperty.of(ResourcePriorityProperty.COMPUTE));
diff --git a/compiler/test/src/test/java/org/apache/nemo/compiler/frontend/beam/transform/DoFnTransformTest.java b/compiler/test/src/test/java/org/apache/nemo/compiler/frontend/beam/transform/DoFnTransformTest.java
new file mode 100644
index 0000000..a30ee46
--- /dev/null
+++ b/compiler/test/src/test/java/org/apache/nemo/compiler/frontend/beam/transform/DoFnTransformTest.java
@@ -0,0 +1,287 @@
+/*
+ * Copyright (C) 2018 Seoul National University
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nemo.compiler.frontend.beam.transform;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.VarLongCoder;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.state.StateSpec;
+import org.apache.beam.sdk.state.StateSpecs;
+import org.apache.beam.sdk.state.ValueState;
+import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.View;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollectionView;
+import org.apache.beam.sdk.values.TupleTag;
+import org.apache.beam.sdk.values.WindowingStrategy;
+import org.apache.nemo.common.ir.OutputCollector;
+import org.apache.nemo.common.ir.vertex.transform.Transform;
+import org.apache.nemo.compiler.frontend.beam.NemoPipelineOptions;
+import org.apache.reef.io.Tuple;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.*;
+
+import static java.util.Collections.emptyList;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+public final class DoFnTransformTest {
+
+  // views and windows for testing side inputs
+  private PCollectionView<Iterable<String>> view1;
+  private PCollectionView<Iterable<String>> view2;
+
+  private final static Coder NULL_INPUT_CODER = null;
+  private final static Map<TupleTag<?>, Coder<?>> NULL_OUTPUT_CODERS = null;
+
+  @Before
+  public void setUp() {
+    Pipeline.create().apply(Create.of("1"));
+    view1 = Pipeline.create().apply(Create.of("1")).apply(View.asIterable());
+    view2 = Pipeline.create().apply(Create.of("2")).apply(View.asIterable());
+  }
+
+  @Test
+  @SuppressWarnings("unchecked")
+  public void testSingleOutput() {
+
+    final TupleTag<String> outputTag = new TupleTag<>("main-output");
+
+    final DoFnTransform<String, String> doFnTransform =
+      new DoFnTransform<>(
+        new IdentityDoFn<>(),
+        NULL_INPUT_CODER,
+        NULL_OUTPUT_CODERS,
+        outputTag,
+        Collections.emptyList(),
+        WindowingStrategy.globalDefault(),
+        emptyList(), /* side inputs */
+        PipelineOptionsFactory.as(NemoPipelineOptions.class));
+
+    final Transform.Context context = mock(Transform.Context.class);
+    final OutputCollector<WindowedValue<String>> oc = new TestOutputCollector<>();
+    doFnTransform.prepare(context, oc);
+
+    doFnTransform.onData(WindowedValue.valueInGlobalWindow("Hello"));
+
+    assertEquals(((TestOutputCollector<String>) oc).outputs.get(0), WindowedValue.valueInGlobalWindow("Hello"));
+
+    doFnTransform.close();
+  }
+
+  @Test
+  @SuppressWarnings("unchecked")
+  public void testMultiOutputOutput() {
+
+    TupleTag<String> mainOutput = new TupleTag<>("main-output");
+    TupleTag<String> additionalOutput1 = new TupleTag<>("output-1");
+    TupleTag<String> additionalOutput2 = new TupleTag<>("output-2");
+
+    ImmutableList<TupleTag<?>> tags = ImmutableList.of(additionalOutput1, additionalOutput2);
+
+    ImmutableMap<String, String> tagsMap =
+      ImmutableMap.<String, String>builder()
+        .put(additionalOutput1.getId(), additionalOutput1.getId())
+        .put(additionalOutput2.getId(), additionalOutput2.getId())
+        .build();
+
+    final DoFnTransform<String, String> doFnTransform =
+      new DoFnTransform<>(
+        new MultiOutputDoFn(additionalOutput1, additionalOutput2),
+        NULL_INPUT_CODER,
+        NULL_OUTPUT_CODERS,
+        mainOutput,
+        tags,
+        WindowingStrategy.globalDefault(),
+        emptyList(), /* side inputs */
+        PipelineOptionsFactory.as(NemoPipelineOptions.class));
+
+    // mock context
+    final Transform.Context context = mock(Transform.Context.class);
+    when(context.getTagToAdditionalChildren()).thenReturn(tagsMap);
+
+    final OutputCollector<WindowedValue<String>> oc = new TestOutputCollector<>();
+    doFnTransform.prepare(context, oc);
+
+    doFnTransform.onData(WindowedValue.valueInGlobalWindow("one"));
+    doFnTransform.onData(WindowedValue.valueInGlobalWindow("two"));
+    doFnTransform.onData(WindowedValue.valueInGlobalWindow("hello"));
+
+    // main output
+    assertEquals(WindowedValue.valueInGlobalWindow("got: hello"),
+      ((TestOutputCollector<String>) oc).outputs.get(0));
+
+    // additional output 1
+    assertTrue(((TestOutputCollector<String>) oc).getTaggedOutputs().contains(
+      new Tuple<>(additionalOutput1.getId(), WindowedValue.valueInGlobalWindow("extra: one"))
+    ));
+    assertTrue(((TestOutputCollector<String>) oc).getTaggedOutputs().contains(
+      new Tuple<>(additionalOutput1.getId(), WindowedValue.valueInGlobalWindow("got: hello"))
+    ));
+
+    // additional output 2
+    assertTrue(((TestOutputCollector<String>) oc).getTaggedOutputs().contains(
+      new Tuple<>(additionalOutput2.getId(), WindowedValue.valueInGlobalWindow("extra: two"))
+    ));
+    assertTrue(((TestOutputCollector<String>) oc).getTaggedOutputs().contains(
+      new Tuple<>(additionalOutput2.getId(), WindowedValue.valueInGlobalWindow("got: hello"))
+    ));
+
+    doFnTransform.close();
+  }
+
+
+  // TODO #216: implement side input and windowing
+  @Test
+  public void testSideInputs() {
+    // mock context
+    final Transform.Context context = mock(Transform.Context.class);
+    when(context.getBroadcastVariable(view1)).thenReturn(
+      WindowedValue.valueInGlobalWindow(ImmutableList.of("1")));
+    when(context.getBroadcastVariable(view2)).thenReturn(
+      WindowedValue.valueInGlobalWindow(ImmutableList.of("2")));
+
+    TupleTag<Tuple<String, Iterable<String>>> outputTag = new TupleTag<>("main-output");
+
+    WindowedValue<String> first = WindowedValue.valueInGlobalWindow("first");
+    WindowedValue<String> second = WindowedValue.valueInGlobalWindow("second");
+
+    final Map<String, PCollectionView<Iterable<String>>> eventAndViewMap =
+      ImmutableMap.of(first.getValue(), view1, second.getValue(), view2);
+
+    final DoFnTransform<String, Tuple<String, Iterable<String>>> doFnTransform =
+      new DoFnTransform<>(
+        new SimpleSideInputDoFn<>(eventAndViewMap),
+        NULL_INPUT_CODER,
+        NULL_OUTPUT_CODERS,
+        outputTag,
+        Collections.emptyList(),
+        WindowingStrategy.globalDefault(),
+        ImmutableList.of(view1, view2), /* side inputs */
+        PipelineOptionsFactory.as(NemoPipelineOptions.class));
+
+    final OutputCollector<WindowedValue<Tuple<String, Iterable<String>>>> oc = new TestOutputCollector<>();
+    doFnTransform.prepare(context, oc);
+
+    doFnTransform.onData(first);
+    doFnTransform.onData(second);
+
+    assertEquals(WindowedValue.valueInGlobalWindow(new Tuple<>("first", ImmutableList.of("1"))),
+      ((TestOutputCollector<Tuple<String,Iterable<String>>>) oc).getOutput().get(0));
+
+    assertEquals(WindowedValue.valueInGlobalWindow(new Tuple<>("second", ImmutableList.of("2"))),
+      ((TestOutputCollector<Tuple<String,Iterable<String>>>) oc).getOutput().get(1));
+
+    doFnTransform.close();
+  }
+
+  private static final class TestOutputCollector<T> implements OutputCollector<WindowedValue<T>> {
+    private final List<WindowedValue<T>> outputs;
+    private final List<Tuple<String, WindowedValue<T>>> taggedOutputs;
+
+    TestOutputCollector() {
+      this.outputs = new LinkedList<>();
+      this.taggedOutputs = new LinkedList<>();
+    }
+
+    @Override
+    public void emit(WindowedValue<T> output) {
+      outputs.add(output);
+    }
+
+    @Override
+    public <O> void emit(String dstVertexId, O output) {
+      final WindowedValue<T> val = (WindowedValue<T>) output;
+      final Tuple<String, WindowedValue<T>> tuple = new Tuple<>(dstVertexId, val);
+      taggedOutputs.add(tuple);
+    }
+
+    public List<WindowedValue<T>> getOutput() {
+      return outputs;
+    }
+
+    public List<Tuple<String, WindowedValue<T>>> getTaggedOutputs() {
+      return taggedOutputs;
+    }
+  }
+
+  /**
+   * Identitiy do fn.
+   * @param <T> type
+   */
+  private static class IdentityDoFn<T> extends DoFn<T, T> {
+    @ProcessElement
+    public void processElement(final ProcessContext c) throws Exception {
+      c.output(c.element());
+    }
+  }
+
+  /**
+   * Side input do fn.
+   * @param <T> type
+   */
+  private static class SimpleSideInputDoFn<T, V> extends DoFn<T, Tuple<T, V>> {
+    private final Map<T, PCollectionView<V>> idAndViewMap;
+
+    public SimpleSideInputDoFn(final Map<T, PCollectionView<V>> idAndViewMap) {
+      this.idAndViewMap = idAndViewMap;
+    }
+
+    @ProcessElement
+    public void processElement(final ProcessContext c) throws Exception {
+      final PCollectionView<V> view = idAndViewMap.get(c.element());
+      final V sideInput = c.sideInput(view);
+      c.output(new Tuple<>(c.element(), sideInput));
+    }
+  }
+
+
+  /**
+   * Multi output do fn.
+   */
+  private static class MultiOutputDoFn extends DoFn<String, String> {
+    private TupleTag<String> additionalOutput1;
+    private TupleTag<String> additionalOutput2;
+
+    public MultiOutputDoFn(TupleTag<String> additionalOutput1, TupleTag<String> additionalOutput2) {
+      this.additionalOutput1 = additionalOutput1;
+      this.additionalOutput2 = additionalOutput2;
+    }
+
+    @ProcessElement
+    public void processElement(ProcessContext c) throws Exception {
+      if ("one".equals(c.element())) {
+        c.output(additionalOutput1, "extra: one");
+      } else if ("two".equals(c.element())) {
+        c.output(additionalOutput2, "extra: two");
+      } else {
+        c.output("got: " + c.element());
+        c.output(additionalOutput1, "got: " + c.element());
+        c.output(additionalOutput2, "got: " + c.element());
+      }
+    }
+  }
+}
diff --git a/examples/beam/src/main/java/org/apache/nemo/examples/beam/MultinomialLogisticRegression.java b/examples/beam/src/main/java/org/apache/nemo/examples/beam/MultinomialLogisticRegression.java
index 01d9efe..b2315da 100644
--- a/examples/beam/src/main/java/org/apache/nemo/examples/beam/MultinomialLogisticRegression.java
+++ b/examples/beam/src/main/java/org/apache/nemo/examples/beam/MultinomialLogisticRegression.java
@@ -15,6 +15,8 @@
  */
 package org.apache.nemo.examples.beam;
 
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
 import org.apache.nemo.compiler.frontend.beam.transform.LoopCompositeTransform;
 import org.apache.nemo.compiler.frontend.beam.NemoPipelineRunner;
 import org.apache.nemo.common.Pair;
@@ -225,7 +227,9 @@ public final class MultinomialLogisticRegression {
     @FinishBundle
     public void finishBundle(final FinishBundleContext c) {
       for (Integer i = 0; i < gradients.size(); i++) {
-        c.output(KV.of(i, gradients.get(i)), null, null);
+        // this enforces a global window (batching),
+        // where all data elements of the corresponding PCollection are grouped and emitted downstream together
+        c.output(KV.of(i, gradients.get(i)), BoundedWindow.TIMESTAMP_MIN_VALUE, GlobalWindow.INSTANCE);
       }
       LOG.info("stats: " + gradients.get(numClasses - 1).toString());
     }
diff --git a/runtime/master/src/test/java/org/apache/nemo/runtime/master/scheduler/TaskRetryTest.java b/runtime/master/src/test/java/org/apache/nemo/runtime/master/scheduler/TaskRetryTest.java
index b8a2dcc..6081a95 100644
--- a/runtime/master/src/test/java/org/apache/nemo/runtime/master/scheduler/TaskRetryTest.java
+++ b/runtime/master/src/test/java/org/apache/nemo/runtime/master/scheduler/TaskRetryTest.java
@@ -93,7 +93,7 @@ public final class TaskRetryTest {
     runPhysicalPlan(TestPlanGenerator.PlanType.TwoVerticesJoined, injector);
   }
 
-  @Test(timeout=7000)
+  @Test(timeout=60000)
   public void testExecutorRemoved() throws Exception {
     // Until the plan finishes, events happen
     while (!planStateManager.isPlanDone()) {
@@ -119,7 +119,7 @@ public final class TaskRetryTest {
     assertTrue(planStateManager.isPlanDone());
   }
 
-  @Test(timeout=7000)
+  @Test(timeout=60000)
   public void testTaskOutputWriteFailure() throws Exception {
     // Three executors are used
     executorAdded(1.0);


Mime
View raw message