beam-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From k...@apache.org
Subject [11/36] beam git commit: mr-runner: refactors and creates Graph data structures to handle general Beam pipelines.
Date Thu, 07 Sep 2017 18:39:20 GMT
http://git-wip-us.apache.org/repos/asf/beam/blob/16e63205/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/JobPrototype.java
----------------------------------------------------------------------
diff --git a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/JobPrototype.java b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/JobPrototype.java
index 576c6bf..c336a70 100644
--- a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/JobPrototype.java
+++ b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/JobPrototype.java
@@ -1,27 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
 package org.apache.beam.runners.mapreduce.translation;
 
 import static com.google.common.base.Preconditions.checkNotNull;
 import static com.google.common.base.Preconditions.checkState;
 
-import com.google.common.base.Predicate;
-import com.google.common.collect.FluentIterable;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Iterables;
 import com.google.common.collect.Sets;
 import java.io.IOException;
-import java.util.ArrayList;
+import java.util.Collections;
 import java.util.List;
 import java.util.Set;
-import javax.annotation.Nullable;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.KvCoder;
-import org.apache.beam.sdk.io.Read;
-import org.apache.beam.sdk.options.PipelineOptionsFactory;
-import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.transforms.GroupByKey;
-import org.apache.beam.sdk.transforms.PTransform;
-import org.apache.beam.sdk.transforms.ParDo;
-import org.apache.beam.sdk.transforms.windowing.Window;
+import org.apache.beam.sdk.io.BoundedSource;
+import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.util.SerializableUtils;
 import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.values.TupleTag;
@@ -33,22 +42,25 @@ import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
 
 /**
- * Created by peihe on 24/07/2017.
+ * Class that translates a {@link Graphs.FusedStep} to a MapReduce job.
  */
 public class JobPrototype {
 
-  public static JobPrototype create(int stageId, Graph.Vertex vertex) {
-    return new JobPrototype(stageId, vertex);
+  public static JobPrototype create(
+      int stageId, Graphs.FusedStep fusedStep, PipelineOptions options) {
+    return new JobPrototype(stageId, fusedStep, options);
   }
 
   private final int stageId;
-  private final Graph.Vertex vertex;
+  private final Graphs.FusedStep fusedStep;
   private final Set<JobPrototype> dependencies;
+  private final PipelineOptions options;
 
-  private JobPrototype(int stageId, Graph.Vertex vertex) {
+  private JobPrototype(int stageId, Graphs.FusedStep fusedStep, PipelineOptions options) {
     this.stageId = stageId;
-    this.vertex = checkNotNull(vertex, "vertex");
+    this.fusedStep = checkNotNull(fusedStep, "fusedStep");
     this.dependencies = Sets.newHashSet();
+    this.options = checkNotNull(options, "options");
   }
 
   public Job build(Class<?> jarClass, Configuration conf) throws IOException {
@@ -57,168 +69,101 @@ public class JobPrototype {
     job.setJarByClass(jarClass);
     conf.set(
         "io.serializations",
-        "org.apache.hadoop.io.serializer.WritableSerialization," +
-            "org.apache.hadoop.io.serializer.JavaSerialization");
+        "org.apache.hadoop.io.serializer.WritableSerialization,"
+            + "org.apache.hadoop.io.serializer.JavaSerialization");
 
     // Setup BoundedSources in BeamInputFormat.
-    // TODO: support more than one in-edge
-    Graph.Edge inEdge = Iterables.getOnlyElement(vertex.getIncoming());
-    Graph.Vertex head = inEdge.getHead();
-    Graph.Step headStep = head.getStep();
-    checkState(headStep.getTransform() instanceof Read.Bounded);
-    Read.Bounded read = (Read.Bounded) headStep.getTransform();
+    // TODO: support more than one read step by introducing a composed BeamInputFormat
+    // and a partition operation.
+    Graphs.Step readStep = Iterables.getOnlyElement(fusedStep.getStartSteps());
+    checkState(readStep.getOperation() instanceof ReadOperation);
+    BoundedSource source = ((ReadOperation) readStep.getOperation()).getSource();
     conf.set(
         BeamInputFormat.BEAM_SERIALIZED_BOUNDED_SOURCE,
-        Base64.encodeBase64String(SerializableUtils.serializeToByteArray(read.getSource())));
+        Base64.encodeBase64String(SerializableUtils.serializeToByteArray(source)));
     job.setInputFormatClass(BeamInputFormat.class);
 
-    // Setup DoFns in BeamMapper.
-    // TODO: support more than one in-path.
-    Graph.NodePath inPath = Iterables.getOnlyElement(inEdge.getPaths());
-
-    Operation mapperParDoRoot = chainParDosInPath(inPath);
-    Operation mapperParDoTail = getTailOperation(mapperParDoRoot);
-    Graph.Step vertexStep = vertex.getStep();
-    if (vertexStep.getTransform() instanceof ParDo.SingleOutput
-        || vertexStep.getTransform() instanceof ParDo.MultiOutput
-        || vertexStep.getTransform() instanceof Window.Assign) {
-      // TODO: add a TailVertex type to simplify the translation.
-      Operation vertexParDo = translateToOperation(vertexStep);
-      Operation mapperWrite = new WriteOperation(
-          getKeyCoder(inEdge.getCoder()),
-          getReifyValueCoder(inEdge.getCoder(), vertexStep.getWindowingStrategy()));
-      mapperParDoTail.attachOutput(vertexParDo, 0);
-      vertexParDo.attachOutput(mapperWrite, 0);
-    } else if (vertexStep.getTransform() instanceof GroupByKey) {
-      Operation reifyOperation = new ReifyTimestampAndWindowsParDoOperation(
-          PipelineOptionsFactory.create(),
-          new TupleTag<>(),
-          ImmutableList.<TupleTag<?>>of(),
-          vertexStep.getWindowingStrategy());
-      Operation mapperWrite = new WriteOperation(
-          getKeyCoder(inEdge.getCoder()),
-          getReifyValueCoder(inEdge.getCoder(), vertexStep.getWindowingStrategy()));
-      mapperParDoTail.attachOutput(reifyOperation, 0);
-      reifyOperation.attachOutput(mapperWrite, 0);
-    } else {
-      throw new UnsupportedOperationException("Transform: " + vertexStep.getTransform());
-    }
-    job.setMapOutputKeyClass(BytesWritable.class);
-    job.setMapOutputValueClass(byte[].class);
-    conf.set(
-        BeamMapper.BEAM_PAR_DO_OPERATION_MAPPER,
-        Base64.encodeBase64String(SerializableUtils.serializeToByteArray(mapperParDoRoot)));
-    job.setMapperClass(BeamMapper.class);
+    if (fusedStep.containsGroupByKey()) {
+      Graphs.Step groupByKey = fusedStep.getGroupByKeyStep();
+      GroupByKeyOperation operation = (GroupByKeyOperation) groupByKey.getOperation();
+      WindowingStrategy<?, ?> windowingStrategy = operation.getWindowingStrategy();
+      KvCoder<?, ?> kvCoder = operation.getKvCoder();
+
+      Coder<?> reifyValueCoder = getReifyValueCoder(kvCoder.getValueCoder(), windowingStrategy);
+      Graphs.Tag reifyOutputTag = Graphs.Tag.of(new TupleTag<Object>(), reifyValueCoder);
+      Graphs.Step reifyStep = Graphs.Step.of(
+          groupByKey.getFullName() + "-Reify",
+          new ReifyTimestampAndWindowsParDoOperation(options, operation.getWindowingStrategy()),
+          groupByKey.getInputTags(),
+          ImmutableList.of(reifyOutputTag));
+
+      Graphs.Step writeStep = Graphs.Step.of(
+          groupByKey.getFullName() + "-Write",
+          new WriteOperation(kvCoder.getKeyCoder(), reifyValueCoder),
+          ImmutableList.of(reifyOutputTag),
+          Collections.<Graphs.Tag>emptyList());
+
+      Graphs.Step gabwStep = Graphs.Step.of(
+          groupByKey.getFullName() + "-GroupAlsoByWindows",
+          new GroupAlsoByWindowsParDoOperation(options, windowingStrategy, kvCoder),
+          Collections.<Graphs.Tag>emptyList(),
+          groupByKey.getOutputTags());
+
+      fusedStep.addStep(reifyStep);
+      fusedStep.addStep(writeStep);
+      fusedStep.addStep(gabwStep);
+      fusedStep.removeStep(groupByKey);
 
-    if (vertexStep.getTransform() instanceof GroupByKey) {
       // Setup BeamReducer
-      Operation gabwOperation = new GroupAlsoByWindowsParDoOperation(
-          PipelineOptionsFactory.create(),
-          (TupleTag<Object>) vertexStep.getOutputs().iterator().next(),
-          ImmutableList.<TupleTag<?>>of(),
-          vertexStep.getWindowingStrategy(),
-          inEdge.getCoder());
-      Graph.Edge outEdge = Iterables.getOnlyElement(vertex.getOutgoing());
-      Graph.NodePath outPath = Iterables.getOnlyElement(outEdge.getPaths());
-      Operation reducerParDoRoot = chainParDosInPath(outPath);
-      Operation reducerParDoTail = getTailOperation(reducerParDoRoot);
-
-      Operation reducerTailParDo = translateToOperation(outEdge.getTail().getStep());
-      if (reducerParDoRoot == null) {
-        gabwOperation.attachOutput(reducerTailParDo, 0);
-      } else {
-        gabwOperation.attachOutput(reducerParDoRoot, 0);
-        reducerParDoTail.attachOutput(reducerTailParDo, 0);
-      }
+      Graphs.Step reducerStartStep = gabwStep;
+      chainOperations(reducerStartStep, fusedStep);
       conf.set(
           BeamReducer.BEAM_REDUCER_KV_CODER,
           Base64.encodeBase64String(SerializableUtils.serializeToByteArray(
-              KvCoder.of(
-                  getKeyCoder(inEdge.getCoder()),
-                  getReifyValueCoder(inEdge.getCoder(), vertexStep.getWindowingStrategy())))));
+              KvCoder.of(kvCoder.getKeyCoder(), reifyValueCoder))));
       conf.set(
           BeamReducer.BEAM_PAR_DO_OPERATION_REDUCER,
-          Base64.encodeBase64String(SerializableUtils.serializeToByteArray(gabwOperation)));
+          Base64.encodeBase64String(
+              SerializableUtils.serializeToByteArray(reducerStartStep.getOperation())));
       job.setReducerClass(BeamReducer.class);
     }
-    job.setOutputFormatClass(NullOutputFormat.class);
-    return job;
-  }
+    // Setup DoFns in BeamMapper.
+    Graphs.Tag readOutputTag = Iterables.getOnlyElement(readStep.getOutputTags());
+    Graphs.Step mapperStartStep = Iterables.getOnlyElement(fusedStep.getConsumers(readOutputTag));
+    chainOperations(mapperStartStep, fusedStep);
 
-  private Coder<Object> getKeyCoder(Coder<?> coder) {
-    KvCoder<Object, Object> kvCoder = (KvCoder<Object, Object>) checkNotNull(coder, "coder");
-    return kvCoder.getKeyCoder();
-  }
+    job.setMapOutputKeyClass(BytesWritable.class);
+    job.setMapOutputValueClass(byte[].class);
+    conf.set(
+        BeamMapper.BEAM_PAR_DO_OPERATION_MAPPER,
+        Base64.encodeBase64String(
+            SerializableUtils.serializeToByteArray(mapperStartStep.getOperation())));
+    job.setMapperClass(BeamMapper.class);
 
-  private Coder<Object> getReifyValueCoder(
-      Coder<?> coder, WindowingStrategy<?, ?> windowingStrategy) {
-    KvCoder<Object, Object> kvCoder = (KvCoder<Object, Object>) checkNotNull(coder, "coder");
-    return (Coder) WindowedValue.getFullCoder(
-        kvCoder.getValueCoder(), windowingStrategy.getWindowFn().windowCoder());
-  }
+    job.setOutputFormatClass(NullOutputFormat.class);
 
-  private Operation getTailOperation(@Nullable Operation operation) {
-    if (operation == null) {
-      return null;
-    }
-    if (operation.getOutputReceivers().isEmpty()) {
-      return operation;
-    }
-    OutputReceiver receiver = Iterables.getOnlyElement(operation.getOutputReceivers());
-    if (receiver.getReceivingOperations().isEmpty()) {
-      return operation;
-    }
-    return getTailOperation(Iterables.getOnlyElement(receiver.getReceivingOperations()));
+    return job;
   }
 
-  private Operation chainParDosInPath(Graph.NodePath path) {
-    List<Graph.Step> parDos = new ArrayList<>();
-    // TODO: we should not need this filter.
-    parDos.addAll(FluentIterable.from(path.steps())
-        .filter(new Predicate<Graph.Step>() {
-          @Override
-          public boolean apply(Graph.Step input) {
-            PTransform<?, ?> transform = input.getTransform();
-            return !(transform instanceof Read.Bounded);
-          }})
-        .toList());
-
-    Operation root = null;
-    Operation prev = null;
-    for (Graph.Step step : parDos) {
-      Operation current = translateToOperation(step);
-      if (prev == null) {
-        root = current;
-      } else {
-        // TODO: set a proper outputNum for ParDo.MultiOutput instead of zero.
-        prev.attachOutput(current, 0);
+  private void chainOperations(Graphs.Step current, Graphs.FusedStep fusedStep) {
+    Operation<?> operation = current.getOperation();
+    List<Graphs.Tag> outputTags = current.getOutputTags();
+    for (int index = 0; index < outputTags.size(); ++index) {
+      for (Graphs.Step consumer : fusedStep.getConsumers(outputTags.get(index))) {
+        operation.attachConsumer(index, consumer.getOperation());
+      }
+    }
+    for (Graphs.Tag outTag : outputTags) {
+      for (Graphs.Step consumer : fusedStep.getConsumers(outTag)) {
+        chainOperations(consumer, fusedStep);
       }
-      prev = current;
     }
-    return root;
   }
 
-  private Operation translateToOperation(Graph.Step parDoStep) {
-    PTransform<?, ?> transform = parDoStep.getTransform();
-    DoFn<Object, Object> doFn;
-    if (transform instanceof ParDo.SingleOutput) {
-      return new NormalParDoOperation(
-          ((ParDo.SingleOutput) transform).getFn(),
-          PipelineOptionsFactory.create(),
-          (TupleTag<Object>) parDoStep.getOutputs().iterator().next(),
-          ImmutableList.<TupleTag<?>>of(),
-          parDoStep.getWindowingStrategy());
-    } else if (transform instanceof ParDo.MultiOutput) {
-      return new NormalParDoOperation(
-          ((ParDo.MultiOutput) transform).getFn(),
-          PipelineOptionsFactory.create(),
-          (TupleTag<Object>) parDoStep.getOutputs().iterator().next(),
-          ImmutableList.<TupleTag<?>>of(),
-          parDoStep.getWindowingStrategy());
-    } else if (transform instanceof Window.Assign) {
-      return new WindowAssignOperation<>(1, parDoStep.getWindowingStrategy().getWindowFn());
-    } else {
-      throw new UnsupportedOperationException("Transform: " + transform);
-    }
+  private Coder<Object> getReifyValueCoder(
+      Coder<?> valueCoder, WindowingStrategy<?, ?> windowingStrategy) {
+    // TODO: do we need the full coder to encode windows?
+    return (Coder) WindowedValue.getFullCoder(
+        valueCoder, windowingStrategy.getWindowFn().windowCoder());
   }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/16e63205/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/NormalParDoOperation.java
----------------------------------------------------------------------
diff --git a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/NormalParDoOperation.java b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/NormalParDoOperation.java
index 1da39a9..fd1b528 100644
--- a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/NormalParDoOperation.java
+++ b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/NormalParDoOperation.java
@@ -26,16 +26,16 @@ import org.apache.beam.sdk.values.TupleTag;
 import org.apache.beam.sdk.values.WindowingStrategy;
 
 /**
- * Created by peihe on 26/07/2017.
+ * {@link Operation} that executes a {@link DoFn}.
  */
-public class NormalParDoOperation extends ParDoOperation {
+public class NormalParDoOperation<InputT, OutputT> extends ParDoOperation<InputT, OutputT> {
 
-  private final DoFn<Object, Object> doFn;
+  private final DoFn<InputT, OutputT> doFn;
 
   public NormalParDoOperation(
-      DoFn<Object, Object> doFn,
+      DoFn<InputT, OutputT> doFn,
       PipelineOptions options,
-      TupleTag<Object> mainOutputTag,
+      TupleTag<OutputT> mainOutputTag,
       List<TupleTag<?>> sideOutputTags,
       WindowingStrategy<?, ?> windowingStrategy) {
     super(options, mainOutputTag, sideOutputTags, windowingStrategy);
@@ -43,7 +43,7 @@ public class NormalParDoOperation extends ParDoOperation {
   }
 
   @Override
-  DoFn<Object, Object> getDoFn() {
+  DoFn<InputT, OutputT> getDoFn() {
     return doFn;
   }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/16e63205/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/Operation.java
----------------------------------------------------------------------
diff --git a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/Operation.java b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/Operation.java
index 6951909..187ea79 100644
--- a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/Operation.java
+++ b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/Operation.java
@@ -1,14 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
 package org.apache.beam.runners.mapreduce.translation;
 
 import com.google.common.collect.ImmutableList;
 import java.io.Serializable;
 import java.util.List;
+import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.hadoop.mapreduce.TaskInputOutputContext;
 
 /**
- * Created by peihe on 26/07/2017.
+ * Class that processes elements and forwards outputs to consumers.
  */
-public abstract class Operation implements Serializable {
+public abstract class Operation<T> implements Serializable {
   private final OutputReceiver[] receivers;
 
   public Operation(int numOutputs) {
@@ -37,7 +55,7 @@ public abstract class Operation implements Serializable {
   /**
    * Processes the element.
    */
-  public abstract void process(Object elem);
+  public abstract void process(WindowedValue<T> elem);
 
   /**
    * Finishes this Operation's execution.
@@ -62,8 +80,8 @@ public abstract class Operation implements Serializable {
   /**
    * Adds an output to this Operation.
    */
-  public void attachOutput(Operation output, int outputNum) {
-    OutputReceiver fanOut = receivers[outputNum];
-    fanOut.addOutput(output);
+  public void attachConsumer(int outputIndex, Operation consumer) {
+    OutputReceiver fanOut = receivers[outputIndex];
+    fanOut.addOutput(consumer);
   }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/16e63205/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/OutputReceiver.java
----------------------------------------------------------------------
diff --git a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/OutputReceiver.java b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/OutputReceiver.java
index 6aeefd2..3dab890 100644
--- a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/OutputReceiver.java
+++ b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/OutputReceiver.java
@@ -21,6 +21,7 @@ import com.google.common.collect.ImmutableList;
 import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.List;
+import org.apache.beam.sdk.util.WindowedValue;
 
 /**
  * OutputReceiver that forwards each input it receives to each of a list of down stream operations.
@@ -42,7 +43,7 @@ public class OutputReceiver implements Serializable {
   /**
    * Processes the element.
    */
-  public void process(Object elem) {
+  public void process(WindowedValue<?> elem) {
     for (Operation out : receivingOperations) {
       if (out != null) {
         out.process(elem);

http://git-wip-us.apache.org/repos/asf/beam/blob/16e63205/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/ParDoOperation.java
----------------------------------------------------------------------
diff --git a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/ParDoOperation.java b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/ParDoOperation.java
index 2627d20..a76773f 100644
--- a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/ParDoOperation.java
+++ b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/ParDoOperation.java
@@ -36,19 +36,19 @@ import org.slf4j.LoggerFactory;
 /**
  * Operation for ParDo.
  */
-public abstract class ParDoOperation extends Operation {
+public abstract class ParDoOperation<InputT, OutputT> extends Operation<InputT> {
   private static final Logger LOG = LoggerFactory.getLogger(ParDoOperation.class);
 
   protected final SerializedPipelineOptions options;
-  protected final TupleTag<Object> mainOutputTag;
+  protected final TupleTag<OutputT> mainOutputTag;
   private final List<TupleTag<?>> sideOutputTags;
   protected final WindowingStrategy<?, ?> windowingStrategy;
 
-  private DoFnRunner<Object, Object> fnRunner;
+  private DoFnRunner<InputT, OutputT> fnRunner;
 
   public ParDoOperation(
       PipelineOptions options,
-      TupleTag<Object> mainOutputTag,
+      TupleTag<OutputT> mainOutputTag,
       List<TupleTag<?>> sideOutputTags,
       WindowingStrategy<?, ?> windowingStrategy) {
     super(1 + sideOutputTags.size());
@@ -61,7 +61,7 @@ public abstract class ParDoOperation extends Operation {
   /**
    * Returns a {@link DoFn} for processing inputs.
    */
-  abstract DoFn<Object, Object> getDoFn();
+  abstract DoFn<InputT, OutputT> getDoFn();
 
   @Override
   public void start(TaskInputOutputContext<Object, Object, Object, Object> taskContext) {
@@ -82,9 +82,9 @@ public abstract class ParDoOperation extends Operation {
    * Processes the element.
    */
   @Override
-  public void process(Object elem) {
+  public void process(WindowedValue<InputT> elem) {
     LOG.info("elem: {}.", elem);
-    fnRunner.processElement((WindowedValue<Object>) elem);
+    fnRunner.processElement(elem);
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/beam/blob/16e63205/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/ParDoTranslator.java
----------------------------------------------------------------------
diff --git a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/ParDoTranslator.java b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/ParDoTranslator.java
new file mode 100644
index 0000000..1a1373a
--- /dev/null
+++ b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/ParDoTranslator.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.mapreduce.translation;
+
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.values.PCollection;
+
+/**
+ * Translates a {@link ParDo.MultiOutput} to a {@link NormalParDoOperation}.
+ */
+class ParDoTranslator<InputT, OutputT>
+    extends TransformTranslator.Default<ParDo.MultiOutput<InputT, OutputT>> {
+
+  @Override
+  public void translateNode(
+      ParDo.MultiOutput<InputT, OutputT> transform, TranslationContext context) {
+    TranslationContext.UserGraphContext userGraphContext = context.getUserGraphContext();
+    NormalParDoOperation operation = new NormalParDoOperation(
+        transform.getFn(),
+        userGraphContext.getOptions(),
+        transform.getMainOutputTag(),
+        transform.getAdditionalOutputTags().getAll(),
+        ((PCollection) userGraphContext.getInput()).getWindowingStrategy());
+
+    context.addInitStep(Graphs.Step.of(
+        userGraphContext.getStepName(),
+        operation,
+        userGraphContext.getInputTags(),
+        userGraphContext.getOutputTags()));
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/16e63205/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/ReadBoundedTranslator.java
----------------------------------------------------------------------
diff --git a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/ReadBoundedTranslator.java b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/ReadBoundedTranslator.java
new file mode 100644
index 0000000..0710827
--- /dev/null
+++ b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/ReadBoundedTranslator.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.mapreduce.translation;
+
+import org.apache.beam.sdk.io.Read;
+
+/**
+ * Translates a {@link Read.Bounded} to a {@link ReadOperation}.
+ */
+class ReadBoundedTranslator<T> extends TransformTranslator.Default<Read.Bounded<T>> {
+  @Override
+  public void translateNode(Read.Bounded transform, TranslationContext context) {
+    TranslationContext.UserGraphContext userGraphContext = context.getUserGraphContext();
+
+    ReadOperation operation = new ReadOperation(transform.getSource());
+    context.addInitStep(Graphs.Step.of(
+        userGraphContext.getStepName(),
+        operation,
+        userGraphContext.getInputTags(),
+        userGraphContext.getOutputTags()));
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/16e63205/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/ReadOperation.java
----------------------------------------------------------------------
diff --git a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/ReadOperation.java b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/ReadOperation.java
new file mode 100644
index 0000000..c199dc6
--- /dev/null
+++ b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/ReadOperation.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.mapreduce.translation;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import org.apache.beam.sdk.io.BoundedSource;
+import org.apache.beam.sdk.util.WindowedValue;
+
+/**
+ * A placeholder {@link Operation} that stands in for a Read.Bounded during pipeline translation.
+ */
+class ReadOperation<T> extends Operation<T> {
+  private final BoundedSource<T> source;
+
+  ReadOperation(BoundedSource<T> source) {
+    super(1);
+    this.source = checkNotNull(source, "source");
+  }
+
+  @Override
+  public void process(WindowedValue elem) {
+    throw new IllegalStateException(
+        String.format("%s should not in execution graph.", this.getClass().getSimpleName()));
+  }
+
+  BoundedSource<?> getSource() {
+    return source;
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/16e63205/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/ReifyTimestampAndWindowsParDoOperation.java
----------------------------------------------------------------------
diff --git a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/ReifyTimestampAndWindowsParDoOperation.java b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/ReifyTimestampAndWindowsParDoOperation.java
index ec954bb..83d1af5 100644
--- a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/ReifyTimestampAndWindowsParDoOperation.java
+++ b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/ReifyTimestampAndWindowsParDoOperation.java
@@ -1,5 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
 package org.apache.beam.runners.mapreduce.translation;
 
+import com.google.common.collect.ImmutableList;
 import java.util.List;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.transforms.DoFn;
@@ -10,16 +28,14 @@ import org.apache.beam.sdk.values.TupleTag;
 import org.apache.beam.sdk.values.WindowingStrategy;
 
 /**
- * Created by peihe on 27/07/2017.
+ * {@link Operation} that executes {@link ReifyTimestampAndWindowsDoFn}.
  */
 public class ReifyTimestampAndWindowsParDoOperation extends ParDoOperation {
 
   public ReifyTimestampAndWindowsParDoOperation(
       PipelineOptions options,
-      TupleTag<Object> mainOutputTag,
-      List<TupleTag<?>> sideOutputTags,
       WindowingStrategy<?, ?> windowingStrategy) {
-    super(options, mainOutputTag, sideOutputTags, windowingStrategy);
+    super(options, new TupleTag<>(), ImmutableList.<TupleTag<?>>of(), windowingStrategy);
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/beam/blob/16e63205/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/TransformTranslator.java
----------------------------------------------------------------------
diff --git a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/TransformTranslator.java b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/TransformTranslator.java
new file mode 100644
index 0000000..f495372
--- /dev/null
+++ b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/TransformTranslator.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.mapreduce.translation;
+
+import org.apache.beam.sdk.transforms.PTransform;
+
+/**
+ * Interface for classes capable of transforming Beam PTransforms into MapReduce primitives.
+ */
+interface TransformTranslator<T extends PTransform<?, ?>> {
+
+  void translateNode(T transform, TranslationContext context);
+
+  /**
+   * Returns true if this translator can translate the given transform.
+   */
+  boolean canTranslate(T transform, TranslationContext context);
+
+  /**
+   * Default translator.
+   */
+  class Default<T1 extends PTransform<?, ?>> implements TransformTranslator<T1> {
+    @Override
+    public void translateNode(T1 transform, TranslationContext context) {
+
+    }
+
+    @Override
+    public boolean canTranslate(T1 transform, TranslationContext context) {
+      return true;
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/16e63205/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/TranslationContext.java
----------------------------------------------------------------------
diff --git a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/TranslationContext.java b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/TranslationContext.java
new file mode 100644
index 0000000..0df365e
--- /dev/null
+++ b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/TranslationContext.java
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.mapreduce.translation;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+import static com.google.common.base.Preconditions.checkState;
+
+import com.google.common.base.Function;
+import com.google.common.collect.FluentIterable;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Maps;
+import java.util.List;
+import java.util.Map;
+import org.apache.beam.runners.mapreduce.MapReducePipelineOptions;
+import org.apache.beam.sdk.runners.TransformHierarchy;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PValue;
+import org.apache.beam.sdk.values.TupleTag;
+
+/**
+ * Class that maintains contexts during translation.
+ */
+public class TranslationContext {
+
+  private final UserGraphContext userGraphContext;
+  private final Graph<Graphs.Step, Graphs.Tag> initGraph;
+
+  public TranslationContext(MapReducePipelineOptions options) {
+    this.userGraphContext = new UserGraphContext(options);
+    this.initGraph = new Graph<>();
+  }
+
+  public UserGraphContext getUserGraphContext() {
+    return userGraphContext;
+  }
+
+  public void addInitStep(Graphs.Step step) {
+    initGraph.addStep(step);
+  }
+
+  /**
+   * Returns {@link Graphs.Step steps} in reverse topological order.
+   */
+  public Graph<Graphs.Step, Graphs.Tag> getInitGraph() {
+    return initGraph;
+  }
+
+  /**
+   * Context of user graph.
+   */
+  public static class UserGraphContext {
+    private final MapReducePipelineOptions options;
+    private final Map<PValue, TupleTag<?>> pValueToTupleTag;
+    private TransformHierarchy.Node currentNode;
+
+    public UserGraphContext(MapReducePipelineOptions options) {
+      this.options = checkNotNull(options, "options");
+      this.pValueToTupleTag = Maps.newHashMap();
+      this.currentNode = null;
+    }
+
+    public MapReducePipelineOptions getOptions() {
+      return options;
+    }
+
+    public void setCurrentNode(TransformHierarchy.Node node) {
+      this.currentNode = node;
+      for (Map.Entry<TupleTag<?>, PValue> entry : currentNode.getOutputs().entrySet()) {
+        pValueToTupleTag.put(entry.getValue(), entry.getKey());
+      }
+    }
+
+    public String getStepName() {
+      return currentNode.getFullName();
+    }
+
+    public PValue getInput() {
+      return Iterables.get(currentNode.getInputs().values(), 0);
+    }
+
+    public PValue getOutput() {
+      return Iterables.get(currentNode.getOutputs().values(), 0);
+    }
+
+    public List<Graphs.Tag> getInputTags() {
+      return FluentIterable.from(currentNode.getInputs().values())
+          .transform(new Function<PValue, Graphs.Tag>() {
+            @Override
+            public Graphs.Tag apply(PValue pValue) {
+              checkState(
+                  pValueToTupleTag.containsKey(pValue),
+                  String.format("Failed to find TupleTag for pValue: %s.", pValue));
+              return Graphs.Tag.of(
+                  pValueToTupleTag.get(pValue), ((PCollection<?>) pValue).getCoder());
+            }})
+          .toList();
+    }
+
+    public List<Graphs.Tag> getOutputTags() {
+      return FluentIterable.from(currentNode.getOutputs().entrySet())
+          .transform(new Function<Map.Entry<TupleTag<?>, PValue>, Graphs.Tag>() {
+            @Override
+            public Graphs.Tag apply(Map.Entry<TupleTag<?>, PValue> entry) {
+              return Graphs.Tag.of(entry.getKey(), ((PCollection<?>) entry.getValue()).getCoder());
+            }})
+          .toList();
+    }
+
+    public TupleTag<?> getOnlyOutputTag() {
+      return Iterables.getOnlyElement(currentNode.getOutputs().keySet());
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/16e63205/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/TranslatorRegistry.java
----------------------------------------------------------------------
diff --git a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/TranslatorRegistry.java b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/TranslatorRegistry.java
new file mode 100644
index 0000000..f79260a
--- /dev/null
+++ b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/TranslatorRegistry.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.mapreduce.translation;
+
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.beam.sdk.io.Read;
+import org.apache.beam.sdk.transforms.Flatten;
+import org.apache.beam.sdk.transforms.GroupByKey;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.View;
+import org.apache.beam.sdk.transforms.windowing.Window;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Lookup table mapping PTransform types to associated TransformTranslator implementations.
+ */
+public class TranslatorRegistry {
+
+  private static final Logger LOG = LoggerFactory.getLogger(TranslatorRegistry.class);
+
+  private static final Map<Class<? extends PTransform>, TransformTranslator> TRANSLATORS =
+      new HashMap<>();
+
+  static {
+    TRANSLATORS.put(Read.Bounded.class, new ReadBoundedTranslator());
+    TRANSLATORS.put(ParDo.MultiOutput.class, new ParDoTranslator());
+    TRANSLATORS.put(Window.Assign.class, new WindowAssignTranslator<>());
+    TRANSLATORS.put(Flatten.PCollections.class, new FlattenTranslator());
+    TRANSLATORS.put(GroupByKey.class, new GroupByKeyTranslator());
+    TRANSLATORS.put(View.CreatePCollectionView.class, new ViewTranslator());
+  }
+
+  public static TransformTranslator<?> getTranslator(PTransform<?, ?> transform) {
+    TransformTranslator<?> translator = TRANSLATORS.get(transform.getClass());
+    if (translator == null) {
+      LOG.warn("Unsupported operator={}", transform.getClass().getName());
+    }
+    return translator;
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/16e63205/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/ViewOperation.java
----------------------------------------------------------------------
diff --git a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/ViewOperation.java b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/ViewOperation.java
new file mode 100644
index 0000000..093f00e
--- /dev/null
+++ b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/ViewOperation.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.mapreduce.translation;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import com.google.common.base.Throwables;
+import java.io.ByteArrayOutputStream;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.mapreduce.TaskInputOutputContext;
+
+/**
+ * {@link Operation} that materializes views.
+ */
+public class ViewOperation<T> extends Operation<T> {
+
+  private final Coder<WindowedValue<T>> valueCoder;
+
+  private transient TaskInputOutputContext<Object, Object, Object, Object> taskContext;
+
+  public ViewOperation(Coder<WindowedValue<T>> valueCoder) {
+    super(0);
+    this.valueCoder = checkNotNull(valueCoder, "valueCoder");
+  }
+
+  @Override
+  public void start(TaskInputOutputContext<Object, Object, Object, Object> taskContext) {
+    this.taskContext = checkNotNull(taskContext, "taskContext");
+  }
+
+  @Override
+  public void process(WindowedValue<T> elem) {
+    try {
+      ByteArrayOutputStream valueStream = new ByteArrayOutputStream();
+      valueCoder.encode(elem, valueStream);
+      taskContext.write(new BytesWritable("view".getBytes()), valueStream.toByteArray());
+    } catch (Exception e) {
+      Throwables.throwIfUnchecked(e);
+      throw new RuntimeException(e);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/16e63205/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/ViewTranslator.java
----------------------------------------------------------------------
diff --git a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/ViewTranslator.java b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/ViewTranslator.java
new file mode 100644
index 0000000..815ce77
--- /dev/null
+++ b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/ViewTranslator.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.mapreduce.translation;
+
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.transforms.View;
+
+/**
+ * Translates a {@link View.CreatePCollectionView} to a {@link ViewOperation}.
+ */
+public class ViewTranslator extends TransformTranslator.Default<View.CreatePCollectionView<?, ?>> {
+
+  @Override
+  public void translateNode(
+      View.CreatePCollectionView<?, ?> transform, TranslationContext context) {
+    TranslationContext.UserGraphContext userGraphContext = context.getUserGraphContext();
+
+    ViewOperation<?> operation =
+        new ViewOperation<>((Coder) transform.getView().getPCollection().getCoder());
+
+    context.addInitStep(Graphs.Step.of(
+        userGraphContext.getStepName(),
+        operation,
+        userGraphContext.getInputTags(),
+        userGraphContext.getOutputTags()));
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/16e63205/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/WindowAssignOperation.java
----------------------------------------------------------------------
diff --git a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/WindowAssignOperation.java b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/WindowAssignOperation.java
index 144ef16..3279e11 100644
--- a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/WindowAssignOperation.java
+++ b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/WindowAssignOperation.java
@@ -1,3 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
 package org.apache.beam.runners.mapreduce.translation;
 
 import static com.google.common.base.Preconditions.checkArgument;
@@ -12,28 +29,24 @@ import org.apache.beam.sdk.util.WindowedValue;
 import org.joda.time.Instant;
 
 /**
- * Created by peihe on 27/07/2017.
+ * {@link Operation} that assigns windows to elements.
  */
-public class WindowAssignOperation<T, W extends BoundedWindow> extends Operation {
+public class WindowAssignOperation<T, W extends BoundedWindow> extends Operation<T> {
   private final WindowFn<T, W> windowFn;
 
-  public WindowAssignOperation(int numOutputs, WindowFn<T, W> windowFn) {
-    super(numOutputs);
+  public WindowAssignOperation(WindowFn<T, W> windowFn) {
+    super(1);
     this.windowFn = checkNotNull(windowFn, "windowFn");
   }
 
   @Override
-  public void process(Object elem) {
-    WindowedValue windowedValue = (WindowedValue) elem;
+  public void process(WindowedValue<T> elem) {
     try {
-      Collection<W> windows = windowFn.assignWindows(new AssignContextInternal<>(windowFn, windowedValue));
+      Collection<W> windows = windowFn.assignWindows(new AssignContextInternal<>(windowFn, elem));
       for (W window : windows) {
         OutputReceiver receiver = Iterables.getOnlyElement(getOutputReceivers());
         receiver.process(WindowedValue.of(
-            windowedValue.getValue(),
-            windowedValue.getTimestamp(),
-            window,
-            windowedValue.getPane()));
+            elem.getValue(), elem.getTimestamp(), window, elem.getPane()));
       }
     } catch (Exception e) {
       Throwables.throwIfUnchecked(e);

http://git-wip-us.apache.org/repos/asf/beam/blob/16e63205/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/WindowAssignTranslator.java
----------------------------------------------------------------------
diff --git a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/WindowAssignTranslator.java b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/WindowAssignTranslator.java
new file mode 100644
index 0000000..367c375
--- /dev/null
+++ b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/WindowAssignTranslator.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.mapreduce.translation;
+
+import org.apache.beam.sdk.transforms.windowing.Window;
+
+/**
+ * Translates a {@link Window.Assign} to a {@link WindowAssignOperation}.
+ */
+public class WindowAssignTranslator<T> extends TransformTranslator.Default<Window.Assign<T>> {
+
+  @Override
+  public void translateNode(Window.Assign<T> transform, TranslationContext context) {
+    TranslationContext.UserGraphContext userGraphContext = context.getUserGraphContext();
+
+    WindowAssignOperation<T, ?> operation = new WindowAssignOperation<>(transform.getWindowFn());
+    context.addInitStep(Graphs.Step.of(
+        userGraphContext.getStepName(),
+        operation,
+        userGraphContext.getInputTags(),
+        userGraphContext.getOutputTags()));
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/16e63205/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/WriteOperation.java
----------------------------------------------------------------------
diff --git a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/WriteOperation.java b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/WriteOperation.java
index 0585032..2eb4684 100644
--- a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/WriteOperation.java
+++ b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/WriteOperation.java
@@ -1,3 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
 package org.apache.beam.runners.mapreduce.translation;
 
 import static com.google.common.base.Preconditions.checkNotNull;
@@ -5,19 +22,15 @@ import static com.google.common.base.Preconditions.checkNotNull;
 import com.google.common.base.Throwables;
 import java.io.ByteArrayOutputStream;
 import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.coders.KvCoder;
-import org.apache.beam.sdk.coders.NullableCoder;
-import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.values.KV;
-import org.apache.beam.sdk.values.WindowingStrategy;
 import org.apache.hadoop.io.BytesWritable;
 import org.apache.hadoop.mapreduce.TaskInputOutputContext;
 
 /**
- * Created by peihe on 26/07/2017.
+ * {@link Operation} that materializes input for group by key.
  */
-public class WriteOperation extends Operation {
+public class WriteOperation<T> extends Operation<T> {
 
   private final Coder<Object> keyCoder;
   private final Coder<Object> valueCoder;
@@ -36,14 +49,14 @@ public class WriteOperation extends Operation {
   }
 
   @Override
-  public void process(Object elem) {
-    WindowedValue<KV<?, ?>> windowedElem = (WindowedValue<KV<?, ?>>) elem;
+  public void process(WindowedValue<T> elem) {
+    KV<?, ?> kv = (KV<?, ?>) elem.getValue();
     try {
       ByteArrayOutputStream keyStream = new ByteArrayOutputStream();
-      keyCoder.encode(windowedElem.getValue().getKey(), keyStream);
+      keyCoder.encode(kv.getKey(), keyStream);
 
       ByteArrayOutputStream valueStream = new ByteArrayOutputStream();
-      valueCoder.encode(windowedElem.getValue().getValue(), valueStream);
+      valueCoder.encode(kv.getValue(), valueStream);
       taskContext.write(new BytesWritable(keyStream.toByteArray()), valueStream.toByteArray());
     } catch (Exception e) {
       Throwables.throwIfUnchecked(e);

http://git-wip-us.apache.org/repos/asf/beam/blob/16e63205/runners/map-reduce/src/test/java/org/apache/beam/runners/mapreduce/WordCountTest.java
----------------------------------------------------------------------
diff --git a/runners/map-reduce/src/test/java/org/apache/beam/runners/mapreduce/WordCountTest.java b/runners/map-reduce/src/test/java/org/apache/beam/runners/mapreduce/WordCountTest.java
index a548ba7..363ba01 100644
--- a/runners/map-reduce/src/test/java/org/apache/beam/runners/mapreduce/WordCountTest.java
+++ b/runners/map-reduce/src/test/java/org/apache/beam/runners/mapreduce/WordCountTest.java
@@ -1,3 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
 package org.apache.beam.runners.mapreduce;
 
 import org.apache.beam.sdk.Pipeline;
@@ -10,7 +27,7 @@ import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.MapElements;
 import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.transforms.SimpleFunction;
-import org.apache.beam.sdk.transforms.windowing.SlidingWindows;
+import org.apache.beam.sdk.transforms.windowing.FixedWindows;
 import org.apache.beam.sdk.transforms.windowing.Window;
 import org.apache.beam.sdk.values.KV;
 import org.apache.log4j.BasicConfigurator;
@@ -75,11 +92,11 @@ public class WordCountTest {
     // Concepts #2 and #3: Our pipeline applies the composite CountWords transform, and passes the
     // static FormatAsTextFn() to the ParDo transform.
     p.apply("ReadLines", TextIO.read().from(input))
-        .apply(Window.<String>into(SlidingWindows.of(Duration.millis(100))))
+        .apply(Window.<String>into(FixedWindows.of(Duration.millis(1000))))
         .apply(ParDo.of(new ExtractWordsFn()))
         .apply(Count.<String>perElement())
-        .apply(MapElements.via(new FormatAsTextFn()));
-        //.apply("WriteCounts", TextIO.write().to(output));
+        .apply(MapElements.via(new FormatAsTextFn()))
+        .apply("WriteCounts", TextIO.write().to(output));
 
     p.run();
   }

http://git-wip-us.apache.org/repos/asf/beam/blob/16e63205/runners/map-reduce/src/test/java/org/apache/beam/runners/mapreduce/translation/GraphConverterTest.java
----------------------------------------------------------------------
diff --git a/runners/map-reduce/src/test/java/org/apache/beam/runners/mapreduce/translation/GraphConverterTest.java b/runners/map-reduce/src/test/java/org/apache/beam/runners/mapreduce/translation/GraphConverterTest.java
index 4f0c283..76c8311 100644
--- a/runners/map-reduce/src/test/java/org/apache/beam/runners/mapreduce/translation/GraphConverterTest.java
+++ b/runners/map-reduce/src/test/java/org/apache/beam/runners/mapreduce/translation/GraphConverterTest.java
@@ -1,12 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
 package org.apache.beam.runners.mapreduce.translation;
 
 import static org.junit.Assert.assertEquals;
 
 import com.google.common.collect.Iterables;
+import org.apache.beam.runners.mapreduce.MapReducePipelineOptions;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.coders.BigEndianIntegerCoder;
 import org.apache.beam.sdk.coders.KvCoder;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.testing.CrashingRunner;
 import org.apache.beam.sdk.transforms.Create;
 import org.apache.beam.sdk.transforms.Sum;
 import org.apache.beam.sdk.values.KV;
@@ -23,17 +43,18 @@ public class GraphConverterTest {
 
   @Test
   public void testCombine() throws Exception {
-    Pipeline p = Pipeline.create();
+    MapReducePipelineOptions options = PipelineOptionsFactory.as(MapReducePipelineOptions.class);
+    options.setRunner(CrashingRunner.class);
+    Pipeline p = Pipeline.create(options);
     PCollection<KV<String, Integer>> input = p
         .apply(Create.empty(KvCoder.of(StringUtf8Coder.of(), BigEndianIntegerCoder.of())))
         .apply(Sum.<String>integersPerKey());
-    GraphConverter graphConverter = new GraphConverter();
+    TranslationContext context = new TranslationContext(options);
+    GraphConverter graphConverter = new GraphConverter(context);
     p.traverseTopologically(graphConverter);
 
-    Graph graph = graphConverter.getGraph();
+    Graph<Graphs.Step, Graphs.Tag> initGraph = context.getInitGraph();
 
-    assertEquals(3, Iterables.size(graph.getAllVertices()));
-    assertEquals(2, Iterables.size(graph.getAllEdges()));
-    assertEquals(1, Iterables.size(graph.getLeafVertices()));
+    assertEquals(3, Iterables.size(initGraph.getSteps()));
   }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/16e63205/runners/map-reduce/src/test/java/org/apache/beam/runners/mapreduce/translation/GraphPlannerTest.java
----------------------------------------------------------------------
diff --git a/runners/map-reduce/src/test/java/org/apache/beam/runners/mapreduce/translation/GraphPlannerTest.java b/runners/map-reduce/src/test/java/org/apache/beam/runners/mapreduce/translation/GraphPlannerTest.java
index c98f817..cf5262f 100644
--- a/runners/map-reduce/src/test/java/org/apache/beam/runners/mapreduce/translation/GraphPlannerTest.java
+++ b/runners/map-reduce/src/test/java/org/apache/beam/runners/mapreduce/translation/GraphPlannerTest.java
@@ -1,12 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
 package org.apache.beam.runners.mapreduce.translation;
 
 import static org.junit.Assert.assertEquals;
 
 import com.google.common.collect.Iterables;
+import org.apache.beam.runners.mapreduce.MapReducePipelineOptions;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.coders.BigEndianIntegerCoder;
 import org.apache.beam.sdk.coders.KvCoder;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.testing.CrashingRunner;
 import org.apache.beam.sdk.transforms.Create;
 import org.apache.beam.sdk.transforms.Sum;
 import org.apache.beam.sdk.values.KV;
@@ -23,20 +43,21 @@ public class GraphPlannerTest {
 
   @Test
   public void testCombine() throws Exception {
-    Pipeline p = Pipeline.create();
+    MapReducePipelineOptions options = PipelineOptionsFactory.as(MapReducePipelineOptions.class);
+    options.setRunner(CrashingRunner.class);
+    Pipeline p = Pipeline.create(options);
     PCollection<KV<String, Integer>> input = p
         .apply(Create.empty(KvCoder.of(StringUtf8Coder.of(), BigEndianIntegerCoder.of())))
         .apply(Sum.<String>integersPerKey());
-    GraphConverter graphConverter = new GraphConverter();
-    p.traverseTopologically(graphConverter);
 
-    Graph graph = graphConverter.getGraph();
+    TranslationContext context = new TranslationContext(options);
+    GraphConverter graphConverter = new GraphConverter(context);
+    p.traverseTopologically(graphConverter);
 
     GraphPlanner planner = new GraphPlanner();
-    Graph fusedGraph = planner.plan(graph);
+    Graphs.FusedGraph fusedGraph = planner.plan(context.getInitGraph());
 
-    assertEquals(3, Iterables.size(fusedGraph.getAllVertices()));
-    assertEquals(2, Iterables.size(fusedGraph.getAllEdges()));
-    assertEquals(1, Iterables.size(fusedGraph.getLeafVertices()));
+    assertEquals(1, Iterables.size(fusedGraph.getFusedSteps()));
+    assertEquals(3, Iterables.getOnlyElement(fusedGraph.getFusedSteps()).getSteps().size());
   }
 }


Mime
View raw message