beam-commits mailing list archives

From da...@apache.org
Subject [1/3] incubator-beam-site git commit: Update documentation/pipelines/design-your-pipeline.md. Add 5 images for the doc.
Date Wed, 23 Nov 2016 06:12:10 GMT
Repository: incubator-beam-site
Updated Branches:
  refs/heads/asf-site 51e0afeb0 -> 946ed0010


Update documentation/pipelines/design-your-pipeline.md. Add 5 images for the doc.


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam-site/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam-site/commit/5b3bda65
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam-site/tree/5b3bda65
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam-site/diff/5b3bda65

Branch: refs/heads/asf-site
Commit: 5b3bda6590361c2a7f8923d46780dbc72499440b
Parents: 51e0afe
Author: Hadar Hod <hadarh@google.com>
Authored: Mon Nov 7 15:58:49 2016 -0800
Committer: Davor Bonaci <davor@google.com>
Committed: Tue Nov 22 22:10:51 2016 -0800

----------------------------------------------------------------------
 .../pipelines/create-your-pipeline.md           | 155 ++++++++++-
 .../pipelines/design-your-pipeline.md           | 104 +++++++-
 .../pipelines/test-your-pipeline.md             | 266 ++++++++++++++++++-
 src/images/design-your-pipeline-flatten.png     | Bin 0 -> 47858 bytes
 src/images/design-your-pipeline-join.png        | Bin 0 -> 41878 bytes
 src/images/design-your-pipeline-linear.png      | Bin 0 -> 15218 bytes
 ...sign-your-pipeline-multiple-pcollections.png | Bin 0 -> 39095 bytes
 .../design-your-pipeline-side-outputs.png       | Bin 0 -> 36451 bytes
 8 files changed, 522 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam-site/blob/5b3bda65/src/documentation/pipelines/create-your-pipeline.md
----------------------------------------------------------------------
diff --git a/src/documentation/pipelines/create-your-pipeline.md b/src/documentation/pipelines/create-your-pipeline.md
index b99628f..05b1594 100644
--- a/src/documentation/pipelines/create-your-pipeline.md
+++ b/src/documentation/pipelines/create-your-pipeline.md
@@ -5,4 +5,157 @@ permalink: /documentation/pipelines/create-your-pipeline/
 ---
 # Create Your Pipeline
 
-> **Note:** There is an open JIRA issue to create this guide ([BEAM-901](https://issues.apache.org/jira/browse/BEAM-901)).
+* TOC
+{:toc}
+
+Your Beam program expresses a data processing pipeline, from start to finish. This section
explains the mechanics of using the classes in the Beam SDKs to build a pipeline. To construct
a pipeline using the classes in the Beam SDKs, your program will need to perform the following
general steps:
+
+*   Create a `Pipeline` object.
+*   Use a **Read** or **Create** transform to create one or more `PCollection`s for your
pipeline data.
+*   Apply **transforms** to each `PCollection`. Transforms can change, filter, group, analyze,
or otherwise process the elements in a `PCollection`. Each transform creates a new output
`PCollection`, to which you can apply additional transforms until processing is complete.
+*   **Write** or otherwise output the final, transformed `PCollection`s.
+*   **Run** the pipeline.
+
+## Creating Your Pipeline Object
+
+A Beam program often starts by creating a `Pipeline` object.
+
+In the Beam SDKs, each pipeline is represented by an explicit object of type `Pipeline`.
Each `Pipeline` object is an independent entity that encapsulates both the data the pipeline
operates over and the transforms that get applied to that data.
+
+To create a pipeline, declare a `Pipeline` object, and pass it some configuration options,
which are explained in a section below. You pass the configuration options by creating an
object of type `PipelineOptions`, which you can build by using the static method `PipelineOptionsFactory.create()`.
+
+```java
+// Start by defining the options for the pipeline.
+PipelineOptions options = PipelineOptionsFactory.create();
+
+// Then create the pipeline.
+Pipeline p = Pipeline.create(options);
+```
+
+### Configuring Pipeline Options
+
+Use the pipeline options to configure different aspects of your pipeline, such as the pipeline
runner that will execute your pipeline and any runner-specific configuration required by the
chosen runner. Your pipeline options will potentially include information such as your project
ID or a location for storing files. 
+
+When you run the pipeline on a runner of your choice, a copy of the `PipelineOptions` will
be available to your code. For example, you can read `PipelineOptions` from a `DoFn`'s `Context`.
+
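+For example, here is a minimal sketch (assuming the `@ProcessElement`-style `DoFn` and the `MyOptions` interface defined later on this page) that reads an option from inside a `DoFn`:
+
+```java
+static class MyFn extends DoFn<String, String> {
+  @ProcessElement
+  public void processElement(ProcessContext c) {
+    // A copy of the PipelineOptions is available to user code at runtime.
+    MyOptions options = c.getPipelineOptions().as(MyOptions.class);
+    c.output(c.element() + options.getMyCustomOption());
+  }
+}
+```
+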
+#### Setting PipelineOptions from Command-Line Arguments
+
+While you can configure your pipeline by creating a `PipelineOptions` object and setting
the fields directly, the Beam SDKs include a command-line parser that you can use to set fields
in `PipelineOptions` using command-line arguments.
+
+To read options from the command-line, construct your `PipelineOptions` object as demonstrated
in the following example code:
+
+```java
+MyOptions options = PipelineOptionsFactory.fromArgs(args).withValidation().create();
+```
+
+This interprets command-line arguments that follow the format:
+
+```
+--<option>=<value>
+```
+
+> **Note:** Appending the method `.withValidation` will check for required command-line
arguments and validate argument values.
+
+Building your `PipelineOptions` this way lets you specify any of the options as a command-line
argument.
+
+> **Note:** The [WordCount example pipeline]({{ site.baseurl }}/get-started/wordcount-example)
demonstrates how to set pipeline options at runtime by using command-line options.
+
+#### Creating Custom Options
+
+You can add your own custom options in addition to the standard `PipelineOptions`. To add
your own options, define an interface with getter and setter methods for each option, as in
the following example:
+
+```java
+public interface MyOptions extends PipelineOptions {
+    String getMyCustomOption();
+    void setMyCustomOption(String myCustomOption);
+}
+```
+
+You can also specify a description, which appears when a user passes `--help` as a command-line
argument, and a default value.
+
+You set the description and default value using annotations, as follows:
+
+```java
+public interface MyOptions extends PipelineOptions {
+    @Description("My custom command line argument.")
+    @Default.String("DEFAULT")
+    String getMyCustomOption();
+    void setMyCustomOption(String myCustomOption);
+}
+```
+
+It's recommended that you register your interface with `PipelineOptionsFactory` and then
pass the interface when creating the `PipelineOptions` object. When you register your interface
with `PipelineOptionsFactory`, `--help` can find your custom options interface and add
it to the output of the `--help` command. `PipelineOptionsFactory` will also validate that
your custom options are compatible with all other registered options.
+
+The following example code shows how to register your custom options interface with `PipelineOptionsFactory`:
+
+```java
+PipelineOptionsFactory.register(MyOptions.class);
+MyOptions options = PipelineOptionsFactory.fromArgs(args)
+    .withValidation()
+    .as(MyOptions.class);
+```
+
+Now your pipeline can accept `--myCustomOption=value` as a command-line argument.
+
+## Reading Data Into Your Pipeline
+
+To create your pipeline's initial `PCollection`, you apply a root transform to your pipeline
object. A root transform creates a `PCollection` from either an external data source or some
local data you specify.
+
+There are two kinds of root transforms in the Beam SDKs: `Read` and `Create`. `Read` transforms
read data from an external source, such as a text file or a database table. `Create` transforms
create a `PCollection` from an in-memory `java.util.Collection`.
+
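+For example, here is a minimal sketch of using `Create` as a root transform, assuming a small in-memory list of strings:
+
+```java
+// Hypothetical in-memory input data for the pipeline.
+static final List<String> LINES = Arrays.asList("line 1", "line 2");
+
+PCollection<String> collection =
+    p.apply(Create.of(LINES)).setCoder(StringUtf8Coder.of());
+```
+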
+The following example code shows how to `apply` a `TextIO.Read` root transform to read data
from a text file. The transform is applied to a `Pipeline` object `p`, and returns a pipeline
data set in the form of a `PCollection<String>`:
+
+```java
+PCollection<String> lines = p.apply(
+    "ReadLines", TextIO.Read.from("gs://some/inputData.txt"));
+```
+
+## Applying Transforms to Process Pipeline Data
+
+To use transforms in your pipeline, you **apply** them to the `PCollection` that you want
to transform.
+
+To apply a transform, you call the `apply` method on each `PCollection` that you want to
process, passing the desired transform object as an argument.
+
+The Beam SDKs contain a number of different transforms that you can apply to your pipeline's
`PCollection`s. These include general-purpose core transforms, such as [ParDo]({{ site.baseurl
}}/documentation/programming-guide/#transforms-pardo) or [Combine]({{ site.baseurl }}/documentation/programming-guide/#transforms-combine).
There are also pre-written [composite transforms]({{ site.baseurl }}/documentation/programming-guide/#transforms-composite)
included in the SDKs, which combine one or more of the core transforms in a useful processing
pattern, such as counting or combining elements in a collection. You can also define your
own more complex composite transforms to fit your pipeline's exact use case.
+
+In the Beam Java SDK, each transform is a subclass of the base class `PTransform`. When you
call `apply` on a `PCollection`, you pass the `PTransform` you want to use as an argument.
+
+The following code shows how to `apply` a transform to a `PCollection` of strings. The transform
is a user-defined custom transform that reverses the contents of each string and outputs a
new `PCollection` containing the reversed strings.
+
+The input is a `PCollection<String>` called `words`; the code passes an instance of
a `PTransform` object called `ReverseWords` to `apply`, and saves the return value as the
`PCollection<String>` called `reversedWords`.
+
+```java
+PCollection<String> words = ...;
+
+PCollection<String> reversedWords = words.apply(new ReverseWords());
+```
+
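+`ReverseWords` itself is not part of the SDK; as a minimal sketch, it might be written as a `PTransform` that wraps a `ParDo` (assuming the `@ProcessElement`-style `DoFn`):
+
+```java
+static class ReverseWords
+    extends PTransform<PCollection<String>, PCollection<String>> {
+  @Override
+  public PCollection<String> apply(PCollection<String> words) {
+    return words.apply(ParDo.of(new DoFn<String, String>() {
+      @ProcessElement
+      public void processElement(ProcessContext c) {
+        // Reverse the characters of each input string.
+        c.output(new StringBuilder(c.element()).reverse().toString());
+      }
+    }));
+  }
+}
+```
+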
+## Writing or Outputting Your Final Pipeline Data
+
+Once your pipeline has applied all of its transforms, you'll usually need to output the results.
To output your pipeline's final `PCollection`s, you apply a `Write` transform to that `PCollection`.
`Write` transforms can output the elements of a `PCollection` to an external data sink, such
as a database table. You can use `Write` to output a `PCollection` at any time in your pipeline,
although you'll typically write out data at the end of your pipeline.
+
+The following example code shows how to `apply` a `TextIO.Write` transform to write a `PCollection`
of `String` to a text file:
+
+```java
+PCollection<String> filteredWords = ...;
+
+filteredWords.apply("WriteMyFile", TextIO.Write.to("gs://some/outputData.txt"));
+```
+
+## Running Your Pipeline
+
+Once you have constructed your pipeline, use the `run` method to execute the pipeline. Pipelines
are executed asynchronously: the program you create sends a specification for your pipeline
to a **pipeline runner**, which then constructs and runs the actual series of pipeline operations.

+
+```java
+p.run();
+```
+
+The `run` method is asynchronous. If you'd like a blocking execution instead, run your pipeline
by appending the `waitUntilFinish` method:
+
+```java
+p.run().waitUntilFinish();
+```
+
+## What's next
+
+*   [Test your pipeline]({{ site.baseurl }}/documentation/pipelines/test-your-pipeline).
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-beam-site/blob/5b3bda65/src/documentation/pipelines/design-your-pipeline.md
----------------------------------------------------------------------
diff --git a/src/documentation/pipelines/design-your-pipeline.md b/src/documentation/pipelines/design-your-pipeline.md
index 8a76704..937b35c 100644
--- a/src/documentation/pipelines/design-your-pipeline.md
+++ b/src/documentation/pipelines/design-your-pipeline.md
@@ -5,4 +5,106 @@ permalink: /documentation/pipelines/design-your-pipeline/
 ---
 # Design Your Pipeline
 
-> **Note:** There is an open JIRA issue to create this guide ([BEAM-901](https://issues.apache.org/jira/browse/BEAM-901)).
+* TOC
+{:toc}
+
+This page helps you design your Apache Beam pipeline. It includes information about how to
determine your pipeline's structure, how to choose which transforms to apply to your data,
and how to determine your input and output methods.
+
+Before reading this section, it is recommended that you become familiar with the information
in the [Beam programming guide]({{ site.baseurl }}/documentation/programming-guide).
+
+## What to consider when designing your pipeline
+
+When designing your Beam pipeline, consider a few basic questions:
+
+*   **Where is your input data stored?** How many sets of input data do you have? This will
determine what kinds of `Read` transforms you'll need to apply at the start of your pipeline.
+*   **What does your data look like?** It might be plaintext, formatted log files, or rows
in a database table. Some Beam transforms work exclusively on `PCollection`s of key/value
pairs; you'll need to determine if and how your data is keyed and how to best represent that
in your pipeline's `PCollection`(s).
+*   **What do you want to do with your data?** The core transforms in the Beam SDKs are general
purpose. Knowing how you need to change or manipulate your data will determine how you build
core transforms like [ParDo]({{ site.baseurl }}/documentation/programming-guide/#transforms-pardo),
or when you use pre-written transforms included with the Beam SDKs.
+*   **What does your output data look like, and where should it go?** This will determine
what kinds of `Write` transforms you'll need to apply at the end of your pipeline.
+
+## A basic pipeline
+
+The simplest pipelines represent a linear flow of operations, as shown in Figure 1 below:
+
+<figure id="fig1">
+    <img src="{{ site.baseurl }}/images/design-your-pipeline-linear.png"
+         alt="A linear pipeline.">
+</figure>
+Figure 1: A linear pipeline.
+
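+In code, a linear pipeline like this is a chain of `apply` calls; a sketch, with hypothetical file paths and a hypothetical `SomeDoFn` transform, might look like:
+
+```java
+Pipeline p = Pipeline.create(options);
+p.apply(TextIO.Read.from("gs://some/inputData.txt"))
+ .apply(ParDo.of(new SomeDoFn()))  // a hypothetical per-element transform
+ .apply(TextIO.Write.to("gs://some/outputData.txt"));
+p.run();
+```
+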
+However, your pipeline can be significantly more complex. A pipeline represents a [Directed
Acyclic Graph](https://en.wikipedia.org/wiki/Directed_acyclic_graph) of steps. It can have
multiple input sources, multiple output sinks, and its operations (transforms) can output
multiple `PCollection`s. The following examples show some of the different shapes your pipeline
can take.
+
+## Branching PCollections
+
+It's important to understand that transforms do not consume `PCollection`s; instead, they
consider each individual element of a `PCollection` and create a new `PCollection` as output.
This way, you can do different things to different elements in the same `PCollection`.
+
+### Multiple transforms process the same PCollection
+
+You can use the same `PCollection` as input for multiple transforms without consuming the
input or altering it.
+
+The pipeline illustrated in Figure 2 below reads its input, first names (Strings), from a
single source, a database table, and creates a `PCollection` of table rows. Then, the pipeline
applies multiple transforms to the **same** `PCollection`. Transform A extracts all the names
in that `PCollection` that start with the letter 'A', and Transform B extracts all the names
in that `PCollection` that start with the letter 'B'. Both transforms A and B have the same
input `PCollection`.
+
+<figure id="fig2">
+    <img src="{{ site.baseurl }}/images/design-your-pipeline-multiple-pcollections.png"
+         alt="A pipeline with multiple transforms. Note that the PCollection of table rows
is processed by two transforms.">
+</figure>
+Figure 2: A pipeline with multiple transforms. Note that the PCollection of the database
table rows is processed by two transforms.
+
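+In code, this branching is simply two `apply` calls on the same `PCollection`; a minimal sketch, assuming a hypothetical `dbRowCollection` of names:
+
+```java
+PCollection<String> dbRowCollection = ...;
+
+// Transform A: keep only the names that start with 'A'.
+PCollection<String> aCollection = dbRowCollection.apply("aTrans",
+    ParDo.of(new DoFn<String, String>() {
+      @ProcessElement
+      public void processElement(ProcessContext c) {
+        if (c.element().startsWith("A")) {
+          c.output(c.element());
+        }
+      }
+    }));
+
+// Transform B: keep only the names that start with 'B'.
+PCollection<String> bCollection = dbRowCollection.apply("bTrans",
+    ParDo.of(new DoFn<String, String>() {
+      @ProcessElement
+      public void processElement(ProcessContext c) {
+        if (c.element().startsWith("B")) {
+          c.output(c.element());
+        }
+      }
+    }));
+```
+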
+### A single transform that uses side outputs
+
+Another way to branch a pipeline is to have a **single** transform output to multiple `PCollection`s
by using [side outputs]({{ site.baseurl }}/documentation/programming-guide/#transforms-sideio).
Transforms that use side outputs process each element of the input once and allow you to
output to zero or more `PCollection`s.
+
+Figure 3 below illustrates the same example described above, but with one transform that
uses a side output: names that start with 'A' are added to the main output `PCollection`, and names
that start with 'B' are added to the side output `PCollection`.
+
+<figure id="fig3">
+    <img src="{{ site.baseurl }}/images/design-your-pipeline-side-outputs.png"
+         alt="A pipeline with a transform that outputs multiple PCollections.">
+</figure>
+Figure 3: A pipeline with a transform that outputs multiple PCollections.
+
+The pipeline in Figure 2 contains two transforms that process the elements in the same input
`PCollection`. One transform uses the following logic pattern:
+
+<pre>if (starts with 'A') { outputToPCollectionA }</pre>
+
+while the other transform uses:
+
+<pre>if (starts with 'B') { outputToPCollectionB }</pre>
+
+Because each transform reads the entire input `PCollection`, each element in the input `PCollection`
is processed twice.
+
+The pipeline in Figure 3 performs the same operation in a different way, with only one transform
that uses the logic
+
+<pre>if (starts with 'A') { outputToPCollectionA } else if (starts with 'B') { outputToPCollectionB
}</pre>
+
+where each element in the input `PCollection` is processed once.
+
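+As a minimal sketch, the single-transform version might use `ParDo.withOutputTags` (assuming the 0.3.0-era side-output API and the same hypothetical `dbRowCollection` as above):
+
+```java
+final TupleTag<String> startsWithATag = new TupleTag<String>(){};
+final TupleTag<String> startsWithBTag = new TupleTag<String>(){};
+
+PCollectionTuple mixedCollection = dbRowCollection.apply(
+    ParDo.withOutputTags(startsWithATag, TupleTagList.of(startsWithBTag))
+        .of(new DoFn<String, String>() {
+          @ProcessElement
+          public void processElement(ProcessContext c) {
+            if (c.element().startsWith("A")) {
+              c.output(c.element());                      // main output
+            } else if (c.element().startsWith("B")) {
+              c.sideOutput(startsWithBTag, c.element());  // side output
+            }
+          }
+        }));
+
+PCollection<String> aCollection = mixedCollection.get(startsWithATag);
+PCollection<String> bCollection = mixedCollection.get(startsWithBTag);
+```
+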
+You can use either mechanism to produce multiple output `PCollection`s. However, using side
outputs makes more sense if the transform's computation per element is time-consuming.
+
+## Merging PCollections
+
+Often, after you've branched your `PCollection` into multiple `PCollection`s via multiple
transforms, you'll want to merge some or all of those resulting `PCollection`s back together.
You can do so by using one of the following:
+
+*   **Flatten** - You can use the `Flatten` transform in the Beam SDKs to merge multiple
`PCollection`s of the **same type**.
+*   **Join** - You can use the `CoGroupByKey` transform in the Beam SDKs to perform a relational
join between two `PCollection`s. The `PCollection`s must be keyed (i.e., they must be collections
of key/value pairs) and they must use the same key type.
+
+The example depicted in Figure 4 below is a continuation of the example illustrated in Figure
2 in the section above. After branching into two `PCollection`s, one with names that begin
with 'A' and one with names that begin with 'B', the pipeline merges the two together into
a single `PCollection` that now contains all names that begin with either 'A' or 'B'. Here,
it makes sense to use `Flatten` because the `PCollection`s being merged both contain elements
of the same type.
+
+<figure id="fig4">
+    <img src="{{ site.baseurl }}/images/design-your-pipeline-flatten.png"
+         alt="Part of a pipeline that merges multiple PCollections.">
+</figure>
+Figure 4: Part of a pipeline that merges multiple PCollections.
+
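+As a sketch, the merge step can use `Flatten` over a `PCollectionList`, assuming the two branch collections from the sketches above:
+
+```java
+// Both aCollection and bCollection are PCollection<String>.
+PCollectionList<String> collectionList =
+    PCollectionList.of(aCollection).and(bCollection);
+PCollection<String> mergedCollection =
+    collectionList.apply(Flatten.<String>pCollections());
+```
+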
+## Multiple sources
+
+Your pipeline can read its input from one or more sources. If your pipeline reads from multiple
sources and the data from those sources is related, it can be useful to join the inputs together.
In the example illustrated in Figure 5 below, the pipeline reads names and addresses from
a database table, and names and order numbers from a text file. The pipeline then uses `CoGroupByKey`
to join this information, where the key is the name; the resulting `PCollection` contains
all the combinations of names, addresses, and orders.
+
+<figure id="fig5">
+    <img src="{{ site.baseurl }}/images/design-your-pipeline-join.png"
+         alt="A pipeline with multiple input sources.">
+</figure>
+Figure 5: A pipeline with multiple input sources.
+
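+A minimal sketch of such a join, assuming both inputs have already been converted to key/value pairs keyed by name:
+
+```java
+PCollection<KV<String, String>> namesAndAddresses = ...;  // from the database table
+PCollection<KV<String, String>> namesAndOrders = ...;     // from the text file
+
+final TupleTag<String> addressTag = new TupleTag<String>();
+final TupleTag<String> orderTag = new TupleTag<String>();
+
+// Group both collections by their common key: the name.
+PCollection<KV<String, CoGbkResult>> joined =
+    KeyedPCollectionTuple.of(addressTag, namesAndAddresses)
+        .and(orderTag, namesAndOrders)
+        .apply(CoGroupByKey.<String>create());
+```
+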
+## What's next
+
+*   [Create your own pipeline]({{ site.baseurl }}/documentation/pipelines/create-your-pipeline).
+*   [Test your pipeline]({{ site.baseurl }}/documentation/pipelines/test-your-pipeline).
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-beam-site/blob/5b3bda65/src/documentation/pipelines/test-your-pipeline.md
----------------------------------------------------------------------
diff --git a/src/documentation/pipelines/test-your-pipeline.md b/src/documentation/pipelines/test-your-pipeline.md
index 0a75bb5..247c52c 100644
--- a/src/documentation/pipelines/test-your-pipeline.md
+++ b/src/documentation/pipelines/test-your-pipeline.md
@@ -5,4 +5,268 @@ permalink: /documentation/pipelines/test-your-pipeline/
 ---
 # Test Your Pipeline
 
-> **Note:** There is an open JIRA issue to create this guide ([BEAM-901](https://issues.apache.org/jira/browse/BEAM-901)).
+* TOC
+{:toc}
+
+Testing your pipeline is a particularly important step in developing an effective data processing
solution. The indirect nature of the Beam model, in which your user code constructs a pipeline
graph to be executed remotely, can make debugging failed runs a non-trivial task. Often it
is faster and simpler to perform local unit testing on your pipeline code than to debug a
pipeline's remote execution.
+
+Before running your pipeline on the runner of your choice, unit testing your pipeline code
locally is often the best way to identify and fix bugs in your pipeline code. Unit testing
your pipeline locally also allows you to use your favorite local debugging tools.
+
+You can use [DirectRunner]({{ site.baseurl }}/documentation/runners/direct), a local runner
helpful for testing and local development.
+
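+For example, a minimal sketch that explicitly selects the `DirectRunner`, assuming the `beam-runners-direct-java` artifact is on your classpath:
+
+```java
+PipelineOptions options = PipelineOptionsFactory.create();
+options.setRunner(DirectRunner.class);  // run locally, in-process
+Pipeline p = Pipeline.create(options);
+```
+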
+After you test your pipeline using the `DirectRunner`, you can use the runner of your choice
to test on a small scale. For example, use the Flink runner with a local or remote Flink cluster.

+
+The Beam SDKs provide a number of ways to unit test your pipeline code. From the lowest to
the highest level, these are:
+
+*   You can test the individual function objects, such as [DoFn]({{ site.baseurl }}/documentation/programming-guide/#transforms-pardo)s,
inside your pipeline's core transforms.
+*   You can test an entire [Composite Transform]({{ site.baseurl }}/documentation/programming-guide/#transforms-composite)
as a unit.
+*   You can perform an end-to-end test for an entire pipeline.
+
+To support unit testing, the Beam SDK for Java provides a number of test classes in the [testing
package](https://github.com/apache/incubator-beam/tree/master/sdks/java/core/src/test/java/org/apache/beam/sdk).
You can use these tests as references and guides.
+
+## Testing Individual DoFn Objects
+
+The code in your pipeline's `DoFn` functions runs often, and often across multiple worker
instances. Unit-testing your `DoFn` objects before running them on a remote runner service
can save a great deal of debugging time and energy.
+
+The Beam SDK for Java provides a convenient way to test an individual `DoFn` called [DoFnTester](https://github.com/apache/incubator-beam/blob/master/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/DoFnTesterTest.java),
which is included in the SDK `Transforms` package.
+
+`DoFnTester` uses the [JUnit](http://junit.org) framework. To use `DoFnTester`, you'll need
to do the following:
+
+1.  Create a `DoFnTester`. You'll need to pass an instance of the `DoFn` you want to test
to the static factory method for `DoFnTester`.
+2.  Create one or more main test inputs of the appropriate type for your `DoFn`. If your
`DoFn` takes side inputs and/or produces side outputs, you should also create the side inputs
and the side output tags.
+3.  Call `DoFnTester.processBundle` to process the main inputs.
+4.  Use JUnit's `Assert.assertThat` method to ensure the test outputs returned from `processBundle`
match your expected values.
+
+### Creating a DoFnTester
+
+To create a `DoFnTester`, first create an instance of the `DoFn` you want to test. You then
use that instance when you create a `DoFnTester` using the `.of()` static factory method:
+
+```java 
+static class MyDoFn extends DoFn<String, Integer> { ... }
+MyDoFn myDoFn = ...;
+
+DoFnTester<String, Integer> fnTester = DoFnTester.of(myDoFn);
+```
+
+### Creating Test Inputs
+
+You'll need to create one or more test inputs for `DoFnTester` to send to your `DoFn`. To
create test inputs, simply create one or more input variables of the same input type that
your `DoFn` accepts. In the case above:
+
+```java
+static class MyDoFn extends DoFn<String, Integer> { ... }
+MyDoFn myDoFn = ...;
+DoFnTester<String, Integer> fnTester = DoFnTester.of(myDoFn);
+
+String testInput = "test1";
+```
+
+#### Side Inputs and Outputs
+
+If your `DoFn` accepts side inputs, you can create those side inputs by using the method
`DoFnTester.setSideInputs`.
+
+```java
+static class MyDoFn extends DoFn<String, Integer> { ... }
+MyDoFn myDoFn = ...;
+DoFnTester<String, Integer> fnTester = DoFnTester.of(myDoFn);
+
+PCollectionView<List<Integer>> sideInput = ...;
+Iterable<Integer> value = ...;
+fnTester.setSideInputInGlobalWindow(sideInput, value);
+```
+
+If your `DoFn` produces side outputs, you'll need to set the appropriate `TupleTag` objects
that you'll use to access each output. A `DoFn` with side outputs produces a single `PCollectionTuple`
containing all of its outputs; you'll need to provide a `TupleTagList` that corresponds to each side
output in that tuple.
+
+Suppose your `DoFn` produces side outputs of type `String` and `Integer`. You create `TupleTag`
objects for each, and bundle them into a `TupleTagList`, then set it for the `DoFnTester`
as follows:
+
+```java  
+static class MyDoFn extends DoFn<String, Integer> { ... }
+MyDoFn myDoFn = ...;
+DoFnTester<String, Integer> fnTester = DoFnTester.of(myDoFn);
+
+TupleTag<String> tag1 = ...;
+TupleTag<Integer> tag2 = ...;
+TupleTagList tags = TupleTagList.of(tag1).and(tag2);
+
+fnTester.setSideOutputTags(tags);
+```
+
+See the `ParDo` documentation on [side inputs]({{ site.baseurl }}/documentation/programming-guide/#transforms-sideio)
for more information.
+
+### Processing Test Inputs and Checking Results
+
+To process the inputs (and thus run the test on your `DoFn`), you call the method `DoFnTester.processBundle`.
When you call `processBundle`, you pass one or more main test input values for your `DoFn`.
If you set side inputs, the side inputs are available to each bundle of main inputs that you
provide.
+
+`DoFnTester.processBundle` returns a `List` of outputs; that is, objects of the same type
as the `DoFn`'s specified output type. For a `DoFn<String, Integer>`, `processBundle`
returns a `List<Integer>`:
+
+```java  
+static class MyDoFn extends DoFn<String, Integer> { ... }
+MyDoFn myDoFn = ...;
+DoFnTester<String, Integer> fnTester = DoFnTester.of(myDoFn);
+
+String testInput = "test1";
+List<Integer> testOutputs = fnTester.processBundle(testInput);
+```
+
+To check the results of `processBundle`, you use JUnit's `Assert.assertThat` method to test
whether the `List` of outputs contains the values you expect:
+
+```java  
+String testInput = "test1";
+List<Integer> testOutputs = fnTester.processBundle(testInput);
+
+Assert.assertThat(testOutputs, Matchers.hasItems(...));
+
+// Process a larger bundle in a single step.
+Assert.assertThat(fnTester.processBundle("input1", "input2", "input3"), Matchers.hasItems(...));
+```
+
+## Testing Composite Transforms
+
+To test a composite transform you've created, you can use the following pattern:
+
+*   Create a `TestPipeline`.
+*   Create some static, known test input data.
+*   Use the `Create` transform to create a `PCollection` of your input data.
+*   `Apply` your composite transform to the input `PCollection` and save the resulting output
`PCollection`.
+*   Use `PAssert` and its subclasses to verify that the output `PCollection` contains the
elements that you expect.
+
+### TestPipeline
+
+[TestPipeline](https://github.com/apache/incubator-beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/TestPipeline.java)
is a class included in the Beam Java SDK specifically for testing transforms. For tests, use
`TestPipeline` in place of `Pipeline` when you create the pipeline object. Unlike `Pipeline.create`,
`TestPipeline.create` handles setting `PipelineOptions` internally.
+
+You create a `TestPipeline` as follows:
+
+```java
+Pipeline p = TestPipeline.create();
+```
+
+> **Note:** Read about testing unbounded pipelines in Beam in [this blog post]({{ site.baseurl
}}/blog/2016/10/20/test-stream.html).
+
+### Using the Create Transform
+
+You can use the `Create` transform to create a `PCollection` out of a standard in-memory
collection class, such as Java `List`. See [Creating a PCollection]({{ site.baseurl }}/documentation/programming-guide/#pcollection)
for more information.
+
+### PAssert
+ 
+[PAssert]({{ site.baseurl }}/documentation/sdks/javadoc/0.3.0-incubating/org/apache/beam/sdk/testing/PAssert.html)
is a class included in the Beam Java SDK for making assertions on the contents of a `PCollection`.
You can use `PAssert` to verify that a `PCollection` contains a specific set of expected elements.
+
+For a given `PCollection`, you can use `PAssert` to verify the contents as follows:
+
+```java  
+PCollection<String> output = ...;
+
+// Check whether a PCollection contains some elements in any order.
+PAssert.that(output)
+    .containsInAnyOrder(
+        "elem1",
+        "elem3",
+        "elem2");
+```
+
+Any code that uses `PAssert` must link in `JUnit` and `Hamcrest`. If you're using Maven,
you can link in `Hamcrest` by adding the following dependency to your project's `pom.xml`
file:
+
+```xml
+<dependency>
+    <groupId>org.hamcrest</groupId>
+    <artifactId>hamcrest-all</artifactId>
+    <version>1.3</version>
+    <scope>test</scope>
+</dependency>
+```
+
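+If your project does not already depend on JUnit, you can add it the same way; the version shown here is illustrative:
+
+```xml
+<dependency>
+    <groupId>junit</groupId>
+    <artifactId>junit</artifactId>
+    <version>4.12</version>
+    <scope>test</scope>
+</dependency>
+```
+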
+For more information on how these classes work, see the [org.apache.beam.sdk.testing](http://beam.incubator.apache.org/documentation/sdks/javadoc/0.3.0-incubating/org/apache/beam/sdk/testing/package-summary.html)
package documentation.
+
+### An Example Test for a Composite Transform
+
+The following code shows a complete test for a composite transform. The test applies the
`Count` transform to an input `PCollection` of `String` elements. The test uses the `Create`
transform to create the input `PCollection` from a Java `List<String>`.
+
+```java  
+public class CountTest {
+
+  // Our static input data, which will make up the initial PCollection.
+  static final String[] WORDS_ARRAY = new String[] {
+      "hi", "there", "hi", "hi", "sue", "bob",
+      "hi", "sue", "", "", "ZOW", "bob", ""};
+
+  static final List<String> WORDS = Arrays.asList(WORDS_ARRAY);
+
+  public void testCount() {
+    // Create a test pipeline.
+    Pipeline p = TestPipeline.create();
+
+    // Create an input PCollection.
+    PCollection<String> input = p.apply(Create.of(WORDS)).setCoder(StringUtf8Coder.of());
+
+    // Apply the Count transform under test.
+    PCollection<KV<String, Long>> output =
+        input.apply(Count.<String>perElement());
+
+    // Assert on the results.
+    PAssert.that(output)
+        .containsInAnyOrder(
+            KV.of("hi", 4L),
+            KV.of("there", 1L),
+            KV.of("sue", 2L),
+            KV.of("bob", 2L),
+            KV.of("", 3L),
+            KV.of("ZOW", 1L));
+
+    // Run the pipeline.
+    p.run();
+  }
+}
+```
+
+## Testing a Pipeline End-to-End
+
+You can use the test classes in the Beam SDKs (such as `TestPipeline` and `PAssert` in the
Beam SDK for Java) to test an entire pipeline end-to-end. Typically, to test an entire pipeline,
you do the following:
+
+*   For every source of input data to your pipeline, create some known static test input
data.
+*   Create some static test output data that matches what you expect in your pipeline's final
output `PCollection`(s).
+*   Use `TestPipeline.create` in place of the standard `Pipeline.create` to create the pipeline object.
+*   In place of your pipeline's `Read` transform(s), use the `Create` transform to create
one or more `PCollection`s from your static input data.
+*   Apply your pipeline's transforms.
+*   In place of your pipeline's `Write` transform(s), use `PAssert` to verify that the contents
of the final `PCollection`s your pipeline produces match the expected values in your static
output data.
+
+### Testing the WordCount Pipeline
+
+The following example code shows how one might test the [WordCount example pipeline]({{ site.baseurl
}}/get-started/wordcount-example/). `WordCount` usually reads lines from a text file for input
data; instead, the test creates a Java `List<String>` containing some text lines and
uses a `Create` transform to create an initial `PCollection`.
+
+`WordCount`'s final transform (from the composite transform `CountWords`) produces a `PCollection<String>`
of formatted word counts suitable for printing. Rather than write that `PCollection` to an
output text file, our test pipeline uses `PAssert` to verify that the elements of the `PCollection`
match those of a static `String` array containing our expected output data.
+
+```java
+public class WordCountTest {
+
+    // Our static input data, which will make up the initial PCollection.
+    static final String[] WORDS_ARRAY = new String[] {
+      "hi there", "hi", "hi sue bob",
+      "hi sue", "", "bob hi"};
+
+    static final List<String> WORDS = Arrays.asList(WORDS_ARRAY);
+
+    // Our static output data, which is the expected data that the final PCollection must
match.
+    static final String[] COUNTS_ARRAY = new String[] {
+        "hi: 5", "there: 1", "sue: 2", "bob: 2"};
+
+    // Example test that tests the pipeline's transforms.
+
+    public void testCountWords() throws Exception {
+      Pipeline p = TestPipeline.create();
+
+      // Create a PCollection from the WORDS static input data.
+      PCollection<String> input = p.apply(Create.of(WORDS)).setCoder(StringUtf8Coder.of());
+
+      // Run ALL the pipeline's transforms (in this case, the CountWords composite transform).
+      PCollection<String> output = input.apply(new CountWords());
+
+      // Assert that the output PCollection matches the COUNTS_ARRAY known static output
data.
+      PAssert.that(output).containsInAnyOrder(COUNTS_ARRAY);
+
+      // Run the pipeline.
+      p.run();
+    }
+}
+```
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-beam-site/blob/5b3bda65/src/images/design-your-pipeline-flatten.png
----------------------------------------------------------------------
diff --git a/src/images/design-your-pipeline-flatten.png b/src/images/design-your-pipeline-flatten.png
new file mode 100644
index 0000000..d07f7e5
Binary files /dev/null and b/src/images/design-your-pipeline-flatten.png differ

http://git-wip-us.apache.org/repos/asf/incubator-beam-site/blob/5b3bda65/src/images/design-your-pipeline-join.png
----------------------------------------------------------------------
diff --git a/src/images/design-your-pipeline-join.png b/src/images/design-your-pipeline-join.png
new file mode 100644
index 0000000..b7ccb9f
Binary files /dev/null and b/src/images/design-your-pipeline-join.png differ

http://git-wip-us.apache.org/repos/asf/incubator-beam-site/blob/5b3bda65/src/images/design-your-pipeline-linear.png
----------------------------------------------------------------------
diff --git a/src/images/design-your-pipeline-linear.png b/src/images/design-your-pipeline-linear.png
new file mode 100644
index 0000000..a021fe7
Binary files /dev/null and b/src/images/design-your-pipeline-linear.png differ

http://git-wip-us.apache.org/repos/asf/incubator-beam-site/blob/5b3bda65/src/images/design-your-pipeline-multiple-pcollections.png
----------------------------------------------------------------------
diff --git a/src/images/design-your-pipeline-multiple-pcollections.png b/src/images/design-your-pipeline-multiple-pcollections.png
new file mode 100644
index 0000000..7eb802b
Binary files /dev/null and b/src/images/design-your-pipeline-multiple-pcollections.png differ

http://git-wip-us.apache.org/repos/asf/incubator-beam-site/blob/5b3bda65/src/images/design-your-pipeline-side-outputs.png
----------------------------------------------------------------------
diff --git a/src/images/design-your-pipeline-side-outputs.png b/src/images/design-your-pipeline-side-outputs.png
new file mode 100644
index 0000000..f13989d
Binary files /dev/null and b/src/images/design-your-pipeline-side-outputs.png differ

