beam-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From k...@apache.org
Subject [06/36] beam git commit: mr-runner: add JobPrototype and translate it to an MR job.
Date Thu, 07 Sep 2017 18:39:15 GMT
mr-runner: add JobPrototype and translate it to an MR job.


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/0cbdc5b7
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/0cbdc5b7
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/0cbdc5b7

Branch: refs/heads/mr-runner
Commit: 0cbdc5b75ed5581ffef8d129b4e61e339d459697
Parents: a884a2f
Author: Pei He <pei@apache.org>
Authored: Mon Jul 24 20:15:37 2017 +0800
Committer: Pei He <pei@apache.org>
Committed: Thu Aug 31 14:13:47 2017 +0800

----------------------------------------------------------------------
 .../mapreduce/MapReducePipelineOptions.java     |  5 ++
 .../beam/runners/mapreduce/MapReduceRunner.java | 41 ++++++++-
 .../runners/mapreduce/MapReduceWordCount.java   |  2 +-
 .../mapreduce/translation/BeamInputFormat.java  | 44 ++++-----
 .../mapreduce/translation/BeamMapper.java       | 75 +++++++++++++---
 .../runners/mapreduce/translation/Graph.java    |  5 ++
 .../mapreduce/translation/JobPrototype.java     | 95 ++++++++++++++++++++
 .../beam/runners/mapreduce/WordCountTest.java   | 42 +++------
 8 files changed, 244 insertions(+), 65 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/0cbdc5b7/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/MapReducePipelineOptions.java
----------------------------------------------------------------------
diff --git a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/MapReducePipelineOptions.java
b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/MapReducePipelineOptions.java
index da29931..ce8f937 100644
--- a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/MapReducePipelineOptions.java
+++ b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/MapReducePipelineOptions.java
@@ -1,9 +1,14 @@
 package org.apache.beam.runners.mapreduce;
 
+import org.apache.beam.sdk.options.Description;
 import org.apache.beam.sdk.options.PipelineOptions;
 
 /**
  * {@link PipelineOptions} for {@link MapReduceRunner}.
  */
 public interface MapReducePipelineOptions extends PipelineOptions {
+
+  @Description("The jar class of the user Beam program.")
+  Class<?> getJarClass();
+  void setJarClass(Class<?> jarClass);
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/0cbdc5b7/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/MapReduceRunner.java
----------------------------------------------------------------------
diff --git a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/MapReduceRunner.java
b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/MapReduceRunner.java
index 247a8e5..0e3142c 100644
--- a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/MapReduceRunner.java
+++ b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/MapReduceRunner.java
@@ -1,9 +1,21 @@
 package org.apache.beam.runners.mapreduce;
 
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import com.google.common.base.Throwables;
+import java.io.IOException;
+import org.apache.beam.runners.mapreduce.translation.Graph;
+import org.apache.beam.runners.mapreduce.translation.GraphConverter;
+import org.apache.beam.runners.mapreduce.translation.GraphPlanner;
+import org.apache.beam.runners.mapreduce.translation.JobPrototype;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.PipelineResult;
 import org.apache.beam.sdk.PipelineRunner;
+import org.apache.beam.sdk.io.Read;
 import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.transforms.GroupByKey;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.Job;
 
 /**
  * {@link PipelineRunner} for crunch.
@@ -17,11 +29,38 @@ public class MapReduceRunner extends PipelineRunner<PipelineResult>
{
    * @return The newly created runner.
    */
   public static MapReduceRunner fromOptions(PipelineOptions options) {
-    return new MapReduceRunner();
+    return new MapReduceRunner(options.as(MapReducePipelineOptions.class));
+  }
+
+  private final MapReducePipelineOptions options;
+
+  MapReduceRunner(MapReducePipelineOptions options) {
+    this.options = checkNotNull(options, "options");
   }
 
   @Override
   public PipelineResult run(Pipeline pipeline) {
+    GraphConverter graphConverter = new GraphConverter();
+    pipeline.traverseTopologically(graphConverter);
+
+    Graph graph = graphConverter.getGraph();
+
+    GraphPlanner planner = new GraphPlanner();
+    Graph fusedGraph = planner.plan(graph);
+    for (Graph.Vertex vertex : fusedGraph.getAllVertices()) {
+      if (vertex.getTransform() instanceof GroupByKey
+          || vertex.getTransform() instanceof Read.Bounded) {
+        continue;
+      } else {
+        JobPrototype jobPrototype = JobPrototype.create(1, vertex);
+        try {
+          Job job = jobPrototype.build(options.getJarClass(), new Configuration());
+          job.waitForCompletion(true);
+        } catch (Exception e) {
+          Throwables.throwIfUnchecked(e);
+        }
+      }
+    }
     return null;
   }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/0cbdc5b7/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/MapReduceWordCount.java
----------------------------------------------------------------------
diff --git a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/MapReduceWordCount.java
b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/MapReduceWordCount.java
index 4ba3a29..d0c7b78 100644
--- a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/MapReduceWordCount.java
+++ b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/MapReduceWordCount.java
@@ -199,7 +199,7 @@ public class MapReduceWordCount {
         KvCoder.of(StringUtf8Coder.of(), BigEndianIntegerCoder.of()));
 
     conf.set(
-        "source",
+        BeamInputFormat.BEAM_SERIALIZED_BOUNDED_SOURCE,
         Base64.encodeBase64String(SerializableUtils.serializeToByteArray(source)));
 
     Job job = Job.getInstance(conf, "word count");

http://git-wip-us.apache.org/repos/asf/beam/blob/0cbdc5b7/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/BeamInputFormat.java
----------------------------------------------------------------------
diff --git a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/BeamInputFormat.java
b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/BeamInputFormat.java
index 8c4155a..0cfb14b 100644
--- a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/BeamInputFormat.java
+++ b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/BeamInputFormat.java
@@ -3,17 +3,19 @@ package org.apache.beam.runners.mapreduce.translation;
 import static com.google.common.base.Preconditions.checkNotNull;
 
 import com.google.common.base.Function;
+import com.google.common.base.Strings;
 import com.google.common.collect.FluentIterable;
+import com.google.common.collect.ImmutableList;
 import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.IOException;
-import java.io.Serializable;
 import java.util.List;
-import javax.annotation.Nullable;
 import org.apache.beam.sdk.io.BoundedSource;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.util.SerializableUtils;
+import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.TimestampedValue;
 import org.apache.commons.codec.binary.Base64;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.mapreduce.InputFormat;
@@ -25,31 +27,30 @@ import org.apache.hadoop.mapreduce.TaskAttemptContext;
 /**
  * Adaptor from Beam {@link BoundedSource} to MapReduce {@link InputFormat}.
  */
-public class BeamInputFormat<K, V> extends InputFormat {
+public class BeamInputFormat<T> extends InputFormat {
 
+  public static final String BEAM_SERIALIZED_BOUNDED_SOURCE = "beam-serialized-bounded-source";
   private static final long DEFAULT_DESIRED_BUNDLE_SIZE_SIZE_BYTES = 5 * 1000 * 1000;
 
-  private BoundedSource<KV<K, V>> source;
+  private BoundedSource<T> source;
   private PipelineOptions options;
 
   public BeamInputFormat() {
   }
 
-  public BeamInputFormat(BoundedSource<KV<K, V>> source, PipelineOptions options)
{
-    this.source = checkNotNull(source, "source");
-    this.options = checkNotNull(options, "options");
-  }
-
   @Override
   public List<InputSplit> getSplits(JobContext context) throws IOException, InterruptedException
{
-    source = (BoundedSource<KV<K,V>>) SerializableUtils.deserializeFromByteArray(
-        Base64.decodeBase64(context.getConfiguration().get("source")),
-        "");
+    String serializedBoundedSource = context.getConfiguration().get(BEAM_SERIALIZED_BOUNDED_SOURCE);
+    if (Strings.isNullOrEmpty(serializedBoundedSource)) {
+      return ImmutableList.of();
+    }
+    source = (BoundedSource<T>) SerializableUtils.deserializeFromByteArray(
+        Base64.decodeBase64(serializedBoundedSource), "BoundedSource");
     try {
       return FluentIterable.from(source.split(DEFAULT_DESIRED_BUNDLE_SIZE_SIZE_BYTES, options))
-          .transform(new Function<BoundedSource<KV<K, V>>, InputSplit>()
{
+          .transform(new Function<BoundedSource<T>, InputSplit>() {
             @Override
-            public InputSplit apply(BoundedSource<KV<K, V>> source) {
+            public InputSplit apply(BoundedSource<T> source) {
               try {
                 return new BeamInputSplit(source.getEstimatedSizeBytes(options));
               } catch (Exception e) {
@@ -65,8 +66,8 @@ public class BeamInputFormat<K, V> extends InputFormat {
   @Override
   public RecordReader createRecordReader(
       InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException
{
-    source = (BoundedSource<KV<K,V>>) SerializableUtils.deserializeFromByteArray(
-        Base64.decodeBase64(context.getConfiguration().get("source")),
+    source = (BoundedSource<T>) SerializableUtils.deserializeFromByteArray(
+        Base64.decodeBase64(context.getConfiguration().get(BEAM_SERIALIZED_BOUNDED_SOURCE)),
         "");
     return new BeamRecordReader<>(source.createReader(options));
   }
@@ -102,12 +103,12 @@ public class BeamInputFormat<K, V> extends InputFormat {
     }
   }
 
-  private class BeamRecordReader<K, V> extends RecordReader {
+  private class BeamRecordReader<T> extends RecordReader {
 
-    private final BoundedSource.BoundedReader<KV<K, V>> reader;
+    private final BoundedSource.BoundedReader<T> reader;
     private boolean started;
 
-    public BeamRecordReader(BoundedSource.BoundedReader<KV<K, V>> reader) {
+    public BeamRecordReader(BoundedSource.BoundedReader<T> reader) {
       this.reader = checkNotNull(reader, "reader");
       this.started = false;
     }
@@ -128,12 +129,13 @@ public class BeamInputFormat<K, V> extends InputFormat {
 
     @Override
     public Object getCurrentKey() throws IOException, InterruptedException {
-      return reader.getCurrent().getKey();
+      return "global";
     }
 
     @Override
     public Object getCurrentValue() throws IOException, InterruptedException {
-      return reader.getCurrent().getValue();
+      return WindowedValue.timestampedValueInGlobalWindow(
+          reader.getCurrent(), reader.getCurrentTimestamp());
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/beam/blob/0cbdc5b7/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/BeamMapper.java
----------------------------------------------------------------------
diff --git a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/BeamMapper.java
b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/BeamMapper.java
index 88fc8d6..9d2f80d 100644
--- a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/BeamMapper.java
+++ b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/BeamMapper.java
@@ -1,30 +1,83 @@
 package org.apache.beam.runners.mapreduce.translation;
 
-import org.apache.beam.sdk.transforms.reflect.DoFnInvoker;
-import org.apache.beam.sdk.values.KV;
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import com.google.common.base.Throwables;
+import com.google.common.collect.ImmutableList;
+import org.apache.beam.runners.core.DoFnRunner;
+import org.apache.beam.runners.core.DoFnRunners;
+import org.apache.beam.runners.core.NullSideInputReader;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.util.SerializableUtils;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.values.TupleTag;
+import org.apache.beam.sdk.values.WindowingStrategy;
+import org.apache.commons.codec.binary.Base64;
 import org.apache.hadoop.mapreduce.Mapper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * Created by peihe on 21/07/2017.
  */
-public class BeamMapper<KeyInT, ValueInT, KeyOutT, ValueOutT>
-    extends Mapper<KeyInT, ValueInT, KeyOutT, ValueOutT> {
+public class BeamMapper<ValueInT, ValueOutT>
+    extends Mapper<Object, WindowedValue<ValueInT>, Object, WindowedValue<ValueOutT>>
{
+
+  public static final String BEAM_SERIALIZED_DO_FN = "beam-serialized-do-fn";
+  private static final Logger LOG = LoggerFactory.getLogger(BeamMapper.class);
 
-  private DoFnInvoker<KV<KeyInT, ValueInT>, KV<KeyOutT, ValueOutT>> doFnInvoker;
+  private DoFnRunner<ValueInT, ValueOutT> doFnRunner;
+  private PipelineOptions options;
 
   @Override
-  protected void setup(Mapper<KeyInT, ValueInT, KeyOutT, ValueOutT>.Context context)
{
+  protected void setup(
+      Mapper<Object, WindowedValue<ValueInT>, Object, WindowedValue<ValueOutT>>.Context
context) {
+    String serializedDoFn = checkNotNull(
+        context.getConfiguration().get(BEAM_SERIALIZED_DO_FN),
+        BEAM_SERIALIZED_DO_FN);
+    doFnRunner = DoFnRunners.simpleRunner(
+        options,
+        (DoFn<ValueInT, ValueOutT>) SerializableUtils
+            .deserializeFromByteArray(
+                Base64.decodeBase64(serializedDoFn), "DoFn"),
+        NullSideInputReader.empty(),
+        new MROutputManager(context),
+        null,
+        ImmutableList.<TupleTag<?>>of(),
+        null,
+        WindowingStrategy.globalDefault());
   }
 
   @Override
   protected void map(
-      KeyInT key,
-      ValueInT value,
-      Mapper<KeyInT, ValueInT, KeyOutT, ValueOutT>.Context context) {
-    System.out.print(String.format("key: %s, value: %s", key, value));
+      Object key,
+      WindowedValue<ValueInT> value,
+      Mapper<Object, WindowedValue<ValueInT>, Object, WindowedValue<ValueOutT>>.Context
context) {
+    LOG.info("key: {}, value: {}.", key, value);
+    doFnRunner.processElement(value);
   }
 
   @Override
-  protected void cleanup(Mapper<KeyInT, ValueInT, KeyOutT, ValueOutT>.Context context)
{
+  protected void cleanup(
+      Mapper<Object, WindowedValue<ValueInT>, Object, WindowedValue<ValueOutT>>.Context
context) {
+  }
+
+  class MROutputManager implements DoFnRunners.OutputManager {
+
+    private final Mapper<Object, Object, Object, Object>.Context context;
+
+    MROutputManager(Mapper<?, ?, ?, ?>.Context context) {
+      this.context = (Mapper<Object, Object, Object, Object>.Context) context;
+    }
+
+    @Override
+    public <T> void output(TupleTag<T> tag, WindowedValue<T> output) {
+      try {
+        context.write("global", output);
+      } catch (Exception e) {
+        Throwables.throwIfUnchecked(e);
+      }
+    }
   }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/0cbdc5b7/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/Graph.java
----------------------------------------------------------------------
diff --git a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/Graph.java
b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/Graph.java
index 1ca5a05..da31f89 100644
--- a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/Graph.java
+++ b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/Graph.java
@@ -8,6 +8,7 @@ import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
 import java.util.LinkedList;
+import java.util.List;
 import java.util.Map;
 import java.util.Objects;
 import java.util.Set;
@@ -235,6 +236,10 @@ public class Graph {
       path.addLast(transform);
     }
 
+    public Iterable<PTransform<?, ?>> transforms() {
+      return path;
+    }
+
     @Override
     public boolean equals(Object obj) {
       if (obj == this) {

http://git-wip-us.apache.org/repos/asf/beam/blob/0cbdc5b7/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/JobPrototype.java
----------------------------------------------------------------------
diff --git a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/JobPrototype.java
b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/JobPrototype.java
new file mode 100644
index 0000000..bdbbe5d
--- /dev/null
+++ b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/JobPrototype.java
@@ -0,0 +1,95 @@
+package org.apache.beam.runners.mapreduce.translation;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkNotNull;
+import static com.google.common.base.Preconditions.checkState;
+
+import com.google.common.base.Function;
+import com.google.common.base.Predicate;
+import com.google.common.collect.FluentIterable;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Sets;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Set;
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.io.Read;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.util.SerializableUtils;
+import org.apache.commons.codec.binary.Base64;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
+
+/**
+ * Created by peihe on 24/07/2017.
+ */
+public class JobPrototype {
+
+  public static JobPrototype create(int stageId, Graph.Vertex vertex) {
+    return new JobPrototype(stageId, vertex);
+  }
+
+  private final int stageId;
+  private final Graph.Vertex vertex;
+  private final Set<JobPrototype> dependencies;
+
+  private JobPrototype(int stageId, Graph.Vertex vertex) {
+    this.stageId = stageId;
+    this.vertex = checkNotNull(vertex, "vertex");
+    this.dependencies = Sets.newHashSet();
+  }
+
+  public Job build(Class<?> jarClass, Configuration conf) throws IOException {
+    Job job = new Job(conf);
+    conf = job.getConfiguration();
+    job.setJarByClass(jarClass);
+
+    // Setup BoundedSources in BeamInputFormat.
+    // TODO: support more than one inputs
+    Graph.Vertex head = Iterables.getOnlyElement(vertex.getIncoming()).getHead();
+    checkState(head.getTransform() instanceof Read.Bounded);
+    Read.Bounded read = (Read.Bounded) head.getTransform();
+    conf.set(
+        BeamInputFormat.BEAM_SERIALIZED_BOUNDED_SOURCE,
+        Base64.encodeBase64String(SerializableUtils.serializeToByteArray(read.getSource())));
+    job.setInputFormatClass(BeamInputFormat.class);
+
+    // Setup DoFns in BeamMapper.
+    // TODO: support more than one out going edge.
+    Graph.Edge outEdge = Iterables.getOnlyElement(head.getOutgoing());
+    Graph.NodePath outPath = Iterables.getOnlyElement(outEdge.getPaths());
+    List<DoFn> doFns = new ArrayList<>();
+    doFns.addAll(FluentIterable.from(outPath.transforms())
+        .filter(new Predicate<PTransform<?, ?>>() {
+          @Override
+          public boolean apply(PTransform<?, ?> input) {
+            return !(input instanceof Read.Bounded);
+          }
+        })
+        .transform(new Function<PTransform<?, ?>, DoFn>() {
+          @Override
+          public DoFn apply(PTransform<?, ?> input) {
+            checkArgument(
+                input instanceof ParDo.SingleOutput, "Only support ParDo.SingleOutput.");
+            ParDo.SingleOutput parDo = (ParDo.SingleOutput) input;
+            return parDo.getFn();
+          }})
+        .toList());
+    if (vertex.getTransform() instanceof ParDo.SingleOutput) {
+      doFns.add(((ParDo.SingleOutput) vertex.getTransform()).getFn());
+    } else if (vertex.getTransform() instanceof ParDo.MultiOutput) {
+      doFns.add(((ParDo.MultiOutput) vertex.getTransform()).getFn());
+    }
+    conf.set(
+        BeamMapper.BEAM_SERIALIZED_DO_FN,
+        Base64.encodeBase64String(SerializableUtils.serializeToByteArray(
+            Iterables.getOnlyElement(doFns))));
+    job.setMapperClass(BeamMapper.class);
+    job.setOutputFormatClass(NullOutputFormat.class);
+    return job;
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/0cbdc5b7/runners/map-reduce/src/test/java/org/apache/beam/runners/mapreduce/WordCountTest.java
----------------------------------------------------------------------
diff --git a/runners/map-reduce/src/test/java/org/apache/beam/runners/mapreduce/WordCountTest.java
b/runners/map-reduce/src/test/java/org/apache/beam/runners/mapreduce/WordCountTest.java
index 51c26f2..80df3e1 100644
--- a/runners/map-reduce/src/test/java/org/apache/beam/runners/mapreduce/WordCountTest.java
+++ b/runners/map-reduce/src/test/java/org/apache/beam/runners/mapreduce/WordCountTest.java
@@ -17,6 +17,7 @@ import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.transforms.SimpleFunction;
 import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PCollection;
+import org.apache.log4j.BasicConfigurator;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.junit.runners.JUnit4;
@@ -63,46 +64,25 @@ public class WordCountTest {
     }
   }
 
-  /**
-   * A PTransform that converts a PCollection containing lines of text into a PCollection
of
-   * formatted word counts.
-   *
-   * <p>Concept #3: This is a custom composite transform that bundles two transforms
(ParDo and
-   * Count) as a reusable PTransform subclass. Using composite transforms allows for easy
reuse,
-   * modular testing, and an improved monitoring experience.
-   */
-  public static class CountWords extends PTransform<PCollection<String>,
-      PCollection<KV<String, Long>>> {
-    @Override
-    public PCollection<KV<String, Long>> expand(PCollection<String> lines)
{
-
-      // Convert lines of text into individual words.
-      PCollection<String> words = lines.apply(
-          ParDo.of(new ExtractWordsFn()));
-
-      // Count the number of times each word occurs.
-      PCollection<KV<String, Long>> wordCounts =
-          words.apply(Count.<String>perElement());
-
-      return wordCounts;
-    }
-  }
-
   @Test
   public void testWordCount() {
-    String input = "gs://apache-beam-samples/shakespeare/kinglear.txt";
+    BasicConfigurator.configure();
+
+    String input = "/Users/peihe/github/beam/LICENSE";
     String output =  "./output";
-    PipelineOptions options = PipelineOptionsFactory.create();
+    MapReducePipelineOptions options = PipelineOptionsFactory.as(MapReducePipelineOptions.class);
+    options.setJarClass(this.getClass());
     options.setRunner(MapReduceRunner.class);
     Pipeline p = Pipeline.create(options);
 
     // Concepts #2 and #3: Our pipeline applies the composite CountWords transform, and passes
the
     // static FormatAsTextFn() to the ParDo transform.
     p.apply("ReadLines", TextIO.read().from(input))
-        .apply(new CountWords())
-        .apply(MapElements.via(new FormatAsTextFn()))
-        .apply("WriteCounts", TextIO.write().to(output));
+        .apply(ParDo.of(new ExtractWordsFn()));
+//        .apply(Count.<String>perElement())
+//        .apply(MapElements.via(new FormatAsTextFn()))
+//        .apply("WriteCounts", TextIO.write().to(output));
 
-    p.run().waitUntilFinish();
+    p.run();
   }
 }


Mime
View raw message