beam-commits mailing list archives

From k...@apache.org
Subject [1/4] incubator-beam git commit: Hardcode MinimalWordCount to the DirectRunner
Date Tue, 08 Nov 2016 21:58:58 GMT
Repository: incubator-beam
Updated Branches:
  refs/heads/master e35f571b0 -> 084a5e8ae


Hardcode MinimalWordCount to the DirectRunner

This makes the example easy to run immediately, and removes various
non-portable instructions, as well as others that are not well suited
to a "Getting Started" scenario.


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/c64cf367
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/c64cf367
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/c64cf367

Branch: refs/heads/master
Commit: c64cf367299b6fdbe25c62eec9840b02fbc9d518
Parents: e35f571
Author: Kenneth Knowles <klk@google.com>
Authored: Thu Nov 3 14:18:43 2016 -0700
Committer: Kenneth Knowles <klk@google.com>
Committed: Tue Nov 8 13:51:23 2016 -0800

----------------------------------------------------------------------
 .../apache/beam/examples/MinimalWordCount.java  | 50 +++++++++-----------
 1 file changed, 22 insertions(+), 28 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/c64cf367/examples/java/src/main/java/org/apache/beam/examples/MinimalWordCount.java
----------------------------------------------------------------------
diff --git a/examples/java/src/main/java/org/apache/beam/examples/MinimalWordCount.java b/examples/java/src/main/java/org/apache/beam/examples/MinimalWordCount.java
index 6fc873e..6085539 100644
--- a/examples/java/src/main/java/org/apache/beam/examples/MinimalWordCount.java
+++ b/examples/java/src/main/java/org/apache/beam/examples/MinimalWordCount.java
@@ -37,46 +37,33 @@ import org.apache.beam.sdk.values.KV;
  * argument processing, and focus on construction of the pipeline, which chains together the
  * application of core transforms.
  *
- * <p>Next, see the {@link WordCount} pipeline, then the {@link DebuggingWordCount}, and finally
- * the {@link WindowedWordCount} pipeline, for more detailed examples that introduce additional
+ * <p>Next, see the {@link WordCount} pipeline, then the {@link DebuggingWordCount}, and finally the
+ * {@link WindowedWordCount} pipeline, for more detailed examples that introduce additional
  * concepts.
  *
  * <p>Concepts:
+ *
  * <pre>
  *   1. Reading data from text files
  *   2. Specifying 'inline' transforms
- *   3. Counting a PCollection
- *   4. Writing data to Cloud Storage as text files
+ *   3. Counting items in a PCollection
+ *   4. Writing data to text files
  * </pre>
  *
- * <p>To execute this pipeline, first edit the code to set your project ID, the temp
- * location, and the output location. The specified GCS bucket(s) must already exist.
- *
- * <p>Then, run the pipeline as described in the README. It will be deployed and run with the
- * selected runner. No args are required to run the pipeline. You can see the results in your
- * output bucket in the GCS browser.
+ * <p>No arguments are required to run this pipeline. It will be executed with the DirectRunner. You
+ * can see the results in the output files in your current working directory, with names like
+ * "wordcounts-00001-of-00005". When running on a distributed service, you would use an appropriate
+ * file service.
  */
 public class MinimalWordCount {
 
   public static void main(String[] args) {
     // Create a PipelineOptions object. This object lets us set various execution
-    // options for our pipeline, such as the associated Cloud Platform project and the location
-    // in Google Cloud Storage to stage files.
+    // options for our pipeline, such as the runner you wish to use. This example
+    // will run with the DirectRunner by default, based on the class path configured
+    // in its dependencies.
     PipelineOptions options = PipelineOptionsFactory.create();
 
-    // In order to run your pipeline, you need to make following runner specific changes:
-    //
-    // CHANGE 1/3: Select a Beam runner, such as DataflowRunner or FlinkRunner.
-    // CHANGE 2/3: Specify runner-required options.
-    // For DataflowRunner, set project and temp location as follows:
-    //   DataflowPipelineOptions dataflowOptions = options.as(DataflowPipelineOptions.class);
-    //   dataflowOptions.setRunner(DataflowRunner.class);
-    //   dataflowOptions.setProject("SET_YOUR_PROJECT_ID_HERE");
-    //   dataflowOptions.setTempLocation("gs://SET_YOUR_BUCKET_NAME_HERE/AND_TEMP_DIRECTORY");
-    // For FlinkRunner, set the runner as follows. See {@code FlinkPipelineOptions}
-    // for more details.
-    //   options.setRunner(FlinkRunner.class);
-
     // Create the Pipeline object with the options we defined above.
     Pipeline p = Pipeline.create(options);
 
@@ -85,7 +72,10 @@ public class MinimalWordCount {
     // Concept #1: Apply a root transform to the pipeline; in this case, TextIO.Read to read a set
     // of input text files. TextIO.Read returns a PCollection where each element is one line from
     // the input text (a set of Shakespeare's texts).
+
+    // This example reads a public data set consisting of the complete works of Shakespeare.
     p.apply(TextIO.Read.from("gs://apache-beam-samples/shakespeare/*"))
+
      // Concept #2: Apply a ParDo transform to our PCollection of text lines. This ParDo invokes a
      // DoFn (defined in-line) on each element that tokenizes the text line into individual words.
      // The ParDo returns a PCollection<String>, where each element is an individual word in
@@ -100,10 +90,12 @@ public class MinimalWordCount {
                          }
                        }
                      }))
+
      // Concept #3: Apply the Count transform to our PCollection of individual words. The Count
      // transform returns a new PCollection of key/value pairs, where each key represents a unique
      // word in the text. The associated value is the occurrence count for that word.
      .apply(Count.<String>perElement())
+
      // Apply a MapElements transform that formats our PCollection of word counts into a printable
      // string, suitable for writing to an output file.
      .apply("FormatResults", MapElements.via(new SimpleFunction<KV<String, Long>, String>() {
@@ -112,11 +104,13 @@ public class MinimalWordCount {
                          return input.getKey() + ": " + input.getValue();
                        }
                      }))
+
      // Concept #4: Apply a write transform, TextIO.Write, at the end of the pipeline.
      // TextIO.Write writes the contents of a PCollection (in this case, our PCollection of
-     // formatted strings) to a series of text files in Google Cloud Storage.
-     // CHANGE 3/3: The Google Cloud Storage path is required for outputting the results to.
-     .apply(TextIO.Write.to("gs://YOUR_OUTPUT_BUCKET/AND_OUTPUT_PREFIX"));
+     // formatted strings) to a series of text files.
+     //
+     // By default, it will write to a set of files with names like wordcounts-00001-of-00005.
+     .apply(TextIO.Write.to("wordcounts"));
 
     // Run the pipeline.
     p.run().waitUntilFinish();
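For readers without the Beam SDK at hand, the four concepts the diff walks through (read lines, tokenize with a ParDo, count with Count.perElement(), format with MapElements) can be sketched in plain Java. This is an illustrative analogue only, not part of the commit; the class name, the tokenizing regex, and the sample input are assumptions.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.stream.Collectors;

// Plain-Java sketch of MinimalWordCount's data flow. The real example
// expresses these steps as Beam transforms (ParDo, Count, MapElements).
public class WordCountSketch {

  // Tokenize each line and count occurrences of each word: the analogue
  // of the ParDo tokenizer followed by Count.perElement(). The regex is
  // an assumption; a TreeMap keeps the output order deterministic.
  static Map<String, Long> countWords(List<String> lines) {
    return lines.stream()
        .flatMap(line -> Arrays.stream(line.split("[^a-zA-Z']+")))
        .filter(word -> !word.isEmpty())
        .collect(Collectors.groupingBy(
            word -> word, TreeMap::new, Collectors.counting()));
  }

  public static void main(String[] args) {
    // Stand-in for reading text files with TextIO.Read.
    List<String> lines = Arrays.asList("to be or not to be");

    // Format each (word, count) pair the way the example's
    // MapElements step does: "word: count".
    countWords(lines).forEach((word, count) ->
        System.out.println(word + ": " + count));
  }
}
```

Running the sketch prints one "word: count" line per distinct word, mirroring the lines that TextIO.Write would emit to the wordcounts-* files.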

