beam-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From k...@apache.org
Subject [02/36] beam git commit: MapReduceRunner: add unit tests for GraphConverter and GraphPlanner.
Date Thu, 07 Sep 2017 18:39:11 GMT
MapReduceRunner: add unit tests for GraphConverter and GraphPlanner.


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/a8b366de
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/a8b366de
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/a8b366de

Branch: refs/heads/mr-runner
Commit: a8b366de9e4e0c79a7800184afc79b377477b8ed
Parents: 092380c
Author: Pei He <pei@apache.org>
Authored: Thu Jul 13 14:09:10 2017 +0800
Committer: Pei He <pei@apache.org>
Committed: Thu Aug 31 14:13:46 2017 +0800

----------------------------------------------------------------------
 .../beam/runners/mapreduce/MapReduceRunner.java |  12 +++
 .../runners/mapreduce/translation/Graph.java    |  89 ++++++++++++++-
 .../mapreduce/translation/GraphConverter.java   |   6 +-
 .../mapreduce/translation/GraphPlanner.java     |   1 +
 .../beam/runners/mapreduce/WordCountTest.java   | 108 +++++++++++++++++++
 .../translation/GraphConverterTest.java         |  39 +++++++
 .../mapreduce/translation/GraphPlannerTest.java |  42 ++++++++
 7 files changed, 294 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/a8b366de/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/MapReduceRunner.java
----------------------------------------------------------------------
diff --git a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/MapReduceRunner.java
b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/MapReduceRunner.java
index bb9555e..247a8e5 100644
--- a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/MapReduceRunner.java
+++ b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/MapReduceRunner.java
@@ -3,11 +3,23 @@ package org.apache.beam.runners.mapreduce;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.PipelineResult;
 import org.apache.beam.sdk.PipelineRunner;
+import org.apache.beam.sdk.options.PipelineOptions;
 
 /**
  * {@link PipelineRunner} for crunch.
  */
 public class MapReduceRunner extends PipelineRunner<PipelineResult> {
+
+  /**
+   * Construct a runner from the provided options.
+   *
+   * @param options Properties which configure the runner.
+   * @return The newly created runner.
+   */
+  public static MapReduceRunner fromOptions(PipelineOptions options) {
+    return new MapReduceRunner();
+  }
+
   @Override
   public PipelineResult run(Pipeline pipeline) {
     return null;

http://git-wip-us.apache.org/repos/asf/beam/blob/a8b366de/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/Graph.java
----------------------------------------------------------------------
diff --git a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/Graph.java
b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/Graph.java
index a9831bd..1ca5a05 100644
--- a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/Graph.java
+++ b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/Graph.java
@@ -4,16 +4,21 @@ import static com.google.common.base.Preconditions.checkNotNull;
 import static com.google.common.base.Preconditions.checkState;
 
 import com.google.auto.value.AutoValue;
+import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
 import java.util.LinkedList;
 import java.util.Map;
+import java.util.Objects;
 import java.util.Set;
 import org.apache.beam.sdk.io.Read;
 import org.apache.beam.sdk.transforms.Flatten;
 import org.apache.beam.sdk.transforms.GroupByKey;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.commons.lang.builder.ReflectionToStringBuilder;
+import org.apache.commons.lang.builder.ToStringStyle;
 
 /**
  * Created by peihe on 06/07/2017.
@@ -57,8 +62,16 @@ public class Graph {
     return edges.get(HeadTail.of(head, tail));
   }
 
-  public Set<Vertex> getLeafVertices() {
-    return leafVertices;
+  /**
+   * Returns an immutable snapshot of every vertex currently in the graph.
+   *
+   * <p>A copy is returned (like {@link #getLeafVertices()}) so callers can neither mutate the
+   * internal map through its values view nor observe concurrent modifications while iterating.
+   */
+  public Iterable<Vertex> getAllVertices() {
+    return ImmutableList.copyOf(vertices.values());
+  }
+
+  /** Returns an immutable snapshot of every edge currently in the graph. */
+  public Iterable<Edge> getAllEdges() {
+    return ImmutableList.copyOf(edges.values());
+  }
+
+  /** Returns an immutable snapshot of the graph's leaf vertices. */
+  public Iterable<Vertex> getLeafVertices() {
+    return ImmutableList.copyOf(leafVertices);
   }
 
   public void accept(GraphVisitor visitor) {
@@ -122,6 +135,29 @@ public class Graph {
         throw new RuntimeException("Unexpected transform type: " + transform.getClass());
       }
     }
+
+    /**
+     * Vertices are equal when they wrap equal transforms; the {@code outgoing} and
+     * {@code incoming} fields are not compared.
+     */
+    @Override
+    public boolean equals(Object obj) {
+      if (obj == this) {
+        return true;
+      }
+      if (obj instanceof Vertex) {
+        Vertex other = (Vertex) obj;
+        return Objects.equals(transform, other.transform);
+      }
+      return false;
+    }
+
+    /**
+     * Hashes only the transform. The runtime class is deliberately left out so the hash agrees
+     * with {@link #equals}, which accepts any {@code Vertex} instance, including subclasses.
+     */
+    @Override
+    public int hashCode() {
+      return Objects.hashCode(transform);
+    }
+
+    /**
+     * Reflective string form that skips the {@code outgoing} and {@code incoming} fields
+     * (presumably to avoid recursing into neighbouring vertices).
+     */
+    @Override
+    public String toString() {
+      return new ReflectionToStringBuilder(this, ToStringStyle.SHORT_PREFIX_STYLE)
+          .setExcludeFieldNames(new String[] { "outgoing", "incoming" }).toString();
+    }
   }
 
   public static class Edge {
@@ -156,6 +192,28 @@ public class Graph {
     public void addPath(NodePath path) {
       paths.add(checkNotNull(path, "path"));
     }
+
+    /**
+     * Edges are equal when they connect the same head and tail and carry equal paths.
+     */
+    @Override
+    public boolean equals(Object obj) {
+      if (obj == this) {
+        return true;
+      }
+      if (!(obj instanceof Edge)) {
+        return false;
+      }
+      Edge other = (Edge) obj;
+      // Both the endpoints and the paths of the *other* edge must match.
+      return headTail.equals(other.headTail) && paths.equals(other.paths);
+    }
+
+    /**
+     * Hash consistent with {@link #equals}. Note: {@link #addPath} changes the result, so an
+     * edge should not be mutated while it is held in a hash-based collection.
+     */
+    @Override
+    public int hashCode() {
+      return Objects.hash(headTail, paths);
+    }
+
+    /** Reflective string form listing the endpoints and paths. */
+    @Override
+    public String toString() {
+      return ReflectionToStringBuilder.toString(this, ToStringStyle.SHORT_PREFIX_STYLE);
+    }
   }
 
   public static class NodePath {
@@ -176,6 +234,33 @@ public class Graph {
     public void addLast(PTransform<?, ?> transform) {
       path.addLast(transform);
     }
+
+    /** Paths are equal when they contain equal transforms in the same order. */
+    @Override
+    public boolean equals(Object obj) {
+      if (obj == this) {
+        return true;
+      }
+      if (obj instanceof NodePath) {
+        NodePath other = (NodePath) obj;
+        return path.equals(other.path);
+      }
+      return false;
+    }
+
+    /**
+     * Hashes only the transform sequence; the runtime class is left out so the hash agrees with
+     * {@link #equals}, which accepts any {@code NodePath} instance.
+     */
+    @Override
+    public int hashCode() {
+      return path.hashCode();
+    }
+
+    /**
+     * Returns the transform names joined by {@code '|'}, e.g. {@code "A|B|C"}; an empty path
+     * yields the empty string.
+     */
+    @Override
+    public String toString() {
+      StringBuilder sb = new StringBuilder();
+      String separator = "";
+      for (PTransform<?, ?> transform : path) {
+        sb.append(separator).append(transform.getName());
+        separator = "|";
+      }
+      return sb.toString();
+    }
   }
 
   @AutoValue

http://git-wip-us.apache.org/repos/asf/beam/blob/a8b366de/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/GraphConverter.java
----------------------------------------------------------------------
diff --git a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/GraphConverter.java
b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/GraphConverter.java
index 306e58e..359a6e2 100644
--- a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/GraphConverter.java
+++ b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/GraphConverter.java
@@ -24,7 +24,7 @@ public class GraphConverter extends Pipeline.PipelineVisitor.Defaults {
 
   @Override
   public void visitPrimitiveTransform(TransformHierarchy.Node node) {
-    Graph.Vertex v = new Graph.Vertex(node.getTransform());
+    Graph.Vertex v = graph.addVertex(node.getTransform());
 
     for (PValue input : node.getInputs().values()) {
       if (outputToProducer.containsKey(input)) {
@@ -37,4 +37,8 @@ public class GraphConverter extends Pipeline.PipelineVisitor.Defaults {
       outputToProducer.put(output, v);
     }
   }
+
+  /**
+   * Returns the graph this visitor populates as it visits primitive transforms.
+   *
+   * @return the converted graph (the same instance on every call)
+   */
+  public Graph getGraph() {
+    return this.graph;
+  }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/a8b366de/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/GraphPlanner.java
----------------------------------------------------------------------
diff --git a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/GraphPlanner.java
b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/GraphPlanner.java
index d4fa2d9..793efd7 100644
--- a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/GraphPlanner.java
+++ b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/GraphPlanner.java
@@ -78,6 +78,7 @@ public class GraphPlanner {
       workingPath.addFirst(groupByKey.getTransform());
       Graph.Edge edge = fusedGraph.addEdge(v, workingVertex);
       edge.addPath(workingPath);
+      processParent(groupByKey.getIncoming().iterator().next().getHead());
     }
 
     public Graph getFusedGraph() {

http://git-wip-us.apache.org/repos/asf/beam/blob/a8b366de/runners/map-reduce/src/test/java/org/apache/beam/runners/mapreduce/WordCountTest.java
----------------------------------------------------------------------
diff --git a/runners/map-reduce/src/test/java/org/apache/beam/runners/mapreduce/WordCountTest.java
b/runners/map-reduce/src/test/java/org/apache/beam/runners/mapreduce/WordCountTest.java
new file mode 100644
index 0000000..51c26f2
--- /dev/null
+++ b/runners/map-reduce/src/test/java/org/apache/beam/runners/mapreduce/WordCountTest.java
@@ -0,0 +1,108 @@
+package org.apache.beam.runners.mapreduce;
+
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.io.TextIO;
+import org.apache.beam.sdk.metrics.Counter;
+import org.apache.beam.sdk.metrics.Metrics;
+import org.apache.beam.sdk.options.Default;
+import org.apache.beam.sdk.options.Description;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.options.Validation;
+import org.apache.beam.sdk.transforms.Count;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.MapElements;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.SimpleFunction;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+import org.junit.Ignore;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/**
+ * Test that runs WordCount.
+ */
+@RunWith(JUnit4.class)
+public class WordCountTest {
+
+  public static final String TOKENIZER_PATTERN = "[^\\p{L}]+";
+
+  /**
+   * Concept #2: You can make your pipeline assembly code less verbose by defining your DoFns
+   * statically out-of-line. This DoFn tokenizes lines of text into individual words; we pass it
+   * to a ParDo in the pipeline.
+   */
+  static class ExtractWordsFn extends DoFn<String, String> {
+    private final Counter emptyLines = Metrics.counter(ExtractWordsFn.class, "emptyLines");
+
+    @ProcessElement
+    public void processElement(ProcessContext c) {
+      if (c.element().trim().isEmpty()) {
+        emptyLines.inc();
+      }
+
+      // Split the line into words.
+      String[] words = c.element().split(TOKENIZER_PATTERN);
+
+      // Output each word encountered into the output PCollection.
+      for (String word : words) {
+        if (!word.isEmpty()) {
+          c.output(word);
+        }
+      }
+    }
+  }
+
+  /** A SimpleFunction that converts a Word and Count into a printable string. */
+  public static class FormatAsTextFn extends SimpleFunction<KV<String, Long>, String> {
+    @Override
+    public String apply(KV<String, Long> input) {
+      return input.getKey() + ": " + input.getValue();
+    }
+  }
+
+  /**
+   * A PTransform that converts a PCollection containing lines of text into a PCollection of
+   * per-word counts.
+   *
+   * <p>Concept #3: This is a custom composite transform that bundles two transforms (ParDo and
+   * Count) as a reusable PTransform subclass. Using composite transforms allows for easy reuse,
+   * modular testing, and an improved monitoring experience.
+   */
+  public static class CountWords extends PTransform<PCollection<String>,
+      PCollection<KV<String, Long>>> {
+    @Override
+    public PCollection<KV<String, Long>> expand(PCollection<String> lines) {
+
+      // Convert lines of text into individual words.
+      PCollection<String> words = lines.apply(
+          ParDo.of(new ExtractWordsFn()));
+
+      // Count the number of times each word occurs.
+      PCollection<KV<String, Long>> wordCounts =
+          words.apply(Count.<String>perElement());
+
+      return wordCounts;
+    }
+  }
+
+  /**
+   * End-to-end WordCount on the MapReduce runner.
+   *
+   * <p>Ignored for now: the input lives on GCS, and {@code MapReduceRunner.run()} still returns
+   * {@code null}, so {@code p.run().waitUntilFinish()} cannot succeed.
+   */
+  @Test
+  @Ignore("Needs GCS access, and MapReduceRunner.run() is still a stub that returns null.")
+  public void testWordCount() {
+    String input = "gs://apache-beam-samples/shakespeare/kinglear.txt";
+    String output = "./output";
+    PipelineOptions options = PipelineOptionsFactory.create();
+    options.setRunner(MapReduceRunner.class);
+    Pipeline p = Pipeline.create(options);
+
+    // Concepts #2 and #3: Our pipeline applies the composite CountWords transform, and passes the
+    // static FormatAsTextFn() to MapElements.
+    p.apply("ReadLines", TextIO.read().from(input))
+        .apply(new CountWords())
+        .apply(MapElements.via(new FormatAsTextFn()))
+        .apply("WriteCounts", TextIO.write().to(output));
+
+    p.run().waitUntilFinish();
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/a8b366de/runners/map-reduce/src/test/java/org/apache/beam/runners/mapreduce/translation/GraphConverterTest.java
----------------------------------------------------------------------
diff --git a/runners/map-reduce/src/test/java/org/apache/beam/runners/mapreduce/translation/GraphConverterTest.java
b/runners/map-reduce/src/test/java/org/apache/beam/runners/mapreduce/translation/GraphConverterTest.java
new file mode 100644
index 0000000..4f0c283
--- /dev/null
+++ b/runners/map-reduce/src/test/java/org/apache/beam/runners/mapreduce/translation/GraphConverterTest.java
@@ -0,0 +1,39 @@
+package org.apache.beam.runners.mapreduce.translation;
+
+import static org.junit.Assert.assertEquals;
+
+import com.google.common.collect.Iterables;
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.coders.BigEndianIntegerCoder;
+import org.apache.beam.sdk.coders.KvCoder;
+import org.apache.beam.sdk.coders.StringUtf8Coder;
+import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.transforms.Sum;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/**
+ * Unit tests for {@link GraphConverter}: checks the shape of the graph it builds from a
+ * small pipeline.
+ */
+@RunWith(JUnit4.class)
+public class GraphConverterTest {
+
+  @Test
+  public void testCombine() throws Exception {
+    Pipeline pipeline = Pipeline.create();
+    PCollection<KV<String, Integer>> summed =
+        pipeline
+            .apply(Create.empty(KvCoder.of(StringUtf8Coder.of(), BigEndianIntegerCoder.of())))
+            .apply(Sum.<String>integersPerKey());
+
+    GraphConverter converter = new GraphConverter();
+    pipeline.traverseTopologically(converter);
+    Graph converted = converter.getGraph();
+
+    // Expected vertex / edge / leaf counts for the empty-create + per-key-sum pipeline.
+    assertEquals(3, Iterables.size(converted.getAllVertices()));
+    assertEquals(2, Iterables.size(converted.getAllEdges()));
+    assertEquals(1, Iterables.size(converted.getLeafVertices()));
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/a8b366de/runners/map-reduce/src/test/java/org/apache/beam/runners/mapreduce/translation/GraphPlannerTest.java
----------------------------------------------------------------------
diff --git a/runners/map-reduce/src/test/java/org/apache/beam/runners/mapreduce/translation/GraphPlannerTest.java
b/runners/map-reduce/src/test/java/org/apache/beam/runners/mapreduce/translation/GraphPlannerTest.java
new file mode 100644
index 0000000..c98f817
--- /dev/null
+++ b/runners/map-reduce/src/test/java/org/apache/beam/runners/mapreduce/translation/GraphPlannerTest.java
@@ -0,0 +1,42 @@
+package org.apache.beam.runners.mapreduce.translation;
+
+import static org.junit.Assert.assertEquals;
+
+import com.google.common.collect.Iterables;
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.coders.BigEndianIntegerCoder;
+import org.apache.beam.sdk.coders.KvCoder;
+import org.apache.beam.sdk.coders.StringUtf8Coder;
+import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.transforms.Sum;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/**
+ * Unit tests for {@link GraphPlanner}: checks the shape of the fused graph planned from a
+ * converted pipeline.
+ */
+@RunWith(JUnit4.class)
+public class GraphPlannerTest {
+
+  @Test
+  public void testCombine() throws Exception {
+    Pipeline pipeline = Pipeline.create();
+    PCollection<KV<String, Integer>> summed =
+        pipeline
+            .apply(Create.empty(KvCoder.of(StringUtf8Coder.of(), BigEndianIntegerCoder.of())))
+            .apply(Sum.<String>integersPerKey());
+
+    GraphConverter converter = new GraphConverter();
+    pipeline.traverseTopologically(converter);
+
+    // Plan straight from the converter's output graph.
+    Graph fused = new GraphPlanner().plan(converter.getGraph());
+
+    assertEquals(3, Iterables.size(fused.getAllVertices()));
+    assertEquals(2, Iterables.size(fused.getAllEdges()));
+    assertEquals(1, Iterables.size(fused.getLeafVertices()));
+  }
+}


Mime
View raw message