From: frances@apache.org
To: commits@beam.apache.org
Reply-To: dev@beam.apache.org
Date: Wed, 08 Feb 2017 18:55:06 -0000
Message-Id: <2eec375bbceb4feb9e9d75f094021203@git.apache.org>
Subject: [2/3] beam-site git commit: Regenerate website

Regenerate website

Project: http://git-wip-us.apache.org/repos/asf/beam-site/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam-site/commit/6ebcb08c
Tree: http://git-wip-us.apache.org/repos/asf/beam-site/tree/6ebcb08c
Diff: http://git-wip-us.apache.org/repos/asf/beam-site/diff/6ebcb08c
Branch: refs/heads/asf-site
Commit: 6ebcb08cb503a3e58101fc73be11649116111c65
Parents: f277339
Author: Frances Perry
Authored: Wed Feb 8 10:53:36 2017 -0800
Committer: Frances Perry
Committed: Wed Feb 8 10:53:36 2017 -0800

----------------------------------------------------------------------
 .../documentation/programming-guide/index.html | 327 ++++++++++++++++---
 1 file changed, 274 insertions(+), 53 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/beam-site/blob/6ebcb08c/content/documentation/programming-guide/index.html
----------------------------------------------------------------------
diff --git a/content/documentation/programming-guide/index.html b/content/documentation/programming-guide/index.html
index dee4869..9830735 100644
--- a/content/documentation/programming-guide/index.html
+++ b/content/documentation/programming-guide/index.html
@@ -208,7 +208,7 @@

PCollection: A PCollection represents a distributed data set that your Beam pipeline operates on. The data set can be bounded, meaning it comes from a fixed source like a file, or unbounded, meaning it comes from a continuously updating source via a subscription or other mechanism. Your pipeline typically creates an initial PCollection by reading data from an external data source, but you can also create a PCollection from in-memory data within your driver program. From there, PCollections are the inputs and outputs for each step in your pipeline.

  • -

    Transform: A Transform represents a data processing operation, or a step, in your pipeline. Every Transform takes one or more PCollection objects as input, performs a processing function that you provide on the elements of that PCollection, and produces one or more output PCollection objects.

    +

    Transform: A Transform represents a data processing operation, or a step, in your pipeline. Every Transform takes one or more PCollection objects as input, performs a processing function that you provide on the elements of that PCollection, and produces one or more output PCollection objects.

  • I/O Source and Sink: Beam provides Source and Sink APIs to represent reading and writing data, respectively. Source encapsulates the code necessary to read data into your Beam pipeline from some external source, such as cloud file storage or a subscription to a streaming data source. Sink likewise encapsulates the code necessary to write the elements of a PCollection to an external data sink.

    @@ -248,11 +248,13 @@ -
    from apache_beam.utils.pipeline_options import PipelineOptions
    -
    -# Will parse the arguments passed into the application and construct a PipelineOptions
    +
    # Will parse the arguments passed into the application and construct a PipelineOptions object.
     # Note that --help will print registered options.
    +
    +from apache_beam.utils.pipeline_options import PipelineOptions
    +
     p = beam.Pipeline(options=PipelineOptions())
    +
     
    @@ -286,13 +288,8 @@
    -
    import apache_beam as beam
    -
    -# Create the pipeline.
    -p = beam.Pipeline()
    +
    lines = p | 'ReadMyFile' >> beam.io.ReadFromText('gs://some/inputData.txt')
     
    -# Read the text file into a PCollection.
    -lines = p | 'ReadMyFile' >> beam.io.Read(beam.io.TextFileSource("protocol://path/to/some/inputData.txt"))
     
    @@ -327,20 +324,18 @@
    -
    import apache_beam as beam
    +
    p = beam.Pipeline(options=pipeline_options)
     
    -# python list
    -lines = [
    -  "To be, or not to be: that is the question: ",
    -  "Whether 'tis nobler in the mind to suffer ",
    -  "The slings and arrows of outrageous fortune, ",
    -  "Or to take arms against a sea of troubles, "
    -]
    +(p
    + | beam.Create([
    +     'To be, or not to be: that is the question: ',
    +     'Whether \'tis nobler in the mind to suffer ',
    +     'The slings and arrows of outrageous fortune, ',
    +     'Or to take arms against a sea of troubles, '])
    + | beam.io.WriteToText(my_options.output))
     
    -# Create the pipeline.
    -p = beam.Pipeline()
    +result = p.run()
     
    -collection = p | 'ReadMyLines' >> beam.Create(lines)
     
    @@ -401,8 +396,8 @@

    How you apply your pipeline’s transforms determines the structure of your pipeline. The best way to think of your pipeline is as a directed acyclic graph, where the nodes are PCollections and the edges are transforms. For example, you can chain transforms to create a sequential pipeline, like this one:

    [Final Output PCollection] = [Initial Input PCollection].apply([First Transform])
    -							.apply([Second Transform])
    -							.apply([Third Transform])
    +.apply([Second Transform])
    +.apply([Third Transform])
     
    @@ -418,8 +413,13 @@

    However, note that a transform does not consume or otherwise alter the input collection–remember that a PCollection is immutable by definition. This means that you can apply multiple transforms to the same input PCollection to create a branching pipeline, like so:

    -
    [Output PCollection 1] = [Input PCollection].apply([Transform 1])
    -[Output PCollection 2] = [Input PCollection].apply([Transform 2])
    +
    [Output PCollection 1] = [Input PCollection].apply([Transform 1])
    +[Output PCollection 2] = [Input PCollection].apply([Transform 2])
    +
    +
    + +
    [Output PCollection 1] = [Input PCollection] | [Transform 1]
    +[Output PCollection 2] = [Input PCollection] | [Transform 2]
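
The branching behaviour described above can be sketched in plain Python. This is only an illustration of the immutability argument, not the Beam API: `ImmutableCollection`, `to_upper`, and `lengths` are hypothetical names introduced here, and the `|` operator is modelled with `__or__`.

```python
class ImmutableCollection:
    """Hypothetical stand-in for a PCollection: a read-only tuple of elements."""

    def __init__(self, elements):
        self._elements = tuple(elements)

    def __or__(self, transform):
        # Applying a transform builds a new collection; self is left untouched.
        return ImmutableCollection(transform(self._elements))

    def elements(self):
        return self._elements


def to_upper(elements):
    """Hypothetical transform: upper-case every element."""
    return (e.upper() for e in elements)


def lengths(elements):
    """Hypothetical transform: replace every element with its length."""
    return (len(e) for e in elements)


words = ImmutableCollection(['to', 'be'])
upper = words | to_upper   # first branch
sizes = words | lengths    # second branch, applied to the same input
```

Because neither branch modifies `words`, both can read from it independently, which is what makes the branching graph well defined.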
     
    @@ -427,7 +427,7 @@

    [Branching Graph Graphic]

    -

    You can also build your own composite transforms that nest multiple sub-steps inside a single, larger transform. Composite transforms are particularly useful for building a reusable sequence of simple steps that get used in a lot of different places.

    +

    You can also build your own composite transforms that nest multiple sub-steps inside a single, larger transform. Composite transforms are particularly useful for building a reusable sequence of simple steps that get used in a lot of different places.

    Transforms in the Beam SDK

    @@ -488,12 +488,9 @@
    # The DoFn to perform on each element in the input PCollection.
     class ComputeWordLengthFn(beam.DoFn):
    -  def process(self, context):
    -    # Get the input element from ProcessContext.
    -    word = context.element
    -    # Use return to emit the output element.
    -    return [len(word)]
    -
    +  def process(self, element):
    +    return [len(element)]
    +
     # Apply a ParDo to the PCollection "words" to compute lengths for each word.
     word_lengths = words | beam.ParDo(ComputeWordLengthFn())
    @@ -532,11 +529,9 @@
    class ComputeWordLengthFn(beam.DoFn):
    -  def process(self, context):
    -    # Get the input element from ProcessContext.
    -    word = context.element
    -    # Use return to emit the output element.
    -    return [len(word)]
    +  def process(self, element):
    +    return [len(element)]
    +
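
To illustrate the new `process(self, element)` signature without a Beam installation, here is a minimal plain-Python sketch. It assumes only what the hunk shows, namely that `process` receives one element and returns an iterable of outputs. The `par_do` driver and `SplitWordsFn` are hypothetical helpers and are not part of the SDK.

```python
class ComputeWordLengthFn:
    """Mirrors the DoFn in the hunk above, minus the beam.DoFn base class."""

    def process(self, element):
        return [len(element)]


class SplitWordsFn:
    """Hypothetical DoFn that emits several outputs per input element."""

    def process(self, element):
        return element.split()


def par_do(elements, fn):
    """Hypothetical driver: call process once per element and flatten the results."""
    output = []
    for element in elements:
        output.extend(fn.process(element))
    return output


words = par_do(['to be', 'or not'], SplitWordsFn())
word_lengths = par_do(words, ComputeWordLengthFn())
```

Returning a list rather than a bare value is what lets a single input element produce zero, one, or many outputs.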
     
    @@ -780,7 +775,20 @@ tree, [2]
    # sum combines the elements in the input PCollection.
     # The resulting PCollection, called result, contains one value: the sum of all the elements in the input PCollection.
     pc = ...
    -result = pc | beam.CombineGlobally(sum)
    +class AverageFn(beam.CombineFn):
    +  def create_accumulator(self):
    +    return (0.0, 0)
    +
    +  def add_input(self, (sum, count), input):
    +    return sum + input, count + 1
    +
    +  def merge_accumulators(self, accumulators):
    +    sums, counts = zip(*accumulators)
    +    return sum(sums), sum(counts)
    +
    +  def extract_output(self, (sum, count)):
    +    return sum / count if count else float('NaN')
    +average = pc | beam.CombineGlobally(AverageFn())
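
The `AverageFn` added in this hunk uses tuple parameters in its method signatures, which only Python 2 accepts. The sketch below restates the same four methods in a form that also runs on Python 3, and adds a hypothetical `combine_globally` driver (not the Beam implementation) to show how per-shard accumulators might be built and then merged.

```python
class AverageFn:
    """Plain-Python stand-in mirroring the four CombineFn methods."""

    def create_accumulator(self):
        return (0.0, 0)

    def add_input(self, accumulator, value):
        total, count = accumulator
        return total + value, count + 1

    def merge_accumulators(self, accumulators):
        totals, counts = zip(*accumulators)
        return sum(totals), sum(counts)

    def extract_output(self, accumulator):
        total, count = accumulator
        return total / count if count else float('nan')


def combine_globally(combine_fn, shards):
    """Hypothetical driver: accumulate each shard, then merge the partial results."""
    partials = []
    for shard in shards:
        accumulator = combine_fn.create_accumulator()
        for value in shard:
            accumulator = combine_fn.add_input(accumulator, value)
        partials.append(accumulator)
    # With no shards at all, fall back to a single empty accumulator.
    partials = partials or [combine_fn.create_accumulator()]
    return combine_fn.extract_output(combine_fn.merge_accumulators(partials))


average = combine_globally(AverageFn(), [[1, 2], [3], []])
empty_average = combine_globally(AverageFn(), [])
```

Keeping the running sum and count in the accumulator, rather than a running mean, is what makes the merge step a simple element-wise addition.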
     
    @@ -798,7 +806,6 @@ tree, [2]
    pc = ...
     sum = pc | beam.CombineGlobally(sum).without_defaults()
    -
     
    @@ -847,6 +854,7 @@ tree, [2]
     avg_accuracy_per_player = (player_accuracies
                                | beam.CombinePerKey(
                                    beam.combiners.MeanCombineFn()))
    +
    @@ -870,11 +878,14 @@ tree, [2]
    # Flatten takes a tuple of PCollection objects.
    -# Returns a single PCollection that contains all of the elements in the PCollection objects in that tuple.
    +# Returns a single PCollection that contains all of the elements in the PCollection objects in that tuple.
     merged = (
    +    # [START model_multiple_pcollections_tuple]
         (pcoll1, pcoll2, pcoll3)
    +    # [END model_multiple_pcollections_tuple]
         # A list of tuples can be "piped" directly into a Flatten transform.
         | beam.Flatten())
    +
     
    @@ -918,8 +929,10 @@ tree, [2]
     by_decile = students | beam.Partition(partition_fn, 10)
    +
     # You can extract each partition from the tuple of PCollection objects as follows:
     fortieth_percentile = by_decile[4]
    +
    @@ -1213,7 +1226,7 @@ tree, [2]
    -
    lines = pipeline | beam.io.ReadFromText('gs://some/inputData.txt')
    +
    lines = pipeline | beam.io.ReadFromText('gs://some/inputData.txt')
     
    @@ -1227,7 +1240,7 @@ tree, [2]
    -
    output | beam.io.WriteToText('gs://some/outputData')
    +
    output | beam.io.WriteToText('gs://some/outputData')
     
    @@ -1242,9 +1255,8 @@ tree, [2]
    -
    lines = p | beam.io.Read(
    -    'ReadFromText',
    -    beam.io.TextFileSource('protocol://my_bucket/path/to/input-*.csv'))
    +
    lines = p | 'ReadFromText' >> beam.io.ReadFromText('path/to/input-*.csv')
    +
     
    @@ -1262,8 +1274,9 @@ tree, [2]
    -
    filtered_words | beam.io.WriteToText(
    -'protocol://my_bucket/path/to/numbers', file_name_suffix='.csv')
    +
    filtered_words | 'WriteToText' >> beam.io.WriteToText(
    +    '/path/to/numbers', file_name_suffix='.csv')
    +
     
    @@ -1324,23 +1337,231 @@ tree, [2]
    -
    pipeline.run()
    +
    pipeline.run()
     
    -

    For blocking execution, append the waitUntilFinish method:

    +

    For blocking execution, append the waitUntilFinish (Java) or wait_until_finish (Python) method:

    pipeline.run().waitUntilFinish();
     
    -
    pipeline.run().wait_until_finish()
    +
    pipeline.run().wait_until_finish()
    +
    +
    + +

    Data encoding and type safety

    + +

    When you create or output pipeline data, you’ll need to specify how the elements in your PCollections are encoded and decoded to and from byte strings. Byte strings are used for intermediate storage as well as for reading from sources and writing to sinks. The Beam SDKs use objects called coders to describe how the elements of a given PCollection should be encoded and decoded.

    + +

    Using coders

    + +

    You typically need to specify a coder when reading data into your pipeline from an external source (or creating pipeline data from local data), and also when you output pipeline data to an external sink.

    + +

    In the Beam SDK for Java, the type Coder provides the methods required for encoding and decoding data. The SDK for Java provides a number of Coder subclasses that work with a variety of standard Java types, such as Integer, Long, Double, StringUtf8 and more. You can find all of the available Coder subclasses in the Coder package.

    + +

    In the Beam SDK for Python, the type Coder provides the methods required for encoding and decoding data. The SDK for Python provides a number of Coder subclasses that work with a variety of standard Python types, such as primitive types, Tuple, Iterable, StringUtf8 and more. You can find all of the available Coder subclasses in the apache_beam.coders package.

    + +

    When you read data into a pipeline, the coder indicates how to interpret the input data into a language-specific type, such as integer or string. Likewise, the coder indicates how the language-specific types in your pipeline should be written into byte strings for an output data sink, or to materialize intermediate data in your pipeline.

    + +

    The Beam SDKs set a coder for every PCollection in a pipeline, including those generated as output from a transform. Most of the time, the Beam SDKs can automatically infer the correct coder for an output PCollection.

    + +
    +

    Note that coders do not necessarily have a 1:1 relationship with types. For example, the Integer type can have multiple valid coders, and input and output data can use different Integer coders. A transform might have Integer-typed input data that uses BigEndianIntegerCoder, and Integer-typed output data that uses VarIntCoder.
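
As a rough illustration of why one type can have several valid coders, the sketch below encodes the same integer in two ways using only the Python standard library. The function names are hypothetical, and the variable-length scheme is a generic 7-bits-per-byte encoding for non-negative integers; it is an assumption made for illustration and is not claimed to match the exact wire format of Beam's VarIntCoder or BigEndianIntegerCoder.

```python
import struct


def encode_big_endian(value):
    """Fixed-width encoding: always 4 bytes, most significant byte first."""
    return struct.pack('>i', value)


def decode_big_endian(data):
    return struct.unpack('>i', data)[0]


def encode_varint(value):
    """Variable-width encoding: 7 bits per byte, low-order group first.

    The high bit of each byte signals that more bytes follow.
    """
    if value < 0:
        raise ValueError('this sketch only handles non-negative integers')
    out = bytearray()
    while True:
        byte = value & 0x7F
        value >>= 7
        if value:
            out.append(byte | 0x80)
        else:
            out.append(byte)
            return bytes(out)


def decode_varint(data):
    result = 0
    for index, byte in enumerate(data):
        result |= (byte & 0x7F) << (7 * index)
    return result


fixed = encode_big_endian(300)
variable = encode_varint(300)
```

Both byte strings decode back to the same integer, yet they differ in length and layout, so a reader must know which coder the writer used.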

    +
    + +

    You can explicitly set a Coder when inputting or outputting a PCollection. You set the Coder by calling the method .withCoder (Java) or by setting the coder argument (Python) when you apply your pipeline’s read or write transform.

    + +

    Typically, you set the Coder when the coder for a PCollection cannot be automatically inferred, or when you want to use a different coder than your pipeline’s default. The following example code reads a set of numbers from a text file, and sets a Coder of type TextualIntegerCoder (Java) or VarIntCoder (Python) for the resulting PCollection:

    + +
    PCollection<Integer> numbers =
    +  p.begin()
    +  .apply(TextIO.Read.named("ReadNumbers")
    +    .from("gs://my_bucket/path/to/numbers-*.txt")
    +    .withCoder(TextualIntegerCoder.of()));
    +
    +
    + +
    p = beam.Pipeline()
    +numbers = p | beam.io.ReadFromText("gs://my_bucket/path/to/numbers-*.txt", coder=beam.coders.VarIntCoder())
    +
    +
    + +

    You can set the coder for an existing PCollection by using the method PCollection.setCoder. Note that you cannot call setCoder on a PCollection that has been finalized (e.g. by calling .apply on it).

    + +

    You can get the coder for an existing PCollection by using the method getCoder. This method will fail with an IllegalStateException if a coder has not been set and cannot be inferred for the given PCollection.

    + +

    Coder inference and default coders

    + +

    The Beam SDKs require a coder for every PCollection in your pipeline. Most of the time, however, you do not need to explicitly specify a coder, such as for an intermediate PCollection produced by a transform in the middle of your pipeline. In such cases, the Beam SDKs can infer an appropriate coder from the inputs and outputs of the transform used to produce the PCollection.

    + +

    Each pipeline object has a CoderRegistry. The CoderRegistry represents a mapping of Java types to the default coders that the pipeline should use for PCollections of each type.

    + +

    The Beam SDK for Python has a CoderRegistry that represents a mapping of Python types to the default coder that should be used for PCollections of each type.

    + +

    By default, the Beam SDK for Java automatically infers the Coder for the elements of an output PCollection using the type parameter from the transform’s function object, such as DoFn. In the case of ParDo, for example, a DoFn<Integer, String> function object accepts an input element of type Integer and produces an output element of type String. In such a case, the SDK for Java will automatically infer the default Coder for the output PCollection<String> (in the default pipeline CoderRegistry, this is StringUtf8Coder).

    + +

    By default, the Beam SDK for Python automatically infers the Coder for the elements of an output PCollection using the typehints from the transform’s function object, such as DoFn. In the case of ParDo, for example, a DoFn with the typehints @beam.typehints.with_input_types(int) and @beam.typehints.with_output_types(str) accepts an input element of type int and produces an output element of type str. In such a case, the Beam SDK for Python will automatically infer the default Coder for the output PCollection (in the default pipeline CoderRegistry, this is BytesCoder).

    + +
    +

    NOTE: If you create your PCollection from in-memory data by using the Create transform, you cannot rely on coder inference and default coders. Create does not have access to any typing information for its arguments, and may not be able to infer a coder if the argument list contains a value whose exact run-time class doesn’t have a default coder registered.

    +
    + +

    When using Create, the simplest way to ensure that you have the correct coder is by invoking withCoder when you apply the Create transform.

    + +

    Default coders and the CoderRegistry

    + +

    Each Pipeline object has a CoderRegistry object, which maps language types to the default coder the pipeline should use for those types. You can use the CoderRegistry yourself to look up the default coder for a given type, or to register a new default coder for a given type.
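
The lookup-and-register behaviour described above can be pictured as a dictionary keyed by type. The following is a minimal sketch of that idea only: the `CoderRegistry` class here is a hypothetical stand-in and not the SDK's implementation, and the two coder classes are empty placeholders that borrow names from this guide.

```python
class VarIntCoder:
    """Placeholder coder class (hypothetical, no encoding logic)."""


class BigEndianIntegerCoder:
    """Placeholder coder class (hypothetical, no encoding logic)."""


class CoderRegistry:
    """Hypothetical registry mapping a language type to its default coder."""

    def __init__(self):
        self._coders = {}

    def register_coder(self, typ, coder_class):
        # Registering a type again replaces its previous default.
        self._coders[typ] = coder_class

    def get_coder(self, typ):
        try:
            return self._coders[typ]
        except KeyError:
            raise ValueError('no default coder registered for %r' % (typ,))


registry = CoderRegistry()
registry.register_coder(int, VarIntCoder)
default_before = registry.get_coder(int)

# Override the default for int, as the guide does with BigEndianIntegerCoder.
registry.register_coder(int, BigEndianIntegerCoder)
default_after = registry.get_coder(int)
```

In this sketch a lookup for an unregistered type raises an error instead of guessing, which mirrors the guide's point that a coder must exist for every PCollection.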

    + +

    CoderRegistry contains a default mapping of coders to standard Java or Python types for any pipeline you create using the Beam SDK for Java or Python. The following tables show the standard mappings:

    + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + +
    Java Type          Default Coder
    Double             DoubleCoder
    Instant            InstantCoder
    Integer            VarIntCoder
    Iterable           IterableCoder
    KV                 KvCoder
    List               ListCoder
    Map                MapCoder
    Long               VarLongCoder
    String             StringUtf8Coder
    TableRow           TableRowJsonCoder
    Void               VoidCoder
    byte[ ]            ByteArrayCoder
    TimestampedValue   TimestampedValueCoder
    + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + +
    Python Type   Default Coder
    int           VarIntCoder
    float         FloatCoder
    str           BytesCoder
    bytes         StrUtf8Coder
    Tuple         TupleCoder
    + +
    Looking up a default coder
    + +

    You can use the method CoderRegistry.getDefaultCoder to determine the default Coder for a Java type. You can access the CoderRegistry for a given pipeline by using the method Pipeline.getCoderRegistry. This allows you to determine (or set) the default Coder for a Java type on a per-pipeline basis: i.e. “for this pipeline, verify that Integer values are encoded using BigEndianIntegerCoder.”

    + +

    You can use the method CoderRegistry.get_coder to determine the default Coder for a Python type. You can use coders.registry to access the CoderRegistry. This allows you to determine (or set) the default Coder for a Python type.

    + +
    Setting the default coder for a type
    + +

    To set the default Coder for a Java or Python type for a particular pipeline, you obtain and modify the pipeline’s CoderRegistry. You use the method Pipeline.getCoderRegistry (Java) or coders.registry (Python) to get the CoderRegistry object, and then use the method CoderRegistry.registerCoder (Java) or CoderRegistry.register_coder (Python) to register a new Coder for the target type.

    + +

    The following example code demonstrates how to set a default Coder, in this case BigEndianIntegerCoder, for Integer (Java) or int (Python) values for a pipeline.

    + +
    PipelineOptions options = PipelineOptionsFactory.create();
    +Pipeline p = Pipeline.create(options);
    +
    +CoderRegistry cr = p.getCoderRegistry();
    +cr.registerCoder(Integer.class, BigEndianIntegerCoder.class);
    +
    +
    + +
    apache_beam.coders.registry.register_coder(int, BigEndianIntegerCoder)
     
    -

    - - +

    Annotating a custom data type with a default coder
    + +

    If your pipeline program defines a custom data type, you can use the @DefaultCoder annotation to specify the coder to use with that type. For example, let’s say you have a custom data type for which you want to use AvroCoder. You can use the @DefaultCoder annotation as follows:

    + +
    @DefaultCoder(AvroCoder.class)
    +public class MyCustomDataType {
    +  ...
    +}
    +
    +
    + +

    If you’ve created a custom coder to match your data type, and you want to use the @DefaultCoder annotation, your coder class must implement a static Coder.of(Class<T>) factory method.

    + +
    public class MyCustomCoder implements Coder {
    +  public static <T> Coder<T> of(Class<T> clazz) {...}
    +  ...
    +}
    +
    +@DefaultCoder(MyCustomCoder.class)
    +public class MyCustomDataType {
    +  ...
    +}
    +
    +
    + +

    The Beam SDK for Python does not support annotating data types with a default coder. If you would like to set a default coder, use the method described in the previous section, Setting the default coder for a type.

    + +