beam-commits mailing list archives

From da...@apache.org
Subject [1/5] beam-site git commit: WIP: Add Python to WordCount documentation
Date Tue, 31 Jan 2017 01:37:21 GMT
Repository: beam-site
Updated Branches:
  refs/heads/asf-site 2f4d86d31 -> b81afa390


WIP: Add Python to WordCount documentation


Project: http://git-wip-us.apache.org/repos/asf/beam-site/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam-site/commit/fb8666ef
Tree: http://git-wip-us.apache.org/repos/asf/beam-site/tree/fb8666ef
Diff: http://git-wip-us.apache.org/repos/asf/beam-site/diff/fb8666ef

Branch: refs/heads/asf-site
Commit: fb8666ef0b6c262a3705329fcd026d32c1a931a7
Parents: 2f4d86d
Author: Hadar Hod <hadarh@google.com>
Authored: Tue Jan 24 19:37:16 2017 -0800
Committer: Davor Bonaci <davor@google.com>
Committed: Mon Jan 30 17:35:50 2017 -0800

----------------------------------------------------------------------
 src/get-started/wordcount-example.md | 222 +++++++++++++++++++++++++-----
 1 file changed, 184 insertions(+), 38 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam-site/blob/fb8666ef/src/get-started/wordcount-example.md
----------------------------------------------------------------------
diff --git a/src/get-started/wordcount-example.md b/src/get-started/wordcount-example.md
index b8776d6..a02b327 100644
--- a/src/get-started/wordcount-example.md
+++ b/src/get-started/wordcount-example.md
@@ -10,6 +10,14 @@ redirect_from: /use/wordcount-example/
 * TOC
 {:toc}
 
+<nav class="language-switcher">
+  <strong>Adapt for:</strong> 
+  <ul>
+    <li data-type="language-java">Java SDK</li>
+    <li data-type="language-py">Python SDK</li>
+  </ul>
+</nav>
+
 The WordCount examples demonstrate how to set up a processing pipeline that can read text, tokenize the text lines into individual words, and perform a frequency count on each of those words. The Beam SDKs contain a series of four successively more detailed WordCount examples that build on each other. The input text for all the examples is a set of Shakespeare's texts.
 
 Each WordCount example introduces different concepts in the Beam programming model. Begin
by understanding Minimal WordCount, the simplest of the examples. Once you feel comfortable
with the basic principles in building a pipeline, continue on to learn more concepts in the
other examples.
@@ -60,12 +68,26 @@ You can specify a runner for executing your pipeline, such as the `DataflowRunne
     //   options.setRunner(FlinkRunner.class);
 ```
 
+```py
+options = PipelineOptions()
+google_cloud_options = options.view_as(GoogleCloudOptions)
+google_cloud_options.project = 'my-project-id'
+google_cloud_options.job_name = 'myjob'
+google_cloud_options.staging_location = 'gs://your-bucket-name-here/staging'
+google_cloud_options.temp_location = 'gs://your-bucket-name-here/temp'
+options.view_as(StandardOptions).runner = 'BlockingDataflowPipelineRunner'
+```
+
 The next step is to create a Pipeline object with the options we've just constructed. The
Pipeline object builds up the graph of transformations to be executed, associated with that
particular pipeline.
 
 ```java
 Pipeline p = Pipeline.create(options);
 ```
 
+```py
+p = beam.Pipeline(options=options)
+```
+
 ### Applying Pipeline Transforms
 
 The Minimal WordCount pipeline contains several transforms to read data into the pipeline,
manipulate or otherwise transform the data, and write out the results. Each transform represents
an operation in the pipeline.
@@ -79,51 +101,72 @@ The Minimal WordCount pipeline contains five transforms:
 
 1.  A text file `Read` transform is applied to the Pipeline object itself, and produces a
`PCollection` as output. Each element in the output PCollection represents one line of text
from the input file. This example happens to use input data stored in a publicly accessible
Google Cloud Storage bucket ("gs://").
 
-	```java
-	        p.apply(TextIO.Read.from("gs://apache-beam-samples/shakespeare/*"))
-	```
+    ```java
+    p.apply(TextIO.Read.from("gs://apache-beam-samples/shakespeare/*"))
+    ```
+
+    ```py
+    p | beam.io.Read(beam.io.TextFileSource('gs://dataflow-samples/shakespeare/kinglear.txt'))
+    ```
 
 2.  A [ParDo]({{ site.baseurl }}/documentation/programming-guide/#transforms-pardo) transform that invokes, on each element, a `DoFn` (defined in-line as an anonymous class) that tokenizes the text lines into individual words. The input for this transform is the `PCollection` of text lines generated by the previous `TextIO.Read` transform. The `ParDo` transform outputs a new `PCollection`, where each element represents an individual word in the text.
 
-	```java
-	        .apply("ExtractWords", ParDo.of(new DoFn<String, String>() {
-	            @ProcessElement
-	            public void processElement(ProcessContext c) {
-	                for (String word : c.element().split("[^a-zA-Z']+")) {
-	                    if (!word.isEmpty()) {
-	                        c.output(word);
-	                    }
-	                }
-	            }
-	        }))
-	```
+    ```java
+    .apply("ExtractWords", ParDo.of(new DoFn<String, String>() {
+        @ProcessElement
+        public void processElement(ProcessContext c) {
+            for (String word : c.element().split("[^a-zA-Z']+")) {
+                if (!word.isEmpty()) {
+                    c.output(word);
+                }
+            }
+        }
+    }))
+    ```
+
+    ```py
+    # The FlatMap transform is a simplified version of ParDo.
+    | 'ExtractWords' >> beam.FlatMap(lambda x: re.findall(r'[A-Za-z\']+', x))
+    ```
 
 3.  The SDK-provided `Count` transform is a generic transform that takes a `PCollection`
of any type, and returns a `PCollection` of key/value pairs. Each key represents a unique
element from the input collection, and each value represents the number of times that key
appeared in the input collection.
 
 	In this pipeline, the input for `Count` is the `PCollection` of individual words generated
by the previous `ParDo`, and the output is a `PCollection` of key/value pairs where each key
represents a unique word in the text and the associated value is the occurrence count for
each.
 
-	```java
-	        .apply(Count.<String>perElement())
-	```
+    ```java
+    .apply(Count.<String>perElement())
+    ```
+
+    ```py
+    | beam.combiners.Count.PerElement()
+    ```
 
 4.  The next transform formats each of the key/value pairs of unique words and occurrence
counts into a printable string suitable for writing to an output file.
 
 	`MapElements` is a higher-level composite transform that encapsulates a simple `ParDo`;
for each element in the input `PCollection`, `MapElements` applies a function that produces
exactly one output element. In this example, `MapElements` invokes a `SimpleFunction` (defined
in-line as an anonymous class) that does the formatting. As input, `MapElements` takes a `PCollection`
of key/value pairs generated by `Count`, and produces a new `PCollection` of printable strings.
 
-	```java
-	        .apply("FormatResults", MapElements.via(new SimpleFunction<KV<String, Long>,
String>() {
-	            @Override
-	            public String apply(KV<String, Long> input) {
-	                return input.getKey() + ": " + input.getValue();
-	            }
-	        }))
-	```
+    ```java
+    .apply("FormatResults", MapElements.via(new SimpleFunction<KV<String, Long>, String>() {
+        @Override
+        public String apply(KV<String, Long> input) {
+            return input.getKey() + ": " + input.getValue();
+        }
+    }))
+    ```
+
+    ```py
+    | beam.Map(lambda (word, count): '%s: %s' % (word, count))
+    ```
 
 5.  A text file `Write`. This transform takes the final `PCollection` of formatted Strings
as input and writes each element to an output text file. Each element in the input `PCollection`
represents one line of text in the resulting output file.
 
-	```java
-	        .apply(TextIO.Write.to("wordcounts"));
-	```
+    ```java
+    .apply(TextIO.Write.to("wordcounts"));
+    ```
+
+    ```py
+    | beam.io.Write(beam.io.TextFileSink('gs://my-bucket/counts.txt'))
+    ```
 
 Note that the `Write` transform produces a trivial result value of type `PDone`, which in
this case is ignored.
 
@@ -135,6 +178,10 @@ Run the pipeline by calling the `run` method, which sends your pipeline to be ex
 p.run().waitUntilFinish();
 ```
 
+```py
+p.run()
+```
+
 Note that the `run` method is asynchronous. For a blocking execution instead, append the `waitUntilFinish` method when you run your pipeline.
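
For illustration, capturing the `PipelineResult` makes the asynchronous/blocking distinction explicit; a minimal Java sketch:

```java
// Sketch only: run() returns a PipelineResult immediately (asynchronous).
PipelineResult result = p.run();
// Block until the pipeline finishes, equivalent to p.run().waitUntilFinish().
result.waitUntilFinish();
```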
 
 ## WordCount Example
@@ -157,9 +204,9 @@ The following sections explain these key concepts in detail, and break down the
 
 When using `ParDo` transforms, you need to specify the processing operation that gets applied
to each element in the input `PCollection`. This processing operation is a subclass of the
SDK class `DoFn`. You can create the `DoFn` subclasses for each `ParDo` inline, as an anonymous
inner class instance, as is done in the previous example (Minimal WordCount). However, it's
often a good idea to define the `DoFn` at the global level, which makes it easier to unit
test and can make the `ParDo` code more readable.
 
-In this example, `ExtractWordsFn` is a `DoFn` that is defined as a static class:
-
 ```java
+// In this example, ExtractWordsFn is a DoFn that is defined as a static class:
+
 static class ExtractWordsFn extends DoFn<String, String> {
     ...
 
@@ -170,10 +217,16 @@ static class ExtractWordsFn extends DoFn<String, String> {
 }
 ```
 
-This `DoFn` (`ExtractWordsFn`) is the processing operation that `ParDo` applies to the `PCollection`
of words:
+```py
+# In this example, the DoFns are defined as classes:
 
-```java
-PCollection<String> words = lines.apply(ParDo.of(new ExtractWordsFn()));
+class FormatAsTextFn(beam.DoFn):
+
+  def process(self, context):
+    word, count = context.element
+    yield '%s: %s' % (word, count)
+
+formatted = counts | beam.ParDo(FormatAsTextFn())
 ```
 
 ### Creating Composite Transforms
@@ -211,13 +264,28 @@ public static void main(String[] args) throws IOException {
 }
 ```
 
+```py
+class CountWords(beam.PTransform):
+
+  def apply(self, pcoll):
+    return (pcoll
+            # Convert lines of text into individual words.
+            | beam.FlatMap(
+                'ExtractWords', lambda x: re.findall(r'[A-Za-z\']+', x))
+
+            # Count the number of times each word occurs.
+            | beam.combiners.Count.PerElement())
+
+counts = lines | CountWords()
+```
+
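The Java version of the `CountWords` composite is elided by the hunk above. For illustration, a composite transform in the Beam Java SDK overrides `expand` (in recent SDK versions); a rough sketch, reusing the `ExtractWordsFn` shown elsewhere in this page:

```java
public static class CountWords
    extends PTransform<PCollection<String>, PCollection<KV<String, Long>>> {
  @Override
  public PCollection<KV<String, Long>> expand(PCollection<String> lines) {
    // Convert lines of text into individual words, then count how many
    // times each word occurs.
    return lines
        .apply(ParDo.of(new ExtractWordsFn()))
        .apply(Count.<String>perElement());
  }
}
```
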
 ### Using Parameterizable PipelineOptions
 
 You can hard-code various execution options when you run your pipeline. However, the more
common way is to define your own configuration options via command-line argument parsing.
Defining your configuration options via the command-line makes the code more easily portable
across different runners.
 
 Add arguments to be processed by the command-line parser, and specify default values for
them. You can then access the options values in your pipeline code. 
 
-``` java
+```java
 public static interface WordCountOptions extends PipelineOptions {
   @Description("Path of the file to read from")
   @Default.String("gs://dataflow-samples/shakespeare/kinglear.txt")
@@ -234,6 +302,19 @@ public static void main(String[] args) {
 }
 ```
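
The body of `main` is elided by the hunk above. A minimal sketch of the usual pattern for materializing such options from command-line arguments with `PipelineOptionsFactory` (the `getInput` getter name here is hypothetical):

```java
// Sketch only: parse and validate the custom options, then build the pipeline.
WordCountOptions options = PipelineOptionsFactory.fromArgs(args)
    .withValidation()
    .as(WordCountOptions.class);
Pipeline p = Pipeline.create(options);
// getInput() is a hypothetical getter corresponding to the --input argument.
p.apply(TextIO.Read.from(options.getInput()));
```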
 
+```py
+class WordCountOptions(PipelineOptions):
+
+  @classmethod
+  def _add_argparse_args(cls, parser):
+    parser.add_argument('--input',
+                        help='Input for the dataflow pipeline',
+                        default='gs://my-bucket/input')
+
+options = PipelineOptions(argv)
+p = beam.Pipeline(options=options)
+```
+
 ## Debugging WordCount Example
 
 The Debugging WordCount example demonstrates some best practices for instrumenting your pipeline
code.
@@ -259,9 +340,9 @@ If you execute your pipeline using `DirectRunner`, it will print the log message
 
 If you execute your pipeline using `DataflowRunner`, you can use Google Cloud Logging. Google
Cloud Logging (currently in beta) aggregates the logs from all of your Dataflow job's workers
to a single location in the Google Cloud Platform Console. You can use Cloud Logging to search
and access the logs from all of the Compute Engine instances that Dataflow has spun up to
complete your Dataflow job. You can add logging statements into your pipeline's `DoFn` instances
that will appear in Cloud Logging as your pipeline runs.
 
-In this example, we use `.trace` and `.debug`:
-
 ```java
+// This example uses .trace and .debug:
+
 public class DebuggingWordCount {
 
  public static class FilterTextFn extends DoFn<KV<String, Long>, KV<String, Long>> {
@@ -280,6 +361,43 @@ public class DebuggingWordCount {
 }
 ```
 
+```py
+import logging
+
+class FilterTextFn(beam.DoFn):
+  """A DoFn that filters for a specific key based on a regular expression."""
+
+  # A custom aggregator can track values in your pipeline as it runs. Create
+  # custom aggregators matched_words and unmatched_words.
+  matched_words = beam.Aggregator('matched_words')
+  unmatched_words = beam.Aggregator('unmatched_words')
+
+  def __init__(self, pattern):
+    self.pattern = pattern
+
+  def process(self, context):
+    word, _ = context.element
+    if re.match(self.pattern, word):
+      # Log at INFO level each element we match. When executing this pipeline
+      # using the Dataflow service, these log lines will appear in the Cloud
+      # Logging UI.
+      logging.info('Matched %s', word)
+
+      # Add 1 to the custom aggregator matched_words
+      context.aggregate_to(self.matched_words, 1)
+      yield context.element
+    else:
+      # Log at the "DEBUG" level each element that is not matched. Different
+      # log levels can be used to control the verbosity of logging providing
+      # an effective mechanism to filter less important information. Note
+      # currently only "INFO" and higher level logs are emitted to the Cloud
+      # Logger. This log message will not be visible in the Cloud Logger.
+      logging.debug('Did not match %s', word)
+
+      # Add 1 to the custom aggregator unmatched_words
+      context.aggregate_to(self.unmatched_words, 1)
+```
+
 If you execute your pipeline using `DataflowRunner`, you can control the worker log levels.
Dataflow workers that execute user code are configured to log to Cloud Logging by default
at "INFO" log level and higher. You can override log levels for specific logging namespaces
by specifying: `--workerLogLevelOverrides={"Name1":"Level1","Name2":"Level2",...}`. For example,
by specifying `--workerLogLevelOverrides={"org.apache.beam.examples":"DEBUG"}` when executing
this pipeline using the Dataflow service, Cloud Logging would contain only "DEBUG" or higher
level logs for the package in addition to the default "INFO" or higher level logs. 
 
 The default Dataflow worker logging configuration can be overridden by specifying `--defaultWorkerLogLevel=<one
of TRACE, DEBUG, INFO, WARN, ERROR>`. For example, by specifying `--defaultWorkerLogLevel=DEBUG`
when executing this pipeline with the Dataflow service, Cloud Logging would contain all "DEBUG"
or higher level logs. Note that changing the default worker log level to TRACE or DEBUG will
significantly increase the amount of logs output.
@@ -309,6 +427,10 @@ public static void main(String[] args) {
 }
 ```
 
+```py
+This feature is not yet available in the Beam SDK for Python.
+```
+
 ## WindowedWordCount
 
 This example, `WindowedWordCount`, counts words in text just as the previous examples did,
but introduces several advanced concepts.
@@ -338,6 +460,10 @@ public static void main(String[] args) throws IOException {
 
 ```
 
+```py
+This feature is not yet available in the Beam SDK for Python.
+```
+
 ### Adding Timestamps to Data
 
 Each element in a `PCollection` has an associated **timestamp**. The timestamp for each element
is initially assigned by the source that creates the `PCollection` and can be adjusted by
a `DoFn`. In this example the input is bounded. For the purpose of the example, the `DoFn`
method named `AddTimestampsFn` (invoked by `ParDo`) will set a timestamp for each element
in the `PCollection`.
@@ -346,6 +472,10 @@ Each element in a `PCollection` has an associated **timestamp**. The timestamp f
 .apply(ParDo.of(new AddTimestampFn()));
 ```
 
+```py
+This feature is not yet available in the Beam SDK for Python.
+```
+
 Below is the code for `AddTimestampFn`, a `DoFn` invoked by `ParDo`, that sets the timestamp of the data element given the element itself. For example, if the elements were log lines, this `ParDo` could parse the time out of the log string and set it as the element's timestamp. There are no timestamps inherent in the works of Shakespeare, so in this case we've made up random timestamps just to illustrate the concept. Each line of the input text will get a random associated timestamp sometime in a 2-hour period.
 
 ```java
@@ -369,6 +499,10 @@ static class AddTimestampFn extends DoFn<String, String> {
 }
 ```
 
+```py
+This feature is not yet available in the Beam SDK for Python.
+```
+
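The body of `AddTimestampFn` is elided by the hunk above. A `DoFn` that attaches a random timestamp within a two-hour range might look roughly like this sketch (assuming Joda-Time and `outputWithTimestamp`):

```java
static class AddTimestampFn extends DoFn<String, String> {
  private static final Duration RAND_RANGE = Duration.standardHours(2);
  private final Instant minTimestamp = new Instant(System.currentTimeMillis());

  @ProcessElement
  public void processElement(ProcessContext c) {
    // Pick a random timestamp within a two-hour window...
    Instant randomTimestamp = minTimestamp.plus(
        (long) (Math.random() * RAND_RANGE.getMillis()));
    // ...and attach it to the element being output.
    c.outputWithTimestamp(c.element(), randomTimestamp);
  }
}
```
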
 ### Windowing
 
 Beam uses a concept called **Windowing** to subdivide a `PCollection` according to the timestamps of its individual elements. `PTransforms` that aggregate multiple elements process each `PCollection` as a succession of multiple, finite windows, even though the entire collection itself may be of infinite size (unbounded).
@@ -381,14 +515,22 @@ PCollection<String> windowedWords = input
     FixedWindows.of(Duration.standardMinutes(options.getWindowSize()))));
 ```
 
+```py
+This feature is not yet available in the Beam SDK for Python.
+```
+
 ### Reusing PTransforms over windowed PCollections
 
 You can reuse existing `PTransform`s that were created for manipulating simple `PCollection`s over windowed `PCollection`s as well.
 
-```
+```java
 PCollection<KV<String, Long>> wordCounts = windowedWords.apply(new WordCount.CountWords());
 ```
 
+```py
+This feature is not yet available in the Beam SDK for Python.
+```
+
 ### Write Results to an Unbounded Sink
 
 Since our input is unbounded, the same is true of our output `PCollection`. We need to make sure that we choose an appropriate, unbounded sink. Some output sinks support only bounded output, such as a text file. Google Cloud BigQuery is an output sink that supports both bounded and unbounded output.
@@ -404,3 +546,7 @@ wordCounts.apply(ParDo.of(new FormatAsTableRowFn()))
       .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND));
 ```
 
+```py
+This feature is not yet available in the Beam SDK for Python.
+```
+

