Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id 90A18200C1B for ; Tue, 31 Jan 2017 02:37:23 +0100 (CET) Received: by cust-asf.ponee.io (Postfix) id 8F259160B66; Tue, 31 Jan 2017 01:37:23 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id 63315160B4D for ; Tue, 31 Jan 2017 02:37:22 +0100 (CET) Received: (qmail 47525 invoked by uid 500); 31 Jan 2017 01:37:21 -0000 Mailing-List: contact commits-help@beam.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@beam.apache.org Delivered-To: mailing list commits@beam.apache.org Received: (qmail 47514 invoked by uid 99); 31 Jan 2017 01:37:21 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 31 Jan 2017 01:37:21 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 6D838DFC40; Tue, 31 Jan 2017 01:37:21 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: davor@apache.org To: commits@beam.apache.org Date: Tue, 31 Jan 2017 01:37:21 -0000 Message-Id: X-Mailer: ASF-Git Admin Mailer Subject: [1/5] beam-site git commit: WIP: Add Python to WordCount documentation archived-at: Tue, 31 Jan 2017 01:37:23 -0000 Repository: beam-site Updated Branches: refs/heads/asf-site 2f4d86d31 -> b81afa390 WIP: Add Python to WordCount documentation Project: http://git-wip-us.apache.org/repos/asf/beam-site/repo Commit: http://git-wip-us.apache.org/repos/asf/beam-site/commit/fb8666ef Tree: http://git-wip-us.apache.org/repos/asf/beam-site/tree/fb8666ef Diff: http://git-wip-us.apache.org/repos/asf/beam-site/diff/fb8666ef Branch: refs/heads/asf-site Commit: fb8666ef0b6c262a3705329fcd026d32c1a931a7 Parents: 2f4d86d Author: Hadar Hod Authored: Tue Jan 24 19:37:16 2017 -0800 Committer: Davor Bonaci Committed: Mon Jan 30 17:35:50 2017 -0800 ---------------------------------------------------------------------- src/get-started/wordcount-example.md | 222 +++++++++++++++++++++++++----- 1 file changed, 184 insertions(+), 38 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam-site/blob/fb8666ef/src/get-started/wordcount-example.md ---------------------------------------------------------------------- diff --git a/src/get-started/wordcount-example.md b/src/get-started/wordcount-example.md index b8776d6..a02b327 100644 --- a/src/get-started/wordcount-example.md +++ b/src/get-started/wordcount-example.md @@ -10,6 +10,14 @@ redirect_from: /use/wordcount-example/ * TOC {:toc} + + The WordCount examples demonstrate how to set up a processing pipeline that can read text, tokenize the text lines into individual words, and perform a frequency count on each of those words. The Beam SDKs contain a series of these four successively more detailed WordCount examples that build on each other. The input text for all the examples is a set of Shakespeare's texts. Each WordCount example introduces different concepts in the Beam programming model. Begin by understanding Minimal WordCount, the simplest of the examples. Once you feel comfortable with the basic principles in building a pipeline, continue on to learn more concepts in the other examples. @@ -60,12 +68,26 @@ You can specify a runner for executing your pipeline, such as the `DataflowRunne // options.setRunner(FlinkRunner.class); ``` +```py +options = PipelineOptions() +google_cloud_options = options.view_as(GoogleCloudOptions) +google_cloud_options.project = 'my-project-id' +google_cloud_options.job_name = 'myjob' +google_cloud_options.staging_location = 'gs://your-bucket-name-here/staging' +google_cloud_options.temp_location = 'gs://your-bucket-name-here/temp' +options.view_as(StandardOptions).runner = 'BlockingDataflowPipelineRunner' +``` + The next step is to create a Pipeline object with the options we've just constructed. The Pipeline object builds up the graph of transformations to be executed, associated with that particular pipeline. ```java Pipeline p = Pipeline.create(options); ``` +```py +p = beam.Pipeline(options=options) +``` + ### Applying Pipeline Transforms The Minimal WordCount pipeline contains several transforms to read data into the pipeline, manipulate or otherwise transform the data, and write out the results. Each transform represents an operation in the pipeline. @@ -79,51 +101,72 @@ The Minimal WordCount pipeline contains five transforms: 1. A text file `Read` transform is applied to the Pipeline object itself, and produces a `PCollection` as output. Each element in the output PCollection represents one line of text from the input file. This example happens to use input data stored in a publicly accessible Google Cloud Storage bucket ("gs://"). - ```java - p.apply(TextIO.Read.from("gs://apache-beam-samples/shakespeare/*")) - ``` + ```java + p.apply(TextIO.Read.from("gs://apache-beam-samples/shakespeare/*")) + ``` + + ```py + p | beam.io.Read(beam.io.TextFileSource('gs://dataflow-samples/shakespeare/kinglear.txt')) + ``` 2. A [ParDo]({{ site.baseurl }}/documentation/programming-guide/#transforms-pardo) transform that invokes a `DoFn` (defined in-line as an anonymous class) on each element that tokenizes the text lines into individual words. The input for this transform is the `PCollection` of text lines generated by the previous `TextIO.Read` transform. The `ParDo` transform outputs a new `PCollection`, where each element represents an individual word in the text. - ```java - .apply("ExtractWords", ParDo.of(new DoFn() { - @ProcessElement - public void processElement(ProcessContext c) { - for (String word : c.element().split("[^a-zA-Z']+")) { - if (!word.isEmpty()) { - c.output(word); - } - } - } - })) - ``` + ```java + .apply("ExtractWords", ParDo.of(new DoFn() { + @ProcessElement + public void processElement(ProcessContext c) { + for (String word : c.element().split("[^a-zA-Z']+")) { + if (!word.isEmpty()) { + c.output(word); + } + } + } + })) + ``` + + ```py + # The Flatmap transform is a simplified version of ParDo. + | 'ExtractWords' >> beam.FlatMap(lambda x: re.findall(r'[A-Za-z\']+', x)) + ``` 3. The SDK-provided `Count` transform is a generic transform that takes a `PCollection` of any type, and returns a `PCollection` of key/value pairs. Each key represents a unique element from the input collection, and each value represents the number of times that key appeared in the input collection. In this pipeline, the input for `Count` is the `PCollection` of individual words generated by the previous `ParDo`, and the output is a `PCollection` of key/value pairs where each key represents a unique word in the text and the associated value is the occurrence count for each. - ```java - .apply(Count.perElement()) - ``` + ```java + .apply(Count.perElement()) + ``` + + ```py + | beam.combiners.Count.PerElement() + ``` 4. The next transform formats each of the key/value pairs of unique words and occurrence counts into a printable string suitable for writing to an output file. `MapElements` is a higher-level composite transform that encapsulates a simple `ParDo`; for each element in the input `PCollection`, `MapElements` applies a function that produces exactly one output element. In this example, `MapElements` invokes a `SimpleFunction` (defined in-line as an anonymous class) that does the formatting. As input, `MapElements` takes a `PCollection` of key/value pairs generated by `Count`, and produces a new `PCollection` of printable strings. - ```java - .apply("FormatResults", MapElements.via(new SimpleFunction, String>() { - @Override - public String apply(KV input) { - return input.getKey() + ": " + input.getValue(); - } - })) - ``` + ```java + .apply("FormatResults", MapElements.via(new SimpleFunction, String>() { + @Override + public String apply(KV input) { + return input.getKey() + ": " + input.getValue(); + } + })) + ``` + + ```py + | beam.Map(lambda (word, count): '%s: %s' % (word, count)) + ``` 5. A text file `Write`. This transform takes the final `PCollection` of formatted Strings as input and writes each element to an output text file. Each element in the input `PCollection` represents one line of text in the resulting output file. - ```java - .apply(TextIO.Write.to("wordcounts")); - ``` + ```java + .apply(TextIO.Write.to("wordcounts")); + ``` + + ```py + | beam.io.Write(beam.io.TextFileSink('gs://my-bucket/counts.txt')) + ``` Note that the `Write` transform produces a trivial result value of type `PDone`, which in this case is ignored. @@ -135,6 +178,10 @@ Run the pipeline by calling the `run` method, which sends your pipeline to be ex p.run().waitUntilFinish(); ``` +```py +p.run() +``` + Note that the `run` method is asynchronous. For a blocking execution instead, run your pipeline appending the `waitUntilFinish` method. ## WordCount Example @@ -157,9 +204,9 @@ The following sections explain these key concepts in detail, and break down the When using `ParDo` transforms, you need to specify the processing operation that gets applied to each element in the input `PCollection`. This processing operation is a subclass of the SDK class `DoFn`. You can create the `DoFn` subclasses for each `ParDo` inline, as an anonymous inner class instance, as is done in the previous example (Minimal WordCount). However, it's often a good idea to define the `DoFn` at the global level, which makes it easier to unit test and can make the `ParDo` code more readable. -In this example, `ExtractWordsFn` is a `DoFn` that is defined as a static class: - ```java +// In this example, ExtractWordsFn is a DoFn that is defined as a static class: + static class ExtractWordsFn extends DoFn { ... @@ -170,10 +217,16 @@ static class ExtractWordsFn extends DoFn { } ``` -This `DoFn` (`ExtractWordsFn`) is the processing operation that `ParDo` applies to the `PCollection` of words: +```py +# In this example, the DoFns are defined as classes: -```java -PCollection words = lines.apply(ParDo.of(new ExtractWordsFn())); +class FormatAsTextFn(beam.DoFn): + + def process(self, context): + word, count = context.element + yield '%s: %s' % (word, count) + +formatted = counts | beam.ParDo(FormatAsTextFn()) ``` ### Creating Composite Transforms @@ -211,13 +264,28 @@ public static void main(String[] args) throws IOException { } ``` +```py +class CountWords(beam.PTransform): + + def apply(self, pcoll): + return (pcoll + # Convert lines of text into individual words. + | beam.FlatMap( + 'ExtractWords', lambda x: re.findall(r'[A-Za-z\']+', x)) + + # Count the number of times each word occurs. + | beam.combiners.Count.PerElement()) + +counts = lines | CountWords() +``` + ### Using Parameterizable PipelineOptions You can hard-code various execution options when you run your pipeline. However, the more common way is to define your own configuration options via command-line argument parsing. Defining your configuration options via the command-line makes the code more easily portable across different runners. Add arguments to be processed by the command-line parser, and specify default values for them. You can then access the options values in your pipeline code. -``` java +```java public static interface WordCountOptions extends PipelineOptions { @Description("Path of the file to read from") @Default.String("gs://dataflow-samples/shakespeare/kinglear.txt") @@ -234,6 +302,19 @@ public static void main(String[] args) { } ``` +```py +class WordCountOptions(PipelineOptions): + + @classmethod + def _add_argparse_args(cls, parser): + parser.add_argument('--input', + help='Input for the dataflow pipeline', + default='gs://my-bucket/input') + +options = PipelineOptions(argv) +p = beam.Pipeline(options=options) +``` + ## Debugging WordCount Example The Debugging WordCount example demonstrates some best practices for instrumenting your pipeline code. @@ -259,9 +340,9 @@ If you execute your pipeline using `DirectRunner`, it will print the log message If you execute your pipeline using `DataflowRunner`, you can use Google Cloud Logging. Google Cloud Logging (currently in beta) aggregates the logs from all of your Dataflow job's workers to a single location in the Google Cloud Platform Console. You can use Cloud Logging to search and access the logs from all of the Compute Engine instances that Dataflow has spun up to complete your Dataflow job. You can add logging statements into your pipeline's `DoFn` instances that will appear in Cloud Logging as your pipeline runs. -In this example, we use `.trace` and `.debug`: - ```java +// This example uses .trace and .debug: + public class DebuggingWordCount { public static class FilterTextFn extends DoFn, KV> { @@ -280,6 +361,43 @@ public class DebuggingWordCount { } ``` +```py +import logging + +class FilterTextFn(beam.DoFn): + """A DoFn that filters for a specific key based on a regular expression.""" + + # A custom aggregator can track values in your pipeline as it runs. Create + # custom aggregators matched_word and unmatched_words. + matched_words = beam.Aggregator('matched_words') + umatched_words = beam.Aggregator('umatched_words') + + def __init__(self, pattern): + self.pattern = pattern + + def process(self, context): + word, _ = context.element + if re.match(self.pattern, word): + # Log at INFO level each element we match. When executing this pipeline + # using the Dataflow service, these log lines will appear in the Cloud + # Logging UI. + logging.info('Matched %s', word) + + # Add 1 to the custom aggregator matched_words + context.aggregate_to(self.matched_words, 1) + yield context.element + else: + # Log at the "DEBUG" level each element that is not matched. Different + # log levels can be used to control the verbosity of logging providing + # an effective mechanism to filter less important information. Note + # currently only "INFO" and higher level logs are emitted to the Cloud + # Logger. This log message will not be visible in the Cloud Logger. + logging.debug('Did not match %s', word) + + # Add 1 to the custom aggregator umatched_words + context.aggregate_to(self.umatched_words, 1) +``` + If you execute your pipeline using `DataflowRunner`, you can control the worker log levels. Dataflow workers that execute user code are configured to log to Cloud Logging by default at "INFO" log level and higher. You can override log levels for specific logging namespaces by specifying: `--workerLogLevelOverrides={"Name1":"Level1","Name2":"Level2",...}`. For example, by specifying `--workerLogLevelOverrides={"org.apache.beam.examples":"DEBUG"}` when executing this pipeline using the Dataflow service, Cloud Logging would contain only "DEBUG" or higher level logs for the package in addition to the default "INFO" or higher level logs. The default Dataflow worker logging configuration can be overridden by specifying `--defaultWorkerLogLevel=`. For example, by specifying `--defaultWorkerLogLevel=DEBUG` when executing this pipeline with the Dataflow service, Cloud Logging would contain all "DEBUG" or higher level logs. Note that changing the default worker log level to TRACE or DEBUG will significantly increase the amount of logs output. @@ -309,6 +427,10 @@ public static void main(String[] args) { } ``` +```py +This feature is not yet available in the Beam SDK for Python. +``` + ## WindowedWordCount This example, `WindowedWordCount`, counts words in text just as the previous examples did, but introduces several advanced concepts. @@ -338,6 +460,10 @@ public static void main(String[] args) throws IOException { ``` +```py +This feature is not yet available in the Beam SDK for Python. +``` + ### Adding Timestamps to Data Each element in a `PCollection` has an associated **timestamp**. The timestamp for each element is initially assigned by the source that creates the `PCollection` and can be adjusted by a `DoFn`. In this example the input is bounded. For the purpose of the example, the `DoFn` method named `AddTimestampsFn` (invoked by `ParDo`) will set a timestamp for each element in the `PCollection`. @@ -346,6 +472,10 @@ Each element in a `PCollection` has an associated **timestamp**. The timestamp f .apply(ParDo.of(new AddTimestampFn())); ``` +```py +This feature is not yet available in the Beam SDK for Python. +``` + Below is the code for `AddTimestampFn`, a `DoFn` invoked by `ParDo`, that sets the data element of the timestamp given the element itself. For example, if the elements were log lines, this `ParDo` could parse the time out of the log string and set it as the element's timestamp. There are no timestamps inherent in the works of Shakespeare, so in this case we've made up random timestamps just to illustrate the concept. Each line of the input text will get a random associated timestamp sometime in a 2-hour period. ```java @@ -369,6 +499,10 @@ static class AddTimestampFn extends DoFn { } ``` +```py +This feature is not yet available in the Beam SDK for Python. +``` + ### Windowing Beam uses a concept called **Windowing** to subdivide a `PCollection` according to the timestamps of its individual elements. `PTransforms` that aggregate multiple elements, process each `PCollection` as a succession of multiple, finite windows, even though the entire collection itself may be of infinite size (unbounded). @@ -381,14 +515,22 @@ PCollection windowedWords = input FixedWindows.of(Duration.standardMinutes(options.getWindowSize())))); ``` +```py +This feature is not yet available in the Beam SDK for Python. +``` + ### Reusing PTransforms over windowed PCollections You can reuse existing `PTransform`s, that were created for manipulating simple `PCollection`s, over windowed `PCollection`s as well. -``` +```java PCollection> wordCounts = windowedWords.apply(new WordCount.CountWords()); ``` +```py +This feature is not yet available in the Beam SDK for Python. +``` + ### Write Results to an Unbounded Sink Since our input is unbounded, the same is true of our output `PCollection`. We need to make sure that we choose an appropriate, unbounded sink. Some output sinks support only bounded output, such as a text file. Google Cloud BigQuery is an output source that supports both bounded and unbounded input. @@ -404,3 +546,7 @@ wordCounts.apply(ParDo.of(new FormatAsTableRowFn())) .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND)); ``` +```py +This feature is not yet available in the Beam SDK for Python. +``` +