From dhalp...@apache.org
Subject [2/5] beam-site git commit: Update according to reviewer comments
Date Fri, 27 Jan 2017 21:57:47 GMT
Update according to reviewer comments


Project: http://git-wip-us.apache.org/repos/asf/beam-site/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam-site/commit/bc4d4947
Tree: http://git-wip-us.apache.org/repos/asf/beam-site/tree/bc4d4947
Diff: http://git-wip-us.apache.org/repos/asf/beam-site/diff/bc4d4947

Branch: refs/heads/asf-site
Commit: bc4d4947461244eee83d7c0f31e76cda9971b5cb
Parents: 884971d
Author: Hadar Hod <hadarh@google.com>
Authored: Tue Jan 24 23:07:48 2017 -0800
Committer: Dan Halperin <dhalperi@google.com>
Committed: Fri Jan 27 13:42:01 2017 -0800

----------------------------------------------------------------------
 src/documentation/programming-guide.md | 135 ++++++++++------------------
 1 file changed, 48 insertions(+), 87 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam-site/blob/bc4d4947/src/documentation/programming-guide.md
----------------------------------------------------------------------
diff --git a/src/documentation/programming-guide.md b/src/documentation/programming-guide.md
index 3851bb5..c0e947f 100644
--- a/src/documentation/programming-guide.md
+++ b/src/documentation/programming-guide.md
@@ -69,7 +69,7 @@ A typical Beam driver program works as follows:
 
 When you run your Beam driver program, the Pipeline Runner that you designate constructs a **workflow graph** of your pipeline based on the `PCollection` objects you've created and transforms that you've applied. That graph is then executed using the appropriate distributed processing back-end, becoming an asynchronous "job" (or equivalent) on that back-end.
 
-## <a name="pipeline"></a>Creating the Pipeline
+## <a name="pipeline"></a>Creating the pipeline
 
 The `Pipeline` abstraction encapsulates all the data and steps in your data processing task. Your Beam driver program typically starts by constructing a <span class="language-java">[Pipeline]({{ site.baseurl }}/documentation/sdks/javadoc/{{ site.release_latest }}/index.html?org/apache/beam/sdk/Pipeline.html)</span><span class="language-py">[Pipeline](https://github.com/apache/beam/blob/python-sdk/sdks/python/apache_beam/pipeline.py)</span> object, and then using that object as the basis for creating the pipeline's data sets as `PCollection`s and its operations as `Transform`s.
 
@@ -110,7 +110,7 @@ After you've created your `Pipeline`, you'll need to begin by creating at least
 
 You create a `PCollection` by either reading data from an external source using Beam's [Source API](#io), or you can create a `PCollection` of data stored in an in-memory collection class in your driver program. The former is typically how a production pipeline would ingest data; Beam's Source APIs contain adapters to help you read from external sources like large cloud-based files, databases, or subscription services. The latter is primarily useful for testing and debugging purposes.
 
-#### Reading from an External Source
+#### Reading from an external source
 
 To read from an external source, you use one of the [Beam-provided I/O adapters](#io). The adapters vary in their exact usage, but all of them from some external data source and return a `PCollection` whose elements represent the data records in that source. 
 
@@ -141,7 +141,7 @@ lines = p | 'ReadMyFile' >> beam.io.Read(beam.io.TextFileSource("protocol://path
 
 See the [section on I/O](#io) to learn more about how to read from the various data sources supported by the Beam SDK.
 
-#### Creating a PCollection from In-Memory Data
+#### Creating a PCollection from in-memory data
 
 {:.language-java}
 To create a `PCollection` from an in-memory Java `Collection`, you use the Beam-provided `Create` transform. Much like a data adapter's `Read`, you apply `Create` directly to your `Pipeline` object itself.
@@ -190,11 +190,11 @@ p = beam.Pipeline()
 collection = p | 'ReadMyLines' >> beam.Create(lines)
 ```
 
-### <a name="pccharacteristics"></a>PCollection Characteristics
+### <a name="pccharacteristics"></a>PCollection characteristics
 
 A `PCollection` is owned by the specific `Pipeline` object for which it is created; multiple pipelines cannot share a `PCollection`. In some respects, a `PCollection` functions like a collection class. However, a `PCollection` can differ in a few key ways:
 
-#### <a name="pcelementtype"></a>Element Type
+#### <a name="pcelementtype"></a>Element type
 
 The elements of a `PCollection` may be of any type, but must all be of the same type. However, to support distributed processing, Beam needs to be able to encode each individual element as a byte string (so elements can be passed around to distributed workers). The Beam SDKs provide a data encoding mechanism that includes built-in encoding for commonly-used types as well as support for specifying custom encodings as needed.
 
@@ -202,11 +202,11 @@ The elements of a `PCollection` may be of any type, but must all be of the same
 
 A `PCollection` is immutable. Once created, you cannot add, remove, or change individual elements. A Beam Transform might process each element of a `PCollection` and generate new pipeline data (as a new `PCollection`), *but it does not consume or modify the original input collection*.
 
-#### <a name="pcrandomaccess"></a>Random Access
+#### <a name="pcrandomaccess"></a>Random access
 
 A `PCollection` does not support random access to individual elements. Instead, Beam Transforms consider every element in a `PCollection` individually.
 
-#### <a name="pcsizebound"></a>Size and Boundedness
+#### <a name="pcsizebound"></a>Size and boundedness
 
 A `PCollection` is a large, immutable "bag" of elements. There is no upper limit on how many elements a `PCollection` can contain; any given `PCollection` might fit in memory on a single machine, or it might represent a very large distributed data set backed by a persistent data store.
 
@@ -216,7 +216,7 @@ The bounded (or unbounded) nature of your `PCollection` affects how Beam process
 
 When performing an operation that groups elements in an unbounded `PCollection`, Beam requires a concept called **Windowing** to divide a continuously updating data set into logical windows of finite size.  Beam processes each window as a bundle, and processing continues as the data set is generated. These logical windows are determined by some characteristic associated with a data element, such as a **timestamp**.
 
-#### <a name="pctimestamps"></a>Element Timestamps
+#### <a name="pctimestamps"></a>Element timestamps
 
 Each element in a `PCollection` has an associated intrinsic **timestamp**. The timestamp for each element is initially assigned by the [Source](#io) that creates the `PCollection`. Sources that create an unbounded `PCollection` often assign each new element a timestamp that corresponds to when the element was read or added.
 
@@ -226,7 +226,7 @@ Timestamps are useful for a `PCollection` that contains elements with an inheren
 
 You can manually assign timestamps to the elements of a `PCollection` if the source doesn't do it for you. You'll want to do this if the elements have an inherent timestamp, but the timestamp is somewhere in the structure of the element itself (such as a "time" field in a server log entry). Beam has [Transforms](#transforms) that take a `PCollection` as input and output an identical `PCollection` with timestamps attached; see [Assigning Timestamps](#windowing) for more information on how to do so.
 
-## <a name="transforms"></a>Applying Transforms
+## <a name="transforms"></a>Applying transforms
 
 In the Beam SDKs, **transforms** are the operations in your pipeline. A transform takes a `PCollection` (or more than one `PCollection`) as input, performs an operation that you specify on each element in that collection, and produces a new output `PCollection`. To invoke a transform, you must **apply** it to the input `PCollection`.
 
@@ -277,7 +277,7 @@ You can also build your own [composite transforms](#transforms-composite) that n
 
 The transforms in the Beam SDKs provide a generic **processing framework**, where you provide processing logic in the form of a function object (colloquially referred to as "user code"). The user code gets applied to the elements of the input `PCollection`. Instances of your user code might then be executed in parallel by many different workers across a cluster, depending on the pipeline runner and back-end that you choose to execute your Beam pipeline. The user code running on each worker generates the output elements that are ultimately added to the final output `PCollection` that the transform produces.
 
-### Core Beam Transforms
+### Core Beam transforms
 
 Beam provides the following transforms, each of which represents a different processing paradigm:
 
@@ -390,7 +390,7 @@ In your processing method, you'll also need to meet some immutability requiremen
 * Once you output a value using `ProcessContext.output()` or `ProcessContext.sideOutput()`, you should not modify that value in any way.
 
 
-##### Lightweight DoFns and Other Abstractions
+##### Lightweight DoFns and other abstractions
 
 If your function is relatively straightforward, you can simplify your use of `ParDo` by providing a lightweight `DoFn` in-line, as <span class="language-java">an anonymous inner class instance</span><span class="language-py">a lambda function</span>.
 
@@ -403,9 +403,8 @@ PCollection<String> words = ...;
 // Apply a ParDo with an anonymous DoFn to the PCollection words.
 // Save the result as the PCollection wordLengths.
 PCollection<Integer> wordLengths = words.apply(
-  ParDo
-    .named("ComputeWordLengths")            // the transform name
-    .of(new DoFn<String, Integer>() {       // a DoFn as an anonymous inner class instance
+  "ComputeWordLengths",                     // the transform name
+  ParDo.of(new DoFn<String, Integer>() {    // a DoFn as an anonymous inner class instance
       @ProcessElement
       public void processElement(ProcessContext c) {
         c.output(c.element().length());
@@ -497,7 +496,7 @@ When you apply a `Combine` transform, you must provide the function that contain
 
 Simple combine operations, such as sums, can usually be implemented as a simple function. More complex combination operations might require you to create a subclass of `CombineFn` that has an accumulation type distinct from the input/output type.
 
-##### **Simple Combinations Using Simple Functions**
+##### **Simple combinations using simple functions**
 
 The following example code shows a simple combine function.
 
@@ -519,7 +518,7 @@ public static class SumInts implements SerializableFunction<Iterable<Integer>, I
 {% github_sample /apache/beam/blob/python-sdk/sdks/python/apache_beam/examples/snippets/snippets_test.py tag:combine_bounded_sum
 %}```
 
-##### **Advanced Combinations using CombineFn**
+##### **Advanced combinations using CombineFn**
 
 For more complex combine functions, you can define a subclass of `CombineFn`. You should use `CombineFn` if the combine function requires a more sophisticated accumulator, must perform additional pre- or post-processing, might change the output type, or takes the key into account.
 
@@ -576,7 +575,7 @@ pc = ...
 
 If you are combining a `PCollection` of key-value pairs, [per-key combining](#transforms-combine-per-key) is often enough. If you need the combining strategy to change based on the key (for example, MIN for some users and MAX for other users), you can define a `KeyedCombineFn` to access the key within the combining strategy.
 
-##### **Combining a PCollection into a Single Value**
+##### **Combining a PCollection into a single value**
 
 Use the global combine to transform all of the elements in a given `PCollection` into a single value, represented in your pipeline as a new `PCollection` containing one element. The following example code shows how to apply the Beam provided sum combine function to produce a single sum value for a `PCollection` of integers.
 
@@ -595,7 +594,7 @@ pc = ...
 result = pc | beam.CombineGlobally(sum)
 ```
 
-##### Global Windowing:
+##### Global windowing:
 
 If your input `PCollection` uses the default global windowing, the default behavior is to return a `PCollection` containing one item. That item's value comes from the accumulator in the combine function that you specified when applying `Combine`. For example, the Beam provided sum combine function returns a zero value (the sum of an empty input), while the min combine function returns a maximal or infinite value.
 
@@ -613,7 +612,7 @@ sum = pc | beam.CombineGlobally(sum).without_defaults()
 
 ```
 
-##### Non-Global Windowing:
+##### Non-global windowing:
 
 If your `PCollection` uses any non-global windowing function, Beam does not provide the default behavior. You must specify one of the following options when applying `Combine`:
 
@@ -621,7 +620,7 @@ If your `PCollection` uses any non-global windowing function, Beam does not prov
 * Specify `.asSingletonView`, in which the output is immediately converted to a `PCollectionView`, which will provide a default value for each empty window when used as a side input. You'll generally only need to use this option if the result of your pipeline's `Combine` is to be used as a side input later in the pipeline.
 
 
-##### <a name="transforms-combine-per-key"></a>**Combining Values in a Key-Grouped Collection**
+##### <a name="transforms-combine-per-key"></a>**Combining values in a key-grouped collection**
 
 After creating a key-grouped collection (for example, by using a `GroupByKey` transform) a common pattern is to combine the collection of values associated with each key into a single, merged value. Drawing on the previous example from `GroupByKey`, a key-grouped `PCollection` called `groupedWords` looks like this:
 
@@ -688,11 +687,11 @@ merged = (
     | beam.Flatten())
 ```
 
-##### Data Encoding in Merged Collections:
+##### Data encoding in merged collections:
 
 By default, the coder for the output `PCollection` is the same as the coder for the first `PCollection` in the input `PCollectionList`. However, the input `PCollection` objects can each use different coders, as long as they all contain the same data type in your chosen language.
 
-##### Merging Windowed Collections:
+##### Merging windowed collections:
 
 When using `Flatten` to merge `PCollection` objects that have a windowing strategy applied, all of the `PCollection` objects you want to merge must use a compatible windowing strategy and window sizing. For example, all the collections you're merging must all use (hypothetically) identical 5-minute fixed windows or 4-minute sliding windows starting every 30 seconds.
 
@@ -733,7 +732,7 @@ by_decile = students | beam.Partition(partition_fn, 10)
 fortieth_percentile = by_decile[4]
 ```
 
-#### <a name="transforms-usercodereqs"></a>General Requirements for Writing User Code for Beam Transforms
+#### <a name="transforms-usercodereqs"></a>General Requirements for writing user code for Beam transforms
 
 When you build user code for a Beam transform, you should keep in mind the distributed nature of execution. For example, there might be many copies of your function running on a lot of different machines in parallel, and those copies function independently, without communicating or sharing state with any of the other copies. Depending on the Pipeline Runner and processing back-end you choose for your pipeline, each copy of your user code function may be retried or run multiple times. As such, you should be cautious about including things like state dependency in your user code.
 
@@ -758,7 +757,7 @@ Some other serializability factors you should keep in mind are:
 * Mutating a function object after it gets applied will have no effect.
 * Take care when declaring your function object inline by using an anonymous inner class instance. In a non-static context, your inner class instance will implicitly contain a pointer to the enclosing class and that class' state. That enclosing class will also be serialized, and thus the same considerations that apply to the function object itself also apply to this outer class.
 
-##### Thread-Compatibility
+##### Thread-compatibility
 
 Your function object should be thread-compatible. Each instance of your function object is accessed by a single thread on a worker instance, unless you explicitly create your own threads. Note, however, that **the Beam SDKs are not thread-safe**. If you create your own threads in your user code, you must provide your own synchronization. Note that static members in your function object are not passed to worker instances and that multiple instances of your function may be accessed from different threads.
 
@@ -768,14 +767,14 @@ It's recommended that you make your function object idempotent--that is, that it
 
 #### <a name="transforms-sideio"></a>Side Inputs and Side Outputs
 
-##### **Side Inputs**
+##### **Side inputs**
 
 In addition to the main input `PCollection`, you can provide additional inputs to a `ParDo` transform in the form of side inputs. A side input is an additional input that your `DoFn` can access each time it processes an element in the input `PCollection`. When you specify a side input, you create a view of some other data that can be read from within the `ParDo` transform's `DoFn` while procesing each element.
 
 Side inputs are useful if your `ParDo` needs to inject additional data when processing each element in the input `PCollection`, but the additional data needs to be determined at runtime (and not hard-coded). Such values might be determined by the input data, or depend on a different branch of your pipeline.
 
 
-##### Passing Side Inputs to ParDo:
+##### Passing side inputs to ParDo:
 
 ```java
   // Pass side inputs to your ParDo transform by invoking .withSideInputs.
@@ -824,7 +823,7 @@ Side inputs are useful if your `ParDo` needs to inject additional data when proc
 
 ```
 
-##### Side Inputs and Windowing:
+##### Side inputs and windowing:
 
 A windowed `PCollection` may be infinite and thus cannot be compressed into a single value (or single collection class). When you create a `PCollectionView` of a windowed `PCollection`, the `PCollectionView` represents a single entity per window (one singleton per window, one list per window, etc.).
 
@@ -836,11 +835,11 @@ If the main input element exists in more than one window, then `processElement`
 
 If the side input has multiple trigger firings, Beam uses the value from the latest trigger firing. This is particularly useful if you use a side input with a single global window and specify a trigger.
 
-##### **Side Outputs**
+##### **Side outputs**
 
 While `ParDo` always produces a main output `PCollection` (as the return value from apply), you can also have your `ParDo` produce any number of additional output `PCollection`s. If you choose to have multiple outputs, your `ParDo` returns all of the output `PCollection`s (including the main output) bundled together.
 
-##### Tags for Side Outputs:
+##### Tags for side outputs:
 
 ```java
 // To emit elements to a side output PCollection, create a TupleTag object to identify each collection that your ParDo produces.
@@ -902,7 +901,7 @@ While `ParDo` always produces a main output `PCollection` (as the return value f
 {% github_sample /apache/beam/blob/python-sdk/sdks/python/apache_beam/examples/snippets/snippets_test.py tag:model_pardo_with_side_outputs_iter
 %}```
 
-##### Emitting to Side Outputs in your DoFn:
+##### Emitting to side outputs in your DoFn:
 
 ```java
 // Inside your ParDo's DoFn, you can emit an element to a side output by using the method ProcessContext.sideOutput.
@@ -948,77 +947,39 @@ When you create a pipeline, you often need to read data from some external sourc
 
 > A guide that covers how to implement your own Beam IO transforms is in progress ([BEAM-1025](https://issues.apache.org/jira/browse/BEAM-1025)).
 
-### Reading Input Data
+### Reading input data
 
-Read transforms read data from an external source and return a `PCollection` representation of the data for use by your pipeline. You can use a read transform at any point while constructing your pipeline to create a new `PCollection`, though it will be most common at the start of your pipeline. Here are examples of two common ways to read data.
-
-#### Reading from a `Source`:
-
-```java
-// A fully-specified Read from a GCS file:
-PCollection<Integer> numbers =
-  p.apply("ReadNumbers", TextIO.Read
-   .from("gs://my_bucket/path/to/numbers-*.txt")
-   .withCoder(TextualIntegerCoder.of()));
-```
-
-```python
-pipeline | beam.io.ReadFromText('protocol://path/to/some/inputData.txt')
-```
-
-Note that many sources use the builder pattern for setting options. For additional examples,
see the language-specific documentation (such as Javadoc) for each of the sources.
+Read transforms read data from an external source and return a `PCollection` representation of the data for use by your pipeline. You can use a read transform at any point while constructing your pipeline to create a new `PCollection`, though it will be most common at the start of your pipeline.
 
 #### Using a read transform:
 
 ```java
-// This example uses JmsIO.
-PCollection<JmsRecord> output =
-    pipeline.apply(JmsIO.read()
-        .withConnectionFactory(myConnectionFactory)
-        .withQueue("my-queue"))
+PCollection<String> lines = p.apply(TextIO.Read.from("gs://some/inputData.txt")); 
 
 ```
 
 ```python
-pipeline | beam.io.textio.ReadFromText('my_file_name')
+lines = pipeline | beam.io.ReadFromText('gs://some/inputData.txt')
 ```
 
-### Writing Output Data
+### Writing output data
 
-Write transforms write the data in a `PCollection` to an external data source. You will most often use write transforms at the end of your pipeline to output your pipeline's final results. However, you can use a write transform to output a `PCollection`'s data at any point in your pipeline. Here are examples of two common ways to write data.
-
-#### Writing to a `Sink`:
-
-```java
-// This example uses XmlSink.
-pipeline.apply(Write.to(
-          XmlSink.ofRecordClass(Type.class)
-              .withRootElementName(root_element)
-              .toFilenamePrefix(output_filename)));
-```
-
-```python
-output | beam.io.WriteToText('my_file_name')
-```
+Write transforms write the data in a `PCollection` to an external data source. You will most often use write transforms at the end of your pipeline to output your pipeline's final results. However, you can use a write transform to output a `PCollection`'s data at any point in your pipeline. 
 
-#### Using a write transform:
+#### Using a Write transform:
 
 ```java
-// This example uses JmsIO.
-pipeline.apply(...) // returns PCollection<String>
-        .apply(JmsIO.write()
-            .withConnectionFactory(myConnectionFactory)
-            .withQueue("my-queue")
+output.apply(TextIO.Write.to("gs://some/outputData"));
 ```
 
 ```python
-output | beam.io.textio.WriteToText('my_file_name')
+output | beam.io.WriteToText('gs://some/outputData')
 ```
 
 ### File-based input and output data
 
-#### Reading From Multiple Locations:
+#### Reading from multiple locations:
 
-Many read transforms support reading from multiple input files matching a glob operator you provide. The following TextIO example uses a glob operator (\*) to read all matching input files that have prefix "input-" and the suffix ".csv" in the given location:
+Many read transforms support reading from multiple input files matching a glob operator you provide. Note that glob operators are filesystem-specific and obey filesystem-specific consistency models. The following TextIO example uses a glob operator (\*) to read all matching input files that have prefix "input-" and the suffix ".csv" in the given location:
 
 ```java
 p.apply(“ReadFromText”,
@@ -1031,18 +992,18 @@ lines = p | beam.io.Read(
     beam.io.TextFileSource('protocol://my_bucket/path/to/input-*.csv'))
 ```
 
-To read data from disparate sources into a single `PCollection`, read each one independently and then use the `Flatten` transform to create a single `PCollection`.
+To read data from disparate sources into a single `PCollection`, read each one independently and then use the [Flatten](#transforms-flatten-partition) transform to create a single `PCollection`.
 
-#### Writing To Multiple Output Files:
+#### Writing to multiple output files:
 
 For file-based output data, write transforms write to multiple output files by default. When you pass an output file name to a write transform, the file name is used as the prefix for all output files that the write transform produces. You can append a suffix to each output file by specifying a suffix.
 
 The following write transform example writes multiple output files to a location. Each file has the prefix "numbers", a numeric tag, and the suffix ".csv".
 
 ```java
-records.apply(TextIO.Write.named("WriteToText")
-                          .to("protocol://my_bucket/path/to/numbers")
-                          .withSuffix(".csv"));
+records.apply("WriteToText",
+    TextIO.Write.to("protocol://my_bucket/path/to/numbers")
+                .withSuffix(".csv"));
 ```
 
 ```python
@@ -1050,7 +1011,7 @@ filtered_words | beam.io.WriteToText(
 'protocol://my_bucket/path/to/numbers', file_name_suffix='.csv')
 ```
 
-### Beam provided I/O APIs
+### Beam-provided I/O APIs
 
 See the language specific source code directories for the Beam supported I/O APIs. Specific documentation for each of these I/O sources will be added in the future. ([BEAM-1054](https://issues.apache.org/jira/browse/BEAM-1054))
 
@@ -1100,7 +1061,7 @@ See the language specific source code directories for the Beam supported I/O API
 </table>
 
 
-## <a name="running"></a>Running the Pipeline
+## <a name="running"></a>Running the pipeline
 
 To run your pipeline, use the `run` method. The program you create sends a specification for your pipeline to a pipeline runner, which then constructs and runs the actual series of pipeline operations. Pipelines are executed asynchronously by default.
 
@@ -1119,7 +1080,7 @@ pipeline.run().waitUntilFinish();
 ```
 
 ```python
-# Not currently supported.
+pipeline.run().wait_until_finish();
 ```
 
 <a name="transforms-composite"></a>
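
Note on the "Reading from multiple locations" hunk above: the guide says a glob operator (\*) selects every input file with the prefix "input-" and the suffix ".csv". The following is a minimal sketch of that matching rule only, using Python's standard `fnmatch` module on a list of names. It does not use Beam, and the helper `match_inputs` and the file names are hypothetical. As the added sentence in the diff points out, real glob behavior is filesystem-specific, so this may not reflect what a given Beam filesystem actually does.

```python
from fnmatch import fnmatchcase


def match_inputs(pattern, names):
    """Return the names that match a shell-style glob pattern (case-sensitive)."""
    return [name for name in names if fnmatchcase(name, pattern)]


# Hypothetical file names, for illustration only.
candidates = [
    "input-001.csv",
    "input-002.csv",
    "input-003.txt",   # wrong suffix, should not match
    "output-001.csv",  # wrong prefix, should not match
]

matched = match_inputs("input-*.csv", candidates)
print(matched)
```

Only names that start with "input-" and end with ".csv" are kept. Note that `fnmatch` lets `*` match path separators as well, which is one way it can differ from a filesystem's own globbing.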

