beam-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From k...@apache.org
Subject [34/36] beam git commit: mr-runner: Removes WordCountTest, fixes checkstyle, findbugs, and addressed comments.
Date Thu, 07 Sep 2017 18:39:43 GMT
mr-runner: Removes WordCountTest, fixes checkstyle, findbugs, and addressed comments.


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/32aeb7ac
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/32aeb7ac
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/32aeb7ac

Branch: refs/heads/mr-runner
Commit: 32aeb7ac3d49ade0dc3ad79e711e7b624091d485
Parents: 9d1db98
Author: Pei He <pei@apache.org>
Authored: Thu Aug 31 16:21:17 2017 +0800
Committer: Pei He <pei@apache.org>
Committed: Mon Sep 4 11:06:41 2017 +0800

----------------------------------------------------------------------
 runners/map-reduce/pom.xml                      |   2 +-
 .../mapreduce/MapReducePipelineOptions.java     |   3 +
 .../mapreduce/MapReducePipelineResult.java      |   6 +
 .../runners/mapreduce/MapReduceRegistrar.java   |   6 +
 .../beam/runners/mapreduce/MapReduceRunner.java |   5 +-
 .../beam/runners/mapreduce/package-info.java    |   2 +-
 .../mapreduce/translation/BeamInputFormat.java  |   2 +-
 .../mapreduce/translation/BeamMapper.java       |   4 +-
 .../mapreduce/translation/BeamReducer.java      |   6 +-
 .../translation/ConfigurationUtils.java         |   1 -
 .../mapreduce/translation/DotfileWriter.java    |  22 ++--
 .../translation/FileReadOperation.java          |   1 -
 .../runners/mapreduce/translation/Graph.java    |   8 +-
 .../mapreduce/translation/GraphConverter.java   |   2 +-
 .../mapreduce/translation/GraphPlanner.java     |   1 -
 .../runners/mapreduce/translation/Graphs.java   |  13 +++
 .../translation/PartitionOperation.java         |   2 -
 .../ReifyTimestampAndWindowsParDoOperation.java |   2 +-
 .../translation/ShuffleWriteOperation.java      |   1 -
 .../translation/TranslationContext.java         |   7 +-
 .../beam/runners/mapreduce/WordCountTest.java   | 117 -------------------
 21 files changed, 65 insertions(+), 148 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/32aeb7ac/runners/map-reduce/pom.xml
----------------------------------------------------------------------
diff --git a/runners/map-reduce/pom.xml b/runners/map-reduce/pom.xml
index 3b253a7..90d876b 100644
--- a/runners/map-reduce/pom.xml
+++ b/runners/map-reduce/pom.xml
@@ -24,7 +24,7 @@
     <relativePath>../pom.xml</relativePath>
   </parent>
   
-  <artifactId>beam-runners-map-reduce</artifactId>
+  <artifactId>beam-runners-mapreduce</artifactId>
 
   <name>Apache Beam :: Runners :: MapReduce</name>
 

http://git-wip-us.apache.org/repos/asf/beam/blob/32aeb7ac/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/MapReducePipelineOptions.java
----------------------------------------------------------------------
diff --git a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/MapReducePipelineOptions.java b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/MapReducePipelineOptions.java
index cfbc006..7cff40d 100644
--- a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/MapReducePipelineOptions.java
+++ b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/MapReducePipelineOptions.java
@@ -48,6 +48,9 @@ public interface MapReducePipelineOptions extends PipelineOptions {
   String getFileOutputDir();
   void setFileOutputDir(String fileOutputDir);
 
+  /**
+   * Returns the {@link Class} that constructs MapReduce job through Beam.
+   */
   class JarClassInstanceFactory implements DefaultValueFactory<Class<?>> {
     @Override
     public Class<?> create(PipelineOptions options) {

http://git-wip-us.apache.org/repos/asf/beam/blob/32aeb7ac/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/MapReducePipelineResult.java
----------------------------------------------------------------------
diff --git a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/MapReducePipelineResult.java b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/MapReducePipelineResult.java
index 90c521a..933d8f6 100644
--- a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/MapReducePipelineResult.java
+++ b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/MapReducePipelineResult.java
@@ -27,6 +27,12 @@ import org.apache.beam.sdk.metrics.MetricResults;
 import org.apache.hadoop.mapreduce.Job;
 import org.joda.time.Duration;
 
+/**
+ * A {@link PipelineResult} of executing {@link org.apache.beam.sdk.Pipeline Pipelines} using
+ * {@link MapReduceRunner}.
+ *
+ * <p>It is synchronous (returned after the pipeline is finished), and is used for querying metrics.
+ */
 public class MapReducePipelineResult implements PipelineResult {
 
   private final List<Job> jobs;

http://git-wip-us.apache.org/repos/asf/beam/blob/32aeb7ac/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/MapReduceRegistrar.java
----------------------------------------------------------------------
diff --git a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/MapReduceRegistrar.java b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/MapReduceRegistrar.java
index c8b0eea..1029218 100644
--- a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/MapReduceRegistrar.java
+++ b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/MapReduceRegistrar.java
@@ -31,6 +31,9 @@ public class MapReduceRegistrar {
   private MapReduceRegistrar() {
   }
 
+  /**
+   * Registers the {@link MapReduceRunner}.
+   */
   @AutoService(PipelineRunnerRegistrar.class)
   public static class Runner implements PipelineRunnerRegistrar {
     @Override
@@ -39,6 +42,9 @@ public class MapReduceRegistrar {
     }
   }
 
+  /**
+   * Registers the {@link MapReducePipelineOptions}.
+   */
   @AutoService(PipelineOptionsRegistrar.class)
   public static class Options implements PipelineOptionsRegistrar {
     @Override

http://git-wip-us.apache.org/repos/asf/beam/blob/32aeb7ac/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/MapReduceRunner.java
----------------------------------------------------------------------
diff --git a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/MapReduceRunner.java b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/MapReduceRunner.java
index 8198848..85b7d1b 100644
--- a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/MapReduceRunner.java
+++ b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/MapReduceRunner.java
@@ -41,7 +41,10 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 /**
- * {@link PipelineRunner} for MapReduce.
+ * {@link PipelineRunner} for Hadoop MapReduce.
+ *
+ * <p>It translates a Beam {@link Pipeline} to a series of MapReduce {@link Job jobs}, and executes
+ * them locally or on a Hadoop cluster.
  */
 public class MapReduceRunner extends PipelineRunner<PipelineResult> {
 

http://git-wip-us.apache.org/repos/asf/beam/blob/32aeb7ac/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/package-info.java
----------------------------------------------------------------------
diff --git a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/package-info.java b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/package-info.java
index d511405..e452d92 100644
--- a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/package-info.java
+++ b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/package-info.java
@@ -16,6 +16,6 @@
  * limitations under the License.
  */
 /**
- * MapReduce runner implementation.
+ * Implementation of the Beam runner for Apache Hadoop MapReduce.
  */
 package org.apache.beam.runners.mapreduce;

http://git-wip-us.apache.org/repos/asf/beam/blob/32aeb7ac/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/BeamInputFormat.java
----------------------------------------------------------------------
diff --git a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/BeamInputFormat.java b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/BeamInputFormat.java
index 9dc3396..3d0b8ea 100644
--- a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/BeamInputFormat.java
+++ b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/BeamInputFormat.java
@@ -117,7 +117,7 @@ public class BeamInputFormat<T> extends InputFormat {
     return ((BeamInputSplit) split).createReader();
   }
 
-  public static class BeamInputSplit<T> extends InputSplit implements Writable {
+  private static class BeamInputSplit<T> extends InputSplit implements Writable {
     private String stepName;
     private BoundedSource<T> boundedSource;
     private SerializedPipelineOptions options;

http://git-wip-us.apache.org/repos/asf/beam/blob/32aeb7ac/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/BeamMapper.java
----------------------------------------------------------------------
diff --git a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/BeamMapper.java b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/BeamMapper.java
index b03236f..46c74c0 100644
--- a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/BeamMapper.java
+++ b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/BeamMapper.java
@@ -30,7 +30,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 /**
- * Adapter for executing Beam transforms in {@link Mapper}.
+ * Adapter for executing {@link Operation operations} in {@link Mapper}.
  */
 public class BeamMapper<ValueInT, ValueOutT>
     extends Mapper<Object, WindowedValue<ValueInT>, Object, WindowedValue<ValueOutT>> {
@@ -58,6 +58,8 @@ public class BeamMapper<ValueInT, ValueOutT>
       Mapper<Object, WindowedValue<ValueInT>, Object, WindowedValue<ValueOutT>>.Context context)
       throws IOException, InterruptedException {
     LOG.info("key: {} value: {}.", key, value);
+    // Only needs to pass KV to the following PartitionOperation. However, we have to wrap it in a
+    // global window because of the method signature.
     operation.process(WindowedValue.valueInGlobalWindow(KV.of(key, value)));
   }
 

http://git-wip-us.apache.org/repos/asf/beam/blob/32aeb7ac/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/BeamReducer.java
----------------------------------------------------------------------
diff --git a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/BeamReducer.java b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/BeamReducer.java
index a382904..b69be32 100644
--- a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/BeamReducer.java
+++ b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/BeamReducer.java
@@ -33,15 +33,13 @@ import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.values.KV;
 import org.apache.commons.codec.binary.Base64;
 import org.apache.hadoop.io.BytesWritable;
-import org.apache.hadoop.mapreduce.Mapper;
 import org.apache.hadoop.mapreduce.Reducer;
 import org.apache.hadoop.mapreduce.TaskInputOutputContext;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-
 /**
- * Adapter for executing Beam transforms in {@link Reducer}.
+ * Adapter for executing {@link Operation operations} in {@link Reducer}.
  */
 public class BeamReducer<ValueInT, ValueOutT>
     extends Reducer<BytesWritable, byte[], Object, WindowedValue<ValueOutT>> {
@@ -92,6 +90,8 @@ public class BeamReducer<ValueInT, ValueOutT>
           }}));
     Object decodedKey = keyCoder.decode(new ByteArrayInputStream(key.getBytes()));
     LOG.info("key: {} value: {}.", decodedKey, decodedValues);
+    // Only needs to pass KV to the following GABW operation. However, we have to wrap it in a
+    // global window because of the method signature.
     operation.process(
         WindowedValue.valueInGlobalWindow(KV.of(decodedKey, decodedValues)));
   }

http://git-wip-us.apache.org/repos/asf/beam/blob/32aeb7ac/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/ConfigurationUtils.java
----------------------------------------------------------------------
diff --git a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/ConfigurationUtils.java b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/ConfigurationUtils.java
index 4ec50bd..a905d29 100644
--- a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/ConfigurationUtils.java
+++ b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/ConfigurationUtils.java
@@ -23,7 +23,6 @@ import org.apache.beam.runners.mapreduce.MapReducePipelineOptions;
 import org.apache.beam.sdk.io.FileSystems;
 import org.apache.beam.sdk.io.fs.ResolveOptions;
 import org.apache.beam.sdk.io.fs.ResourceId;
-import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
 

http://git-wip-us.apache.org/repos/asf/beam/blob/32aeb7ac/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/DotfileWriter.java
----------------------------------------------------------------------
diff --git a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/DotfileWriter.java b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/DotfileWriter.java
index 863c4c9..12cc03c 100644
--- a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/DotfileWriter.java
+++ b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/DotfileWriter.java
@@ -36,18 +36,18 @@ public class DotfileWriter {
     int i = 0;
     for (Graphs.FusedStep fusedStep : fusedGraph.getFusedSteps()) {
       String clusterId = String.format("cluster_%d", i++);
-      sb.append(String.format("  subgraph \"%s\" {\n", clusterId));
-      sb.append(String.format("    \"%s\" [shape=point style=invis];\n", clusterId));
+      sb.append(String.format("  subgraph \"%s\" {%n", clusterId));
+      sb.append(String.format("    \"%s\" [shape=point style=invis];%n", clusterId));
       fusedStepToId.put(fusedStep, clusterId);
 
       Set<String> nodeDefines = Sets.newHashSet();
       for (Graphs.Step step : fusedStep.getSteps()) {
-        nodeDefines.add(String.format("    \"%s\" [shape=box];\n", step.getFullName()));
+        nodeDefines.add(String.format("    \"%s\" [shape=box];%n", step.getFullName()));
         for (Graph.AbstractTag inTag : fusedStep.getInputTags(step)) {
-          nodeDefines.add(String.format("    \"%s\" [shape=ellipse];\n", inTag));
+          nodeDefines.add(String.format("    \"%s\" [shape=ellipse];%n", inTag));
         }
         for (Graph.AbstractTag outTag : fusedStep.getOutputTags(step)) {
-          nodeDefines.add(String.format("    \"%s\" [shape=ellipse];\n", outTag));
+          nodeDefines.add(String.format("    \"%s\" [shape=ellipse];%n", outTag));
         }
       }
       for (String str : nodeDefines) {
@@ -59,16 +59,16 @@ public class DotfileWriter {
       // Edges within fused steps.
       for (Graphs.Step step : fusedStep.getSteps()) {
         for (Graph.AbstractTag inTag : fusedStep.getInputTags(step)) {
-          sb.append(String.format("  \"%s\" -> \"%s\";\n", inTag, step));
+          sb.append(String.format("  \"%s\" -> \"%s\";%n", inTag, step));
         }
         for (Graph.AbstractTag outTag : fusedStep.getOutputTags(step)) {
-          sb.append(String.format("  \"%s\" -> \"%s\";\n", step, outTag));
+          sb.append(String.format("  \"%s\" -> \"%s\";%n", step, outTag));
         }
       }
 
       // Edges between sub-graphs.
       for (Graphs.Tag inTag : fusedGraph.getInputTags(fusedStep)) {
-        sb.append(String.format("  \"%s\" -> \"%s\";\n", inTag, fusedStepToId.get(fusedStep)));
+        sb.append(String.format("  \"%s\" -> \"%s\";%n", inTag, fusedStepToId.get(fusedStep)));
       }
     }
     sb.append("}\n");
@@ -79,12 +79,12 @@ public class DotfileWriter {
     StringBuilder sb = new StringBuilder();
     sb.append("\ndigraph G {\n");
     for (Graphs.Step step : fusedStep.getSteps()) {
-      sb.append(String.format("  \"%s\" [shape=box];\n", step.getFullName()));
+      sb.append(String.format("  \"%s\" [shape=box];%n", step.getFullName()));
       for (Graph.AbstractTag inTag : fusedStep.getInputTags(step)) {
-        sb.append(String.format("  \"%s\" -> \"%s\";\n", inTag, step));
+        sb.append(String.format("  \"%s\" -> \"%s\";%n", inTag, step));
       }
       for (Graph.AbstractTag outTag : fusedStep.getOutputTags(step)) {
-        sb.append(String.format("  \"%s\" -> \"%s\";\n", step, outTag));
+        sb.append(String.format("  \"%s\" -> \"%s\";%n", step, outTag));
       }
     }
     sb.append("}\n");

http://git-wip-us.apache.org/repos/asf/beam/blob/32aeb7ac/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/FileReadOperation.java
----------------------------------------------------------------------
diff --git a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/FileReadOperation.java b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/FileReadOperation.java
index f212252..eb5bef4 100644
--- a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/FileReadOperation.java
+++ b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/FileReadOperation.java
@@ -32,7 +32,6 @@ import org.apache.beam.sdk.io.BoundedSource;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.values.TupleTag;
-import org.apache.beam.sdk.values.WindowingStrategy;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;

http://git-wip-us.apache.org/repos/asf/beam/blob/32aeb7ac/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/Graph.java
----------------------------------------------------------------------
diff --git a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/Graph.java b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/Graph.java
index 144f9a4..b4549d3 100644
--- a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/Graph.java
+++ b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/Graph.java
@@ -29,9 +29,7 @@ import com.google.common.graph.GraphBuilder;
 import com.google.common.graph.MutableGraph;
 import java.util.ArrayList;
 import java.util.Collections;
-import java.util.Comparator;
 import java.util.List;
-import java.util.ListIterator;
 import java.util.Objects;
 import java.util.Set;
 
@@ -206,9 +204,15 @@ public class Graph<StepT extends Graph.AbstractStep, TagT extends Graph.Abstract
   interface Vertex {
   }
 
+  /**
+   * Step {@link Vertex}.
+   */
   public abstract static class AbstractStep implements Vertex {
   }
 
+  /**
+   * Tag {@link Vertex}.
+   */
   public abstract static class AbstractTag implements Vertex {
   }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/32aeb7ac/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/GraphConverter.java
----------------------------------------------------------------------
diff --git a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/GraphConverter.java b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/GraphConverter.java
index 458961f..1a4988b 100644
--- a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/GraphConverter.java
+++ b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/GraphConverter.java
@@ -123,7 +123,7 @@ public class GraphConverter extends Pipeline.PipelineVisitor.Defaults {
 
   public String getDotfile() {
     return String.format(
-        "\ndigraph G {\n%s%s}\n",
+        "%ndigraph G {%n%s%s}%n",
         dotfileNodesBuilders.peek().toString(),
         dotfileEdgesBuilder.toString());
   }

http://git-wip-us.apache.org/repos/asf/beam/blob/32aeb7ac/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/GraphPlanner.java
----------------------------------------------------------------------
diff --git a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/GraphPlanner.java b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/GraphPlanner.java
index 09998ea..bc360fb 100644
--- a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/GraphPlanner.java
+++ b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/GraphPlanner.java
@@ -29,7 +29,6 @@ import java.util.Map;
 import org.apache.beam.runners.mapreduce.MapReducePipelineOptions;
 import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.values.TupleTag;
-import org.apache.beam.sdk.values.WindowingStrategy;
 
 /**
  * Class that optimizes the initial graph to a fused graph.

http://git-wip-us.apache.org/repos/asf/beam/blob/32aeb7ac/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/Graphs.java
----------------------------------------------------------------------
diff --git a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/Graphs.java b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/Graphs.java
index 0b93c3a..f23e572 100644
--- a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/Graphs.java
+++ b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/Graphs.java
@@ -34,6 +34,9 @@ public class Graphs {
 
   private Graphs() {}
 
+  /**
+   * Class that represents an optimized graph.
+   */
   public static class FusedGraph {
     private final Graph<FusedStep, Tag> graph;
     private int stageId = 0;
@@ -121,6 +124,10 @@ public class Graphs {
     }
   }
 
+  /**
+   * An {@link Graph.AbstractStep} that represents an optimized sub-graph that can be executed
+   * in one MapReduce job.
+   */
   public static class FusedStep extends Graph.AbstractStep {
     private final int stageId;
     private final Graph<Step, Tag> steps;
@@ -213,6 +220,9 @@ public class Graphs {
     }
   }
 
+  /**
+   * An {@link Graph.AbstractStep} that represents one {@link Operation}.
+   */
   @AutoValue
   public abstract static class Step extends Graph.AbstractStep {
     abstract String getFullName();
@@ -230,6 +240,9 @@ public class Graphs {
     }
   }
 
+  /**
+   * An {@link Graph.AbstractTag} that contains information about input/output data.
+   */
   @AutoValue
   public abstract static class Tag extends Graph.AbstractTag implements Serializable {
     abstract String getName();

http://git-wip-us.apache.org/repos/asf/beam/blob/32aeb7ac/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/PartitionOperation.java
----------------------------------------------------------------------
diff --git a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/PartitionOperation.java b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/PartitionOperation.java
index 687b5b9..dc0f81a 100644
--- a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/PartitionOperation.java
+++ b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/PartitionOperation.java
@@ -20,8 +20,6 @@ package org.apache.beam.runners.mapreduce.translation;
 import static com.google.common.base.Preconditions.checkNotNull;
 import static com.google.common.base.Preconditions.checkState;
 
-import com.google.common.base.Function;
-import com.google.common.collect.FluentIterable;
 import java.io.IOException;
 import java.util.List;
 import org.apache.beam.sdk.util.WindowedValue;

http://git-wip-us.apache.org/repos/asf/beam/blob/32aeb7ac/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/ReifyTimestampAndWindowsParDoOperation.java
----------------------------------------------------------------------
diff --git a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/ReifyTimestampAndWindowsParDoOperation.java b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/ReifyTimestampAndWindowsParDoOperation.java
index 9d6b895..0e02bbb 100644
--- a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/ReifyTimestampAndWindowsParDoOperation.java
+++ b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/ReifyTimestampAndWindowsParDoOperation.java
@@ -45,7 +45,7 @@ public class ReifyTimestampAndWindowsParDoOperation extends ParDoOperation {
     return (DoFn) new ReifyTimestampAndWindowsDoFn<>();
   }
 
-  public class ReifyTimestampAndWindowsDoFn<K, V>
+  private static class ReifyTimestampAndWindowsDoFn<K, V>
       extends DoFn<KV<K, V>, KV<K, WindowedValue<V>>> {
     @ProcessElement
     public void processElement(ProcessContext c, BoundedWindow window) throws Exception {

http://git-wip-us.apache.org/repos/asf/beam/blob/32aeb7ac/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/ShuffleWriteOperation.java
----------------------------------------------------------------------
diff --git a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/ShuffleWriteOperation.java b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/ShuffleWriteOperation.java
index 782cfef..a8fae1b 100644
--- a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/ShuffleWriteOperation.java
+++ b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/ShuffleWriteOperation.java
@@ -19,7 +19,6 @@ package org.apache.beam.runners.mapreduce.translation;
 
 import static com.google.common.base.Preconditions.checkNotNull;
 
-import com.google.common.base.Throwables;
 import java.io.ByteArrayOutputStream;
 import java.io.IOException;
 import org.apache.beam.sdk.coders.Coder;

http://git-wip-us.apache.org/repos/asf/beam/blob/32aeb7ac/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/TranslationContext.java
----------------------------------------------------------------------
diff --git a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/TranslationContext.java b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/TranslationContext.java
index 93856de..e908e93 100644
--- a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/TranslationContext.java
+++ b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/TranslationContext.java
@@ -86,7 +86,7 @@ public class TranslationContext {
       this.currentNode = node;
       for (Map.Entry<TupleTag<?>, PValue> entry : currentNode.getOutputs().entrySet()) {
         pValueToTupleTag.put(entry.getValue(), entry.getKey());
-        // TODO: this is a hack to get around that ViewAsXXX.expand() return wrong output PValue.
+        // TODO: this is a hack to get around that ViewAsXYZ.expand() return wrong output PValue.
         if (node.getTransform() instanceof View.CreatePCollectionView) {
           View.CreatePCollectionView view = (View.CreatePCollectionView) node.getTransform();
           pValueToTupleTag.put(view.getView(), view.getView().getTagInternal());
@@ -125,7 +125,10 @@ public class TranslationContext {
               if (pValue instanceof PCollection) {
                 PCollection<?> pc = (PCollection<?>) pValue;
                 return Graphs.Tag.of(
-                    pc.getName(), pValueToTupleTag.get(pValue), pc.getCoder(), pc.getWindowingStrategy());
+                    pc.getName(),
+                    pValueToTupleTag.get(pValue),
+                    pc.getCoder(),
+                    pc.getWindowingStrategy());
               } else if (pValue instanceof PCollectionView){
                 PCollectionView pView = (PCollectionView) pValue;
                 return Graphs.Tag.of(

http://git-wip-us.apache.org/repos/asf/beam/blob/32aeb7ac/runners/map-reduce/src/test/java/org/apache/beam/runners/mapreduce/WordCountTest.java
----------------------------------------------------------------------
diff --git a/runners/map-reduce/src/test/java/org/apache/beam/runners/mapreduce/WordCountTest.java b/runners/map-reduce/src/test/java/org/apache/beam/runners/mapreduce/WordCountTest.java
deleted file mode 100644
index 263905c..0000000
--- a/runners/map-reduce/src/test/java/org/apache/beam/runners/mapreduce/WordCountTest.java
+++ /dev/null
@@ -1,117 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.mapreduce;
-
-import org.apache.beam.sdk.Pipeline;
-import org.apache.beam.sdk.PipelineResult;
-import org.apache.beam.sdk.io.TextIO;
-import org.apache.beam.sdk.metrics.Counter;
-import org.apache.beam.sdk.metrics.MetricNameFilter;
-import org.apache.beam.sdk.metrics.MetricResult;
-import org.apache.beam.sdk.metrics.Metrics;
-import org.apache.beam.sdk.metrics.MetricsFilter;
-import org.apache.beam.sdk.options.PipelineOptionsFactory;
-import org.apache.beam.sdk.transforms.Count;
-import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.transforms.MapElements;
-import org.apache.beam.sdk.transforms.ParDo;
-import org.apache.beam.sdk.transforms.SimpleFunction;
-import org.apache.beam.sdk.transforms.windowing.FixedWindows;
-import org.apache.beam.sdk.transforms.windowing.Window;
-import org.apache.beam.sdk.values.KV;
-import org.apache.log4j.BasicConfigurator;
-import org.joda.time.Duration;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.JUnit4;
-
-/**
- * Test that runs WordCount.
- */
-@RunWith(JUnit4.class)
-public class WordCountTest {
-
-  public static final String TOKENIZER_PATTERN = "[^\\p{L}]+";
-
-  /**
-   * Concept #2: You can make your pipeline assembly code less verbose by defining your DoFns
- * statically out-of-line. This DoFn tokenizes lines of text into individual words; we pass it
-   * to a ParDo in the pipeline.
-   */
-  static class ExtractWordsFn extends DoFn<String, String> {
-    private final Counter emptyLines = Metrics.counter(ExtractWordsFn.class, "emptyLines");
-    private final Counter nonEmptyLines = Metrics.counter(ExtractWordsFn.class, "nonEmptyLines");
-
-    @ProcessElement
-    public void processElement(ProcessContext c) {
-      if (c.element().trim().isEmpty()) {
-        emptyLines.inc();
-      } else {
-        nonEmptyLines.inc();
-      }
-
-      // Split the line into words.
-      String[] words = c.element().split(TOKENIZER_PATTERN);
-
-      // Output each word encountered into the output PCollection.
-      for (String word : words) {
-        if (!word.isEmpty()) {
-          c.output(word);
-        }
-      }
-    }
-  }
-
-  /** A SimpleFunction that converts a Word and Count into a printable string. */
-  public static class FormatAsTextFn extends SimpleFunction<KV<String, Long>, String> {
-    @Override
-    public String apply(KV<String, Long> input) {
-      return input.getKey() + ": " + input.getValue();
-    }
-  }
-
-  @Test
-  public void testWordCount() {
-    BasicConfigurator.configure();
-
-    String input = "/Users/peihe/github/beam/LICENSE";
-    String output =  "./output";
-    MapReducePipelineOptions options = PipelineOptionsFactory.as(MapReducePipelineOptions.class);
-    //options.setJarClass(this.getClass());
-    options.setRunner(MapReduceRunner.class);
-    Pipeline p = Pipeline.create(options);
-
-    // Concepts #2 and #3: Our pipeline applies the composite CountWords transform, and passes the
-    // static FormatAsTextFn() to the ParDo transform.
-    p.apply("ReadLines", TextIO.read().from(input))
-        .apply(Window.<String>into(FixedWindows.of(Duration.millis(1000))))
-        .apply(ParDo.of(new ExtractWordsFn()))
-        .apply(Count.<String>perElement())
-        .apply(MapElements.via(new FormatAsTextFn()))
-        .apply("WriteCounts", TextIO.write().to(output));
-
-    PipelineResult result = p.run();
-    Iterable<MetricResult<Long>> counters = result.metrics()
-        .queryMetrics(
-            MetricsFilter.builder()
-                .addNameFilter(MetricNameFilter.named(ExtractWordsFn.class, "emptyLines"))
-                .build())
-        .counters();
-    System.out.println(counters.iterator().next());
-  }
-}


Mime
View raw message